xref: /aosp_15_r20/external/pigweed/pw_multisink/multisink_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2021 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_multisink/multisink.h"
16*61c4878aSAndroid Build Coastguard Worker 
17*61c4878aSAndroid Build Coastguard Worker #include <array>
18*61c4878aSAndroid Build Coastguard Worker #include <cstdint>
19*61c4878aSAndroid Build Coastguard Worker #include <cstring>
20*61c4878aSAndroid Build Coastguard Worker #include <optional>
21*61c4878aSAndroid Build Coastguard Worker #include <string_view>
22*61c4878aSAndroid Build Coastguard Worker 
23*61c4878aSAndroid Build Coastguard Worker #include "pw_function/function.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_span/span.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status.h"
26*61c4878aSAndroid Build Coastguard Worker #include "pw_unit_test/framework.h"
27*61c4878aSAndroid Build Coastguard Worker 
28*61c4878aSAndroid Build Coastguard Worker namespace pw::multisink {
29*61c4878aSAndroid Build Coastguard Worker namespace {
30*61c4878aSAndroid Build Coastguard Worker 
31*61c4878aSAndroid Build Coastguard Worker using Drain = MultiSink::Drain;
32*61c4878aSAndroid Build Coastguard Worker using Listener = MultiSink::Listener;
33*61c4878aSAndroid Build Coastguard Worker 
34*61c4878aSAndroid Build Coastguard Worker class CountingListener : public Listener {
35*61c4878aSAndroid Build Coastguard Worker  public:
OnNewEntryAvailable()36*61c4878aSAndroid Build Coastguard Worker   void OnNewEntryAvailable() override { notification_count_++; }
37*61c4878aSAndroid Build Coastguard Worker 
GetNotificationCount()38*61c4878aSAndroid Build Coastguard Worker   size_t GetNotificationCount() { return notification_count_; }
39*61c4878aSAndroid Build Coastguard Worker 
ResetNotificationCount()40*61c4878aSAndroid Build Coastguard Worker   void ResetNotificationCount() { notification_count_ = 0; }
41*61c4878aSAndroid Build Coastguard Worker 
42*61c4878aSAndroid Build Coastguard Worker  private:
43*61c4878aSAndroid Build Coastguard Worker   size_t notification_count_ = 0;
44*61c4878aSAndroid Build Coastguard Worker };
45*61c4878aSAndroid Build Coastguard Worker 
46*61c4878aSAndroid Build Coastguard Worker class EntriesSizeMonitorListener : public Listener {
47*61c4878aSAndroid Build Coastguard Worker  public:
EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain & drain)48*61c4878aSAndroid Build Coastguard Worker   EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain& drain)
49*61c4878aSAndroid Build Coastguard Worker       : last_seen_unread_size_(0u), drain_(drain) {}
OnNewEntryAvailable()50*61c4878aSAndroid Build Coastguard Worker   void OnNewEntryAvailable() override {
51*61c4878aSAndroid Build Coastguard Worker     last_seen_unread_size_ = drain_.UnsafeGetUnreadEntriesSize();
52*61c4878aSAndroid Build Coastguard Worker   }
53*61c4878aSAndroid Build Coastguard Worker 
GetLastSeenUnreadEntriesSize() const54*61c4878aSAndroid Build Coastguard Worker   size_t GetLastSeenUnreadEntriesSize() const { return last_seen_unread_size_; }
55*61c4878aSAndroid Build Coastguard Worker 
56*61c4878aSAndroid Build Coastguard Worker  private:
57*61c4878aSAndroid Build Coastguard Worker   size_t last_seen_unread_size_;
58*61c4878aSAndroid Build Coastguard Worker   pw::multisink::MultiSink::Drain& drain_;
59*61c4878aSAndroid Build Coastguard Worker };
60*61c4878aSAndroid Build Coastguard Worker 
61*61c4878aSAndroid Build Coastguard Worker class MultiSinkTest : public ::testing::Test {
62*61c4878aSAndroid Build Coastguard Worker  protected:
63*61c4878aSAndroid Build Coastguard Worker   static constexpr std::byte kMessage[] = {
64*61c4878aSAndroid Build Coastguard Worker       (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
65*61c4878aSAndroid Build Coastguard Worker   static constexpr std::byte kMessageOther[] = {
66*61c4878aSAndroid Build Coastguard Worker       (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
67*61c4878aSAndroid Build Coastguard Worker   static constexpr size_t kMaxDrains = 3;
68*61c4878aSAndroid Build Coastguard Worker   static constexpr size_t kMaxListeners = 3;
69*61c4878aSAndroid Build Coastguard Worker   static constexpr size_t kEntryBufferSize = 1024;
70*61c4878aSAndroid Build Coastguard Worker   static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
71*61c4878aSAndroid Build Coastguard Worker 
MultiSinkTest()72*61c4878aSAndroid Build Coastguard Worker   MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
73*61c4878aSAndroid Build Coastguard Worker 
74*61c4878aSAndroid Build Coastguard Worker   // Expects the peeked or popped message to equal the provided non-empty
75*61c4878aSAndroid Build Coastguard Worker   // message, and the drop count to match. If `expected_message` is empty, the
76*61c4878aSAndroid Build Coastguard Worker   // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)77*61c4878aSAndroid Build Coastguard Worker   void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
78*61c4878aSAndroid Build Coastguard Worker                                   uint32_t result_drop_count,
79*61c4878aSAndroid Build Coastguard Worker                                   uint32_t result_ingress_drop_count,
80*61c4878aSAndroid Build Coastguard Worker                                   std::optional<ConstByteSpan> expected_message,
81*61c4878aSAndroid Build Coastguard Worker                                   uint32_t expected_drop_count,
82*61c4878aSAndroid Build Coastguard Worker                                   uint32_t expected_ingress_drop_count) {
83*61c4878aSAndroid Build Coastguard Worker     if (!expected_message.has_value()) {
84*61c4878aSAndroid Build Coastguard Worker       EXPECT_EQ(Status::OutOfRange(), result.status());
85*61c4878aSAndroid Build Coastguard Worker     } else {
86*61c4878aSAndroid Build Coastguard Worker       ASSERT_EQ(result.status(), OkStatus());
87*61c4878aSAndroid Build Coastguard Worker       if (!expected_message.value().empty()) {
88*61c4878aSAndroid Build Coastguard Worker         ASSERT_FALSE(result.value().empty());
89*61c4878aSAndroid Build Coastguard Worker         ASSERT_EQ(result.value().size_bytes(),
90*61c4878aSAndroid Build Coastguard Worker                   expected_message.value().size_bytes());
91*61c4878aSAndroid Build Coastguard Worker         EXPECT_EQ(memcmp(result.value().data(),
92*61c4878aSAndroid Build Coastguard Worker                          expected_message.value().data(),
93*61c4878aSAndroid Build Coastguard Worker                          expected_message.value().size_bytes()),
94*61c4878aSAndroid Build Coastguard Worker                   0);
95*61c4878aSAndroid Build Coastguard Worker       }
96*61c4878aSAndroid Build Coastguard Worker     }
97*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(result_drop_count, expected_drop_count);
98*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
99*61c4878aSAndroid Build Coastguard Worker   }
100*61c4878aSAndroid Build Coastguard Worker 
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)101*61c4878aSAndroid Build Coastguard Worker   void VerifyPopEntry(Drain& drain,
102*61c4878aSAndroid Build Coastguard Worker                       std::optional<ConstByteSpan> expected_message,
103*61c4878aSAndroid Build Coastguard Worker                       uint32_t expected_drop_count,
104*61c4878aSAndroid Build Coastguard Worker                       uint32_t expected_ingress_drop_count) {
105*61c4878aSAndroid Build Coastguard Worker     uint32_t drop_count = 0;
106*61c4878aSAndroid Build Coastguard Worker     uint32_t ingress_drop_count = 0;
107*61c4878aSAndroid Build Coastguard Worker     Result<ConstByteSpan> result =
108*61c4878aSAndroid Build Coastguard Worker         drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109*61c4878aSAndroid Build Coastguard Worker     ExpectMessageAndDropCounts(result,
110*61c4878aSAndroid Build Coastguard Worker                                drop_count,
111*61c4878aSAndroid Build Coastguard Worker                                ingress_drop_count,
112*61c4878aSAndroid Build Coastguard Worker                                expected_message,
113*61c4878aSAndroid Build Coastguard Worker                                expected_drop_count,
114*61c4878aSAndroid Build Coastguard Worker                                expected_ingress_drop_count);
115*61c4878aSAndroid Build Coastguard Worker   }
116*61c4878aSAndroid Build Coastguard Worker 
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)117*61c4878aSAndroid Build Coastguard Worker   void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
118*61c4878aSAndroid Build Coastguard Worker                         uint32_t result_drop_count,
119*61c4878aSAndroid Build Coastguard Worker                         uint32_t result_ingress_drop_count,
120*61c4878aSAndroid Build Coastguard Worker                         std::optional<ConstByteSpan> expected_message,
121*61c4878aSAndroid Build Coastguard Worker                         uint32_t expected_drop_count,
122*61c4878aSAndroid Build Coastguard Worker                         uint32_t expected_ingress_drop_count) {
123*61c4878aSAndroid Build Coastguard Worker     if (peek_result.ok()) {
124*61c4878aSAndroid Build Coastguard Worker       ASSERT_FALSE(peek_result.value().entry().empty());
125*61c4878aSAndroid Build Coastguard Worker       Result<ConstByteSpan> verify_result(peek_result.value().entry());
126*61c4878aSAndroid Build Coastguard Worker       ExpectMessageAndDropCounts(verify_result,
127*61c4878aSAndroid Build Coastguard Worker                                  result_drop_count,
128*61c4878aSAndroid Build Coastguard Worker                                  result_ingress_drop_count,
129*61c4878aSAndroid Build Coastguard Worker                                  expected_message,
130*61c4878aSAndroid Build Coastguard Worker                                  expected_drop_count,
131*61c4878aSAndroid Build Coastguard Worker                                  expected_ingress_drop_count);
132*61c4878aSAndroid Build Coastguard Worker       return;
133*61c4878aSAndroid Build Coastguard Worker     }
134*61c4878aSAndroid Build Coastguard Worker     if (expected_message.has_value()) {
135*61c4878aSAndroid Build Coastguard Worker       // Fail since we expected OkStatus.
136*61c4878aSAndroid Build Coastguard Worker       ASSERT_EQ(peek_result.status(), OkStatus());
137*61c4878aSAndroid Build Coastguard Worker     }
138*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(Status::OutOfRange(), peek_result.status());
139*61c4878aSAndroid Build Coastguard Worker   }
140*61c4878aSAndroid Build Coastguard Worker 
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)141*61c4878aSAndroid Build Coastguard Worker   void ExpectNotificationCount(CountingListener& listener,
142*61c4878aSAndroid Build Coastguard Worker                                size_t expected_notification_count) {
143*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
144*61c4878aSAndroid Build Coastguard Worker     listener.ResetNotificationCount();
145*61c4878aSAndroid Build Coastguard Worker   }
146*61c4878aSAndroid Build Coastguard Worker 
147*61c4878aSAndroid Build Coastguard Worker   std::byte buffer_[kBufferSize];
148*61c4878aSAndroid Build Coastguard Worker   std::byte entry_buffer_[kEntryBufferSize];
149*61c4878aSAndroid Build Coastguard Worker   CountingListener listeners_[kMaxListeners];
150*61c4878aSAndroid Build Coastguard Worker   Drain drains_[kMaxDrains];
151*61c4878aSAndroid Build Coastguard Worker   MultiSink multisink_;
152*61c4878aSAndroid Build Coastguard Worker };
153*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,SingleDrain)154*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, SingleDrain) {
155*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
156*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachListener(listeners_[0]);
157*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
158*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
159*61c4878aSAndroid Build Coastguard Worker 
160*61c4878aSAndroid Build Coastguard Worker   // Single entry push and pop.
161*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
162*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
163*61c4878aSAndroid Build Coastguard Worker   // Single empty entry push and pop.
164*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(ConstByteSpan());
165*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
166*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
167*61c4878aSAndroid Build Coastguard Worker 
168*61c4878aSAndroid Build Coastguard Worker   // Multiple entries with intermittent drops.
169*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
170*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped();
171*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
172*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 3u);
173*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
174*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
175*61c4878aSAndroid Build Coastguard Worker 
176*61c4878aSAndroid Build Coastguard Worker   // Send drops only.
177*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped();
178*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
179*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
180*61c4878aSAndroid Build Coastguard Worker 
181*61c4878aSAndroid Build Coastguard Worker   // Confirm out-of-range if no entries are expected.
182*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 0u);
183*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
184*61c4878aSAndroid Build Coastguard Worker }
185*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,MultipleDrain)186*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, MultipleDrain) {
187*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
188*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[1]);
189*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachListener(listeners_[0]);
190*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachListener(listeners_[1]);
191*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
192*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[1], 1u);
193*61c4878aSAndroid Build Coastguard Worker 
194*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
195*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
196*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped();
197*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
198*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped();
199*61c4878aSAndroid Build Coastguard Worker 
200*61c4878aSAndroid Build Coastguard Worker   // Drain one drain entirely.
201*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 5u);
202*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[1], 5u);
203*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
204*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
205*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
206*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
207*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
208*61c4878aSAndroid Build Coastguard Worker 
209*61c4878aSAndroid Build Coastguard Worker   // Confirm the other drain can be drained separately.
210*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 0u);
211*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[1], 0u);
212*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
213*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
214*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
215*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
216*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
217*61c4878aSAndroid Build Coastguard Worker }
218*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,LateDrainRegistration)219*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, LateDrainRegistration) {
220*61c4878aSAndroid Build Coastguard Worker   // Drains attached after entries are pushed should still observe those entries
221*61c4878aSAndroid Build Coastguard Worker   // if they have not been evicted from the ring buffer.
222*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
223*61c4878aSAndroid Build Coastguard Worker 
224*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
225*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachListener(listeners_[0]);
226*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
227*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
228*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
229*61c4878aSAndroid Build Coastguard Worker 
230*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
231*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
232*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
233*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
234*61c4878aSAndroid Build Coastguard Worker }
235*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,DynamicDrainRegistration)236*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, DynamicDrainRegistration) {
237*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
238*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachListener(listeners_[0]);
239*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
240*61c4878aSAndroid Build Coastguard Worker 
241*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped();
242*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
243*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped();
244*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
245*61c4878aSAndroid Build Coastguard Worker 
246*61c4878aSAndroid Build Coastguard Worker   // Drain out one message and detach it.
247*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 4u);
248*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
249*61c4878aSAndroid Build Coastguard Worker   multisink_.DetachDrain(drains_[0]);
250*61c4878aSAndroid Build Coastguard Worker   multisink_.DetachListener(listeners_[0]);
251*61c4878aSAndroid Build Coastguard Worker 
252*61c4878aSAndroid Build Coastguard Worker   // Re-attaching the drain should reproduce the last observed message. Note
253*61c4878aSAndroid Build Coastguard Worker   // that notifications are not expected, nor are drops observed before the
254*61c4878aSAndroid Build Coastguard Worker   // first valid message in the buffer.
255*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
256*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachListener(listeners_[0]);
257*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
258*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
259*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
260*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
261*61c4878aSAndroid Build Coastguard Worker 
262*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
263*61c4878aSAndroid Build Coastguard Worker   ExpectNotificationCount(listeners_[0], 1u);
264*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
265*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
266*61c4878aSAndroid Build Coastguard Worker }
267*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,TooSmallBuffer)268*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, TooSmallBuffer) {
269*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
270*61c4878aSAndroid Build Coastguard Worker 
271*61c4878aSAndroid Build Coastguard Worker   // Insert an entry and a drop, then try to read into an insufficient buffer.
272*61c4878aSAndroid Build Coastguard Worker   uint32_t drop_count = 0;
273*61c4878aSAndroid Build Coastguard Worker   uint32_t ingress_drop_count = 0;
274*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped();
275*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
276*61c4878aSAndroid Build Coastguard Worker 
277*61c4878aSAndroid Build Coastguard Worker   // Attempting to acquire an entry with a small buffer should result in
278*61c4878aSAndroid Build Coastguard Worker   // RESOURCE_EXHAUSTED and remove it.
279*61c4878aSAndroid Build Coastguard Worker   Result<ConstByteSpan> result = drains_[0].PopEntry(
280*61c4878aSAndroid Build Coastguard Worker       span(entry_buffer_, 1), drop_count, ingress_drop_count);
281*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(result.status(), Status::ResourceExhausted());
282*61c4878aSAndroid Build Coastguard Worker 
283*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
284*61c4878aSAndroid Build Coastguard Worker }
285*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,Iterator)286*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, Iterator) {
287*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
288*61c4878aSAndroid Build Coastguard Worker 
289*61c4878aSAndroid Build Coastguard Worker   // Insert entries and consume them all.
290*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
291*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
292*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
293*61c4878aSAndroid Build Coastguard Worker 
294*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
295*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
296*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
297*61c4878aSAndroid Build Coastguard Worker 
298*61c4878aSAndroid Build Coastguard Worker   // Confirm that the iterator still observes the messages in the ring buffer.
299*61c4878aSAndroid Build Coastguard Worker   size_t iterated_entries = 0;
300*61c4878aSAndroid Build Coastguard Worker   for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
301*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
302*61c4878aSAndroid Build Coastguard Worker     iterated_entries++;
303*61c4878aSAndroid Build Coastguard Worker   }
304*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(iterated_entries, 3u);
305*61c4878aSAndroid Build Coastguard Worker }
306*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,IteratorNoDrains)307*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, IteratorNoDrains) {
308*61c4878aSAndroid Build Coastguard Worker   // Insert entries with no drains attached. Even though there are no consumers,
309*61c4878aSAndroid Build Coastguard Worker   // iterators should still walk from the oldest entry.
310*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
311*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
312*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
313*61c4878aSAndroid Build Coastguard Worker 
314*61c4878aSAndroid Build Coastguard Worker   // Confirm that the iterator still observes the messages in the ring buffer.
315*61c4878aSAndroid Build Coastguard Worker   size_t iterated_entries = 0;
316*61c4878aSAndroid Build Coastguard Worker   for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
317*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
318*61c4878aSAndroid Build Coastguard Worker     iterated_entries++;
319*61c4878aSAndroid Build Coastguard Worker   }
320*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(iterated_entries, 3u);
321*61c4878aSAndroid Build Coastguard Worker }
322*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,IteratorNoEntries)323*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, IteratorNoEntries) {
324*61c4878aSAndroid Build Coastguard Worker   // Attach a drain, but don't add any entries.
325*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
326*61c4878aSAndroid Build Coastguard Worker   // Confirm that the iterator has no entries.
327*61c4878aSAndroid Build Coastguard Worker   MultiSink::UnsafeIterationWrapper unsafe_iterator =
328*61c4878aSAndroid Build Coastguard Worker       multisink_.UnsafeIteration();
329*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
330*61c4878aSAndroid Build Coastguard Worker }
331*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,PeekEntryNoEntries)332*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, PeekEntryNoEntries) {
333*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
334*61c4878aSAndroid Build Coastguard Worker 
335*61c4878aSAndroid Build Coastguard Worker   // Peek empty multisink.
336*61c4878aSAndroid Build Coastguard Worker   uint32_t drop_count = 0;
337*61c4878aSAndroid Build Coastguard Worker   uint32_t ingress_drop_count = 0;
338*61c4878aSAndroid Build Coastguard Worker   auto peek_result =
339*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
340*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(peek_result, 0, drop_count, std::nullopt, 0, 0);
341*61c4878aSAndroid Build Coastguard Worker }
342*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,PeekAndPop)343*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, PeekAndPop) {
344*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
345*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[1]);
346*61c4878aSAndroid Build Coastguard Worker 
347*61c4878aSAndroid Build Coastguard Worker   // Peek entry after multisink has some entries.
348*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
349*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessageOther);
350*61c4878aSAndroid Build Coastguard Worker   uint32_t drop_count = 0;
351*61c4878aSAndroid Build Coastguard Worker   uint32_t ingress_drop_count = 0;
352*61c4878aSAndroid Build Coastguard Worker   auto first_peek_result =
353*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
354*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
355*61c4878aSAndroid Build Coastguard Worker       first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
356*61c4878aSAndroid Build Coastguard Worker 
357*61c4878aSAndroid Build Coastguard Worker   // Multiple peeks must return the front message.
358*61c4878aSAndroid Build Coastguard Worker   auto peek_duplicate =
359*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
360*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
361*61c4878aSAndroid Build Coastguard Worker       peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
362*61c4878aSAndroid Build Coastguard Worker   // A second drain must peek the front message.
363*61c4878aSAndroid Build Coastguard Worker   auto peek_other_drain =
364*61c4878aSAndroid Build Coastguard Worker       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
365*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
366*61c4878aSAndroid Build Coastguard Worker       peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
367*61c4878aSAndroid Build Coastguard Worker 
368*61c4878aSAndroid Build Coastguard Worker   // After a drain pops a peeked entry, the next peek call must return the next
369*61c4878aSAndroid Build Coastguard Worker   // message.
370*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
371*61c4878aSAndroid Build Coastguard Worker   auto second_peek_result =
372*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
373*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
374*61c4878aSAndroid Build Coastguard Worker       second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
375*61c4878aSAndroid Build Coastguard Worker   // Slower readers must be unchanged.
376*61c4878aSAndroid Build Coastguard Worker   auto peek_other_drain_duplicate =
377*61c4878aSAndroid Build Coastguard Worker       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
378*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(peek_other_drain_duplicate,
379*61c4878aSAndroid Build Coastguard Worker                    drop_count,
380*61c4878aSAndroid Build Coastguard Worker                    ingress_drop_count,
381*61c4878aSAndroid Build Coastguard Worker                    kMessage,
382*61c4878aSAndroid Build Coastguard Worker                    0,
383*61c4878aSAndroid Build Coastguard Worker                    0);
384*61c4878aSAndroid Build Coastguard Worker 
385*61c4878aSAndroid Build Coastguard Worker   // PopEntry prior to popping the previously peeked entry.
386*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
387*61c4878aSAndroid Build Coastguard Worker   // Popping an entry already handled must not trigger errors.
388*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
389*61c4878aSAndroid Build Coastguard Worker   // Popping with an old peek context must not trigger errors.
390*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
391*61c4878aSAndroid Build Coastguard Worker 
392*61c4878aSAndroid Build Coastguard Worker   // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
393*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
394*61c4878aSAndroid Build Coastguard Worker   auto empty_peek_result =
395*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
396*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
397*61c4878aSAndroid Build Coastguard Worker       empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
398*61c4878aSAndroid Build Coastguard Worker 
399*61c4878aSAndroid Build Coastguard Worker   // // Slower readers must be unchanged.
400*61c4878aSAndroid Build Coastguard Worker   auto peek_other_drain_unchanged =
401*61c4878aSAndroid Build Coastguard Worker       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
402*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(peek_other_drain_unchanged,
403*61c4878aSAndroid Build Coastguard Worker                    drop_count,
404*61c4878aSAndroid Build Coastguard Worker                    ingress_drop_count,
405*61c4878aSAndroid Build Coastguard Worker                    kMessage,
406*61c4878aSAndroid Build Coastguard Worker                    0,
407*61c4878aSAndroid Build Coastguard Worker                    0);
408*61c4878aSAndroid Build Coastguard Worker }
409*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,PeekReportsIngressDropCount)410*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
411*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
412*61c4878aSAndroid Build Coastguard Worker 
413*61c4878aSAndroid Build Coastguard Worker   // Peek entry after multisink has some entries.
414*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
415*61c4878aSAndroid Build Coastguard Worker   const uint32_t ingress_drops = 10;
416*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped(ingress_drops);
417*61c4878aSAndroid Build Coastguard Worker 
418*61c4878aSAndroid Build Coastguard Worker   uint32_t drop_count = 0;
419*61c4878aSAndroid Build Coastguard Worker   uint32_t ingress_drop_count = 0;
420*61c4878aSAndroid Build Coastguard Worker   auto peek_result1 =
421*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
422*61c4878aSAndroid Build Coastguard Worker   // No drops reported until the drain finds a gap in the sequence IDs.
423*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
424*61c4878aSAndroid Build Coastguard Worker       peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
425*61c4878aSAndroid Build Coastguard Worker 
426*61c4878aSAndroid Build Coastguard Worker   // Popping the peeked entry advances the drain, and a new peek will find the
427*61c4878aSAndroid Build Coastguard Worker   // gap in sequence IDs.
428*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
429*61c4878aSAndroid Build Coastguard Worker   auto peek_result2 =
430*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
431*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
432*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drop_count, 0u);
433*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(ingress_drop_count, ingress_drops);
434*61c4878aSAndroid Build Coastguard Worker }
435*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,PeekReportsSlowDrainDropCount)436*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
437*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
438*61c4878aSAndroid Build Coastguard Worker 
439*61c4878aSAndroid Build Coastguard Worker   // Add entries until buffer is full and drain has to be advanced.
440*61c4878aSAndroid Build Coastguard Worker   // The sequence ID takes 1 byte when less than 128.
441*61c4878aSAndroid Build Coastguard Worker   const size_t max_multisink_messages = 128;
442*61c4878aSAndroid Build Coastguard Worker   const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
443*61c4878aSAndroid Build Coastguard Worker   // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
444*61c4878aSAndroid Build Coastguard Worker   const size_t message_size = buffer_entry_size - 2;
445*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, message_size> message;
446*61c4878aSAndroid Build Coastguard Worker   std::memset(message.data(), 'a', message.size());
447*61c4878aSAndroid Build Coastguard Worker   for (size_t i = 0; i < max_multisink_messages; ++i) {
448*61c4878aSAndroid Build Coastguard Worker     message[0] = static_cast<std::byte>(i);
449*61c4878aSAndroid Build Coastguard Worker     multisink_.HandleEntry(message);
450*61c4878aSAndroid Build Coastguard Worker   }
451*61c4878aSAndroid Build Coastguard Worker 
452*61c4878aSAndroid Build Coastguard Worker   // At this point the buffer is full, but the sequence ID will take 1 more byte
453*61c4878aSAndroid Build Coastguard Worker   // in the preamble, meaning that adding N new entries, drops N + 1 entries.
454*61c4878aSAndroid Build Coastguard Worker   // Account for that offset.
455*61c4878aSAndroid Build Coastguard Worker   const size_t expected_drops = 5;
456*61c4878aSAndroid Build Coastguard Worker   for (size_t i = 1; i < expected_drops; ++i) {
457*61c4878aSAndroid Build Coastguard Worker     message[0] = static_cast<std::byte>(200 + i);
458*61c4878aSAndroid Build Coastguard Worker     multisink_.HandleEntry(message);
459*61c4878aSAndroid Build Coastguard Worker   }
460*61c4878aSAndroid Build Coastguard Worker 
461*61c4878aSAndroid Build Coastguard Worker   uint32_t drop_count = 0;
462*61c4878aSAndroid Build Coastguard Worker   uint32_t ingress_drop_count = 0;
463*61c4878aSAndroid Build Coastguard Worker 
464*61c4878aSAndroid Build Coastguard Worker   auto peek_result =
465*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
466*61c4878aSAndroid Build Coastguard Worker   // The message peeked is the 6th message added.
467*61c4878aSAndroid Build Coastguard Worker   message[0] = static_cast<std::byte>(5);
468*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
469*61c4878aSAndroid Build Coastguard Worker       peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
470*61c4878aSAndroid Build Coastguard Worker 
471*61c4878aSAndroid Build Coastguard Worker   // Add 3 more messages since we peeked the multisink, generating 2 more drops.
472*61c4878aSAndroid Build Coastguard Worker   const size_t expected_drops2 = 2;
473*61c4878aSAndroid Build Coastguard Worker   for (size_t i = 0; i < expected_drops2 + 1; ++i) {
474*61c4878aSAndroid Build Coastguard Worker     message[0] = static_cast<std::byte>(220 + i);
475*61c4878aSAndroid Build Coastguard Worker     multisink_.HandleEntry(message);
476*61c4878aSAndroid Build Coastguard Worker   }
477*61c4878aSAndroid Build Coastguard Worker 
478*61c4878aSAndroid Build Coastguard Worker   // Pop the 6th message now, even though it was already dropped.
479*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].PopEntry(peek_result.value()), OkStatus());
480*61c4878aSAndroid Build Coastguard Worker 
481*61c4878aSAndroid Build Coastguard Worker   // A new peek would get the 9th message because two more messages were
482*61c4878aSAndroid Build Coastguard Worker   // dropped. Given that PopEntry() was called with peek_result, all the dropped
483*61c4878aSAndroid Build Coastguard Worker   // messages before peek_result should be considered handled and only the two
484*61c4878aSAndroid Build Coastguard Worker   // new drops should be reported here.
485*61c4878aSAndroid Build Coastguard Worker   auto peek_result2 =
486*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
487*61c4878aSAndroid Build Coastguard Worker   message[0] = static_cast<std::byte>(8);  // 9th message
488*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(peek_result2,
489*61c4878aSAndroid Build Coastguard Worker                    drop_count,
490*61c4878aSAndroid Build Coastguard Worker                    ingress_drop_count,
491*61c4878aSAndroid Build Coastguard Worker                    message,
492*61c4878aSAndroid Build Coastguard Worker                    expected_drops2,
493*61c4878aSAndroid Build Coastguard Worker                    0);
494*61c4878aSAndroid Build Coastguard Worker }
495*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,IngressDropCountOverflow)496*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, IngressDropCountOverflow) {
497*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
498*61c4878aSAndroid Build Coastguard Worker 
499*61c4878aSAndroid Build Coastguard Worker   // Make drain's last handled drop larger than multisink drop count, which
500*61c4878aSAndroid Build Coastguard Worker   // overflowed.
501*61c4878aSAndroid Build Coastguard Worker   const uint32_t drop_count_close_to_overflow =
502*61c4878aSAndroid Build Coastguard Worker       std::numeric_limits<uint32_t>::max() - 3;
503*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped(drop_count_close_to_overflow);
504*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
505*61c4878aSAndroid Build Coastguard Worker 
506*61c4878aSAndroid Build Coastguard Worker   // Catch up drain's drop count.
507*61c4878aSAndroid Build Coastguard Worker   uint32_t drop_count = 0;
508*61c4878aSAndroid Build Coastguard Worker   uint32_t ingress_drop_count = 0;
509*61c4878aSAndroid Build Coastguard Worker   auto peek_result1 =
510*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
511*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(peek_result1,
512*61c4878aSAndroid Build Coastguard Worker                    drop_count,
513*61c4878aSAndroid Build Coastguard Worker                    ingress_drop_count,
514*61c4878aSAndroid Build Coastguard Worker                    kMessage,
515*61c4878aSAndroid Build Coastguard Worker                    0,
516*61c4878aSAndroid Build Coastguard Worker                    drop_count_close_to_overflow);
517*61c4878aSAndroid Build Coastguard Worker   // Popping the peeked entry advances the drain, and a new peek will find the
518*61c4878aSAndroid Build Coastguard Worker   // gap in sequence IDs.
519*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
520*61c4878aSAndroid Build Coastguard Worker 
521*61c4878aSAndroid Build Coastguard Worker   // Overflow multisink's drop count.
522*61c4878aSAndroid Build Coastguard Worker   const uint32_t expected_ingress_drop_count = 10;
523*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped(expected_ingress_drop_count);
524*61c4878aSAndroid Build Coastguard Worker 
525*61c4878aSAndroid Build Coastguard Worker   auto peek_result2 =
526*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
527*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
528*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drop_count, 0u);
529*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
530*61c4878aSAndroid Build Coastguard Worker 
531*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
532*61c4878aSAndroid Build Coastguard Worker   auto peek_result3 =
533*61c4878aSAndroid Build Coastguard Worker       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
534*61c4878aSAndroid Build Coastguard Worker   VerifyPeekResult(
535*61c4878aSAndroid Build Coastguard Worker       peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
536*61c4878aSAndroid Build Coastguard Worker }
537*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,DetachedDrainReportsDropCount)538*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
539*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
540*61c4878aSAndroid Build Coastguard Worker 
541*61c4878aSAndroid Build Coastguard Worker   const uint32_t ingress_drops = 10;
542*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleDropped(ingress_drops);
543*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
544*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 1u);
545*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
546*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
547*61c4878aSAndroid Build Coastguard Worker 
548*61c4878aSAndroid Build Coastguard Worker   // Detaching and attaching drain should report the same drops.
549*61c4878aSAndroid Build Coastguard Worker   multisink_.DetachDrain(drains_[0]);
550*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
551*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 1u);
552*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
553*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
554*61c4878aSAndroid Build Coastguard Worker }
555*61c4878aSAndroid Build Coastguard Worker 
TEST_F(MultiSinkTest,DrainUnreadEntriesSize)556*61c4878aSAndroid Build Coastguard Worker TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
557*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[0]);
558*61c4878aSAndroid Build Coastguard Worker 
559*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
560*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
561*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
562*61c4878aSAndroid Build Coastguard Worker   multisink_.HandleEntry(kMessage);
563*61c4878aSAndroid Build Coastguard Worker   const size_t unread_entries_size = drains_[0].GetUnreadEntriesSize();
564*61c4878aSAndroid Build Coastguard Worker   EXPECT_GT(unread_entries_size, 0u);
565*61c4878aSAndroid Build Coastguard Worker 
566*61c4878aSAndroid Build Coastguard Worker   // Attach another drain, it sees the same unread entriess size as the first
567*61c4878aSAndroid Build Coastguard Worker   // drain.
568*61c4878aSAndroid Build Coastguard Worker   multisink_.AttachDrain(drains_[1]);
569*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[1].GetUnreadEntriesSize(), unread_entries_size);
570*61c4878aSAndroid Build Coastguard Worker 
571*61c4878aSAndroid Build Coastguard Worker   // Pop entries from the first drain.
572*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0],
573*61c4878aSAndroid Build Coastguard Worker                  /*expected_message=*/kMessage,
574*61c4878aSAndroid Build Coastguard Worker                  /*expected_drop_count=*/0,
575*61c4878aSAndroid Build Coastguard Worker                  /*expected_ingress_drop_count=*/0);
576*61c4878aSAndroid Build Coastguard Worker   VerifyPopEntry(drains_[0],
577*61c4878aSAndroid Build Coastguard Worker                  /*expected_message=*/kMessage,
578*61c4878aSAndroid Build Coastguard Worker                  /*expected_drop_count=*/0,
579*61c4878aSAndroid Build Coastguard Worker                  /*expected_ingress_drop_count=*/0);
580*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
581*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[1].GetUnreadEntriesSize(), unread_entries_size);
582*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
583*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(drains_[1].GetUnreadEntriesCount(), 2u);
584*61c4878aSAndroid Build Coastguard Worker }
585*61c4878aSAndroid Build Coastguard Worker 
TEST(UnsafeGetUnreadEntriesSize,ReadFromListener)586*61c4878aSAndroid Build Coastguard Worker TEST(UnsafeGetUnreadEntriesSize, ReadFromListener) {
587*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, 32> buffer;
588*61c4878aSAndroid Build Coastguard Worker   MultiSink multisink(buffer);
589*61c4878aSAndroid Build Coastguard Worker 
590*61c4878aSAndroid Build Coastguard Worker   pw::multisink::MultiSink::Drain drain;
591*61c4878aSAndroid Build Coastguard Worker   multisink.AttachDrain(drain);
592*61c4878aSAndroid Build Coastguard Worker 
593*61c4878aSAndroid Build Coastguard Worker   EntriesSizeMonitorListener listener(drain);
594*61c4878aSAndroid Build Coastguard Worker   multisink.AttachListener(listener);
595*61c4878aSAndroid Build Coastguard Worker 
596*61c4878aSAndroid Build Coastguard Worker   ASSERT_EQ(listener.GetLastSeenUnreadEntriesSize(), 0u);
597*61c4878aSAndroid Build Coastguard Worker 
598*61c4878aSAndroid Build Coastguard Worker   constexpr std::string_view kEntryToPush = "one";
599*61c4878aSAndroid Build Coastguard Worker   multisink.HandleEntry(as_bytes(span<const char>(kEntryToPush)));
600*61c4878aSAndroid Build Coastguard Worker 
601*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(listener.GetLastSeenUnreadEntriesSize(),
602*61c4878aSAndroid Build Coastguard Worker             drain.GetUnreadEntriesSize());
603*61c4878aSAndroid Build Coastguard Worker }
604*61c4878aSAndroid Build Coastguard Worker 
TEST(UnsafeIteration,NoLimit)605*61c4878aSAndroid Build Coastguard Worker TEST(UnsafeIteration, NoLimit) {
606*61c4878aSAndroid Build Coastguard Worker   constexpr std::array<std::string_view, 5> kExpectedEntries{
607*61c4878aSAndroid Build Coastguard Worker       "one", "two", "three", "four", "five"};
608*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, 32> buffer;
609*61c4878aSAndroid Build Coastguard Worker   MultiSink multisink(buffer);
610*61c4878aSAndroid Build Coastguard Worker 
611*61c4878aSAndroid Build Coastguard Worker   for (std::string_view entry : kExpectedEntries) {
612*61c4878aSAndroid Build Coastguard Worker     multisink.HandleEntry(as_bytes(span<const char>(entry)));
613*61c4878aSAndroid Build Coastguard Worker   }
614*61c4878aSAndroid Build Coastguard Worker 
615*61c4878aSAndroid Build Coastguard Worker   size_t entry_count = 0;
616*61c4878aSAndroid Build Coastguard Worker   struct {
617*61c4878aSAndroid Build Coastguard Worker     size_t& entry_count;
618*61c4878aSAndroid Build Coastguard Worker     span<const std::string_view> expected_results;
619*61c4878aSAndroid Build Coastguard Worker   } ctx{entry_count, kExpectedEntries};
620*61c4878aSAndroid Build Coastguard Worker   auto cb = [&ctx](ConstByteSpan data) {
621*61c4878aSAndroid Build Coastguard Worker     std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
622*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(data.size(), expected_entry.size());
623*61c4878aSAndroid Build Coastguard Worker     const int result =
624*61c4878aSAndroid Build Coastguard Worker         memcmp(data.data(), expected_entry.data(), expected_entry.size());
625*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(0, result);
626*61c4878aSAndroid Build Coastguard Worker     ctx.entry_count++;
627*61c4878aSAndroid Build Coastguard Worker   };
628*61c4878aSAndroid Build Coastguard Worker 
629*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
630*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(kExpectedEntries.size(), entry_count);
631*61c4878aSAndroid Build Coastguard Worker }
632*61c4878aSAndroid Build Coastguard Worker 
TEST(UnsafeIteration,Subset)633*61c4878aSAndroid Build Coastguard Worker TEST(UnsafeIteration, Subset) {
634*61c4878aSAndroid Build Coastguard Worker   constexpr std::array<std::string_view, 5> kExpectedEntries{
635*61c4878aSAndroid Build Coastguard Worker       "one", "two", "three", "four", "five"};
636*61c4878aSAndroid Build Coastguard Worker   constexpr size_t kStartOffset = 3;
637*61c4878aSAndroid Build Coastguard Worker   constexpr size_t kExpectedEntriesMaxEntries =
638*61c4878aSAndroid Build Coastguard Worker       kExpectedEntries.size() - kStartOffset;
639*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, 32> buffer;
640*61c4878aSAndroid Build Coastguard Worker   MultiSink multisink(buffer);
641*61c4878aSAndroid Build Coastguard Worker 
642*61c4878aSAndroid Build Coastguard Worker   for (std::string_view entry : kExpectedEntries) {
643*61c4878aSAndroid Build Coastguard Worker     multisink.HandleEntry(as_bytes(span<const char>(entry)));
644*61c4878aSAndroid Build Coastguard Worker   }
645*61c4878aSAndroid Build Coastguard Worker 
646*61c4878aSAndroid Build Coastguard Worker   size_t entry_count = 0;
647*61c4878aSAndroid Build Coastguard Worker   struct {
648*61c4878aSAndroid Build Coastguard Worker     size_t& entry_count;
649*61c4878aSAndroid Build Coastguard Worker     span<const std::string_view> expected_results;
650*61c4878aSAndroid Build Coastguard Worker   } ctx{entry_count, kExpectedEntries};
651*61c4878aSAndroid Build Coastguard Worker   auto cb = [&ctx](ConstByteSpan data) {
652*61c4878aSAndroid Build Coastguard Worker     std::string_view expected_entry =
653*61c4878aSAndroid Build Coastguard Worker         ctx.expected_results[ctx.entry_count + kStartOffset];
654*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(data.size(), expected_entry.size());
655*61c4878aSAndroid Build Coastguard Worker     const int result =
656*61c4878aSAndroid Build Coastguard Worker         memcmp(data.data(), expected_entry.data(), expected_entry.size());
657*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(0, result);
658*61c4878aSAndroid Build Coastguard Worker     ctx.entry_count++;
659*61c4878aSAndroid Build Coastguard Worker   };
660*61c4878aSAndroid Build Coastguard Worker 
661*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(
662*61c4878aSAndroid Build Coastguard Worker       OkStatus(),
663*61c4878aSAndroid Build Coastguard Worker       multisink.UnsafeForEachEntry(cb, kExpectedEntries.size() - kStartOffset));
664*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
665*61c4878aSAndroid Build Coastguard Worker }
666*61c4878aSAndroid Build Coastguard Worker 
TEST(UnsafeIterationFromEnd,NoTruncation)667*61c4878aSAndroid Build Coastguard Worker TEST(UnsafeIterationFromEnd, NoTruncation) {
668*61c4878aSAndroid Build Coastguard Worker   constexpr std::array<std::string_view, 5> kExpectedEntries{
669*61c4878aSAndroid Build Coastguard Worker       "one", "two", "three", "four", "five"};
670*61c4878aSAndroid Build Coastguard Worker   constexpr size_t buffer_size = 32;
671*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, buffer_size> buffer;
672*61c4878aSAndroid Build Coastguard Worker   MultiSink multisink(buffer);
673*61c4878aSAndroid Build Coastguard Worker 
674*61c4878aSAndroid Build Coastguard Worker   for (std::string_view entry : kExpectedEntries) {
675*61c4878aSAndroid Build Coastguard Worker     multisink.HandleEntry(as_bytes(span<const char>(entry)));
676*61c4878aSAndroid Build Coastguard Worker   }
677*61c4878aSAndroid Build Coastguard Worker 
678*61c4878aSAndroid Build Coastguard Worker   size_t entry_count = 0;
679*61c4878aSAndroid Build Coastguard Worker   struct {
680*61c4878aSAndroid Build Coastguard Worker     size_t& entry_count;
681*61c4878aSAndroid Build Coastguard Worker     span<const std::string_view> expected_results;
682*61c4878aSAndroid Build Coastguard Worker   } ctx{entry_count, kExpectedEntries};
683*61c4878aSAndroid Build Coastguard Worker   auto cb = [&ctx](ConstByteSpan data) {
684*61c4878aSAndroid Build Coastguard Worker     std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
685*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(data.size(), expected_entry.size());
686*61c4878aSAndroid Build Coastguard Worker     const int result =
687*61c4878aSAndroid Build Coastguard Worker         memcmp(data.data(), expected_entry.data(), expected_entry.size());
688*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(0, result);
689*61c4878aSAndroid Build Coastguard Worker     ctx.entry_count++;
690*61c4878aSAndroid Build Coastguard Worker   };
691*61c4878aSAndroid Build Coastguard Worker 
692*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntryFromEnd(cb, buffer_size));
693*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(kExpectedEntries.size(), entry_count);
694*61c4878aSAndroid Build Coastguard Worker }
695*61c4878aSAndroid Build Coastguard Worker 
TEST(UnsafeIterationFromEnd,Truncation)696*61c4878aSAndroid Build Coastguard Worker TEST(UnsafeIterationFromEnd, Truncation) {
697*61c4878aSAndroid Build Coastguard Worker   constexpr std::array<std::string_view, 5> kEntries{
698*61c4878aSAndroid Build Coastguard Worker       "one", "two", "three", "four", "five"};
699*61c4878aSAndroid Build Coastguard Worker   constexpr std::array<std::string_view, 3> kExpectedEntries{
700*61c4878aSAndroid Build Coastguard Worker       "three", "four", "five"};
701*61c4878aSAndroid Build Coastguard Worker   constexpr size_t buffer_size = 32;
702*61c4878aSAndroid Build Coastguard Worker   std::array<std::byte, buffer_size> buffer;
703*61c4878aSAndroid Build Coastguard Worker   MultiSink multisink(buffer);
704*61c4878aSAndroid Build Coastguard Worker 
705*61c4878aSAndroid Build Coastguard Worker   for (std::string_view entry : kEntries) {
706*61c4878aSAndroid Build Coastguard Worker     multisink.HandleEntry(as_bytes(span<const char>(entry)));
707*61c4878aSAndroid Build Coastguard Worker   }
708*61c4878aSAndroid Build Coastguard Worker 
709*61c4878aSAndroid Build Coastguard Worker   size_t entry_count = 0;
710*61c4878aSAndroid Build Coastguard Worker   struct {
711*61c4878aSAndroid Build Coastguard Worker     size_t& entry_count;
712*61c4878aSAndroid Build Coastguard Worker     span<const std::string_view> expected_results;
713*61c4878aSAndroid Build Coastguard Worker   } ctx{entry_count, kExpectedEntries};
714*61c4878aSAndroid Build Coastguard Worker   auto cb = [&ctx](ConstByteSpan data) {
715*61c4878aSAndroid Build Coastguard Worker     std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
716*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(data.size(), expected_entry.size());
717*61c4878aSAndroid Build Coastguard Worker     const int result =
718*61c4878aSAndroid Build Coastguard Worker         memcmp(data.data(), expected_entry.data(), expected_entry.size());
719*61c4878aSAndroid Build Coastguard Worker     EXPECT_EQ(0, result);
720*61c4878aSAndroid Build Coastguard Worker     ctx.entry_count++;
721*61c4878aSAndroid Build Coastguard Worker   };
722*61c4878aSAndroid Build Coastguard Worker 
723*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(OkStatus(),
724*61c4878aSAndroid Build Coastguard Worker             multisink.UnsafeForEachEntryFromEnd(cb, buffer_size / 2));
725*61c4878aSAndroid Build Coastguard Worker   EXPECT_EQ(kExpectedEntries.size(), entry_count);
726*61c4878aSAndroid Build Coastguard Worker }
727*61c4878aSAndroid Build Coastguard Worker 
728*61c4878aSAndroid Build Coastguard Worker }  // namespace
729*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::multisink
730