1*61c4878aSAndroid Build Coastguard Worker // Copyright 2021 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #include <cstddef>
16*61c4878aSAndroid Build Coastguard Worker #include <cstdint>
17*61c4878aSAndroid Build Coastguard Worker
18*61c4878aSAndroid Build Coastguard Worker #include "pw_containers/vector.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_multisink/multisink.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_multisink/test_thread.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_span/span.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_string/string_builder.h"
23*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/thread.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/yield.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_unit_test/framework.h"
26*61c4878aSAndroid Build Coastguard Worker
27*61c4878aSAndroid Build Coastguard Worker namespace pw::multisink {
28*61c4878aSAndroid Build Coastguard Worker namespace {
29*61c4878aSAndroid Build Coastguard Worker
30*61c4878aSAndroid Build Coastguard Worker constexpr size_t kEntryBufferSize = sizeof("message 000");
31*61c4878aSAndroid Build Coastguard Worker constexpr size_t kMaxMessageCount = 250;
32*61c4878aSAndroid Build Coastguard Worker constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
33*61c4878aSAndroid Build Coastguard Worker
34*61c4878aSAndroid Build Coastguard Worker using MessageSpan = span<const StringBuffer<kEntryBufferSize>>;
35*61c4878aSAndroid Build Coastguard Worker
36*61c4878aSAndroid Build Coastguard Worker // This function is unused if joining is not supported.
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)37*61c4878aSAndroid Build Coastguard Worker [[maybe_unused]] void CompareSentAndReceivedMessages(
38*61c4878aSAndroid Build Coastguard Worker const MessageSpan& sent_messages, const MessageSpan& received_messages) {
39*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(sent_messages.size(), received_messages.size());
40*61c4878aSAndroid Build Coastguard Worker for (size_t i = 0; i < sent_messages.size(); ++i) {
41*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
42*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(std::string_view(sent_messages[i]),
43*61c4878aSAndroid Build Coastguard Worker std::string_view(received_messages[i]));
44*61c4878aSAndroid Build Coastguard Worker }
45*61c4878aSAndroid Build Coastguard Worker }
46*61c4878aSAndroid Build Coastguard Worker
47*61c4878aSAndroid Build Coastguard Worker } // namespace
48*61c4878aSAndroid Build Coastguard Worker
49*61c4878aSAndroid Build Coastguard Worker // Static message pool to avoid recreating messages for every test and avoids
50*61c4878aSAndroid Build Coastguard Worker // using std::string.
51*61c4878aSAndroid Build Coastguard Worker class MessagePool {
52*61c4878aSAndroid Build Coastguard Worker public:
Instance()53*61c4878aSAndroid Build Coastguard Worker static MessagePool& Instance() {
54*61c4878aSAndroid Build Coastguard Worker static MessagePool instance;
55*61c4878aSAndroid Build Coastguard Worker return instance;
56*61c4878aSAndroid Build Coastguard Worker }
57*61c4878aSAndroid Build Coastguard Worker
58*61c4878aSAndroid Build Coastguard Worker MessagePool(const MessagePool&) = delete;
59*61c4878aSAndroid Build Coastguard Worker MessagePool& operator=(const MessagePool&) = delete;
60*61c4878aSAndroid Build Coastguard Worker MessagePool(MessagePool&&) = delete;
61*61c4878aSAndroid Build Coastguard Worker MessagePool& operator=(MessagePool&&) = delete;
62*61c4878aSAndroid Build Coastguard Worker
GetMessages(size_t message_count) const63*61c4878aSAndroid Build Coastguard Worker MessageSpan GetMessages(size_t message_count) const {
64*61c4878aSAndroid Build Coastguard Worker PW_ASSERT(message_count <= messages_.size());
65*61c4878aSAndroid Build Coastguard Worker return MessageSpan(messages_.begin(), message_count);
66*61c4878aSAndroid Build Coastguard Worker }
67*61c4878aSAndroid Build Coastguard Worker
68*61c4878aSAndroid Build Coastguard Worker private:
MessagePool()69*61c4878aSAndroid Build Coastguard Worker MessagePool() {
70*61c4878aSAndroid Build Coastguard Worker for (size_t i = 0; i < kMaxMessageCount; ++i) {
71*61c4878aSAndroid Build Coastguard Worker messages_.emplace_back();
72*61c4878aSAndroid Build Coastguard Worker messages_.back() << "message %u" << static_cast<unsigned int>(i);
73*61c4878aSAndroid Build Coastguard Worker }
74*61c4878aSAndroid Build Coastguard Worker }
75*61c4878aSAndroid Build Coastguard Worker
76*61c4878aSAndroid Build Coastguard Worker Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
77*61c4878aSAndroid Build Coastguard Worker };
78*61c4878aSAndroid Build Coastguard Worker
79*61c4878aSAndroid Build Coastguard Worker // Continuously reads logs from a multisink, using PopEntry() and stores copies
80*61c4878aSAndroid Build Coastguard Worker // of the retrieved messages for later verification. The thread stops when the
81*61c4878aSAndroid Build Coastguard Worker // the number of read messages and total drop count matches the expected count.
82*61c4878aSAndroid Build Coastguard Worker class LogPopReaderThread : public thread::ThreadCore {
83*61c4878aSAndroid Build Coastguard Worker public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)84*61c4878aSAndroid Build Coastguard Worker LogPopReaderThread(MultiSink& multisink,
85*61c4878aSAndroid Build Coastguard Worker uint32_t expected_message_and_drop_count)
86*61c4878aSAndroid Build Coastguard Worker : multisink_(multisink),
87*61c4878aSAndroid Build Coastguard Worker total_drop_count_(0),
88*61c4878aSAndroid Build Coastguard Worker expected_message_and_drop_count_(expected_message_and_drop_count) {
89*61c4878aSAndroid Build Coastguard Worker PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
90*61c4878aSAndroid Build Coastguard Worker }
91*61c4878aSAndroid Build Coastguard Worker
drop_count()92*61c4878aSAndroid Build Coastguard Worker uint32_t drop_count() { return total_drop_count_; }
93*61c4878aSAndroid Build Coastguard Worker
received_messages()94*61c4878aSAndroid Build Coastguard Worker const MessageSpan received_messages() {
95*61c4878aSAndroid Build Coastguard Worker return MessageSpan(received_messages_.begin(), received_messages_.size());
96*61c4878aSAndroid Build Coastguard Worker }
97*61c4878aSAndroid Build Coastguard Worker
Run()98*61c4878aSAndroid Build Coastguard Worker void Run() override {
99*61c4878aSAndroid Build Coastguard Worker multisink_.AttachDrain(drain_);
100*61c4878aSAndroid Build Coastguard Worker ReadAllEntries();
101*61c4878aSAndroid Build Coastguard Worker }
102*61c4878aSAndroid Build Coastguard Worker
ReadAllEntries()103*61c4878aSAndroid Build Coastguard Worker virtual void ReadAllEntries() {
104*61c4878aSAndroid Build Coastguard Worker do {
105*61c4878aSAndroid Build Coastguard Worker uint32_t drop_count = 0;
106*61c4878aSAndroid Build Coastguard Worker uint32_t ingress_drop_count = 0;
107*61c4878aSAndroid Build Coastguard Worker const Result<ConstByteSpan> possible_entry =
108*61c4878aSAndroid Build Coastguard Worker drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109*61c4878aSAndroid Build Coastguard Worker total_drop_count_ += drop_count + ingress_drop_count;
110*61c4878aSAndroid Build Coastguard Worker if (possible_entry.status().IsOutOfRange()) {
111*61c4878aSAndroid Build Coastguard Worker pw::this_thread::yield();
112*61c4878aSAndroid Build Coastguard Worker continue;
113*61c4878aSAndroid Build Coastguard Worker }
114*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(possible_entry.status(), OkStatus());
115*61c4878aSAndroid Build Coastguard Worker if (received_messages_.full()) {
116*61c4878aSAndroid Build Coastguard Worker return;
117*61c4878aSAndroid Build Coastguard Worker }
118*61c4878aSAndroid Build Coastguard Worker received_messages_.emplace_back();
119*61c4878aSAndroid Build Coastguard Worker received_messages_.back() << std::string_view(
120*61c4878aSAndroid Build Coastguard Worker reinterpret_cast<const char*>(possible_entry.value().data()),
121*61c4878aSAndroid Build Coastguard Worker possible_entry.value().size());
122*61c4878aSAndroid Build Coastguard Worker pw::this_thread::yield();
123*61c4878aSAndroid Build Coastguard Worker } while (total_drop_count_ + received_messages_.size() <
124*61c4878aSAndroid Build Coastguard Worker expected_message_and_drop_count_);
125*61c4878aSAndroid Build Coastguard Worker }
126*61c4878aSAndroid Build Coastguard Worker
127*61c4878aSAndroid Build Coastguard Worker protected:
128*61c4878aSAndroid Build Coastguard Worker MultiSink::Drain drain_;
129*61c4878aSAndroid Build Coastguard Worker MultiSink& multisink_;
130*61c4878aSAndroid Build Coastguard Worker std::array<std::byte, kEntryBufferSize> entry_buffer_;
131*61c4878aSAndroid Build Coastguard Worker uint32_t total_drop_count_;
132*61c4878aSAndroid Build Coastguard Worker const uint32_t expected_message_and_drop_count_;
133*61c4878aSAndroid Build Coastguard Worker Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
134*61c4878aSAndroid Build Coastguard Worker };
135*61c4878aSAndroid Build Coastguard Worker
136*61c4878aSAndroid Build Coastguard Worker class LogPeekAndCommitReaderThread : public LogPopReaderThread {
137*61c4878aSAndroid Build Coastguard Worker public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)138*61c4878aSAndroid Build Coastguard Worker LogPeekAndCommitReaderThread(MultiSink& multisink,
139*61c4878aSAndroid Build Coastguard Worker uint32_t expected_message_and_drop_count)
140*61c4878aSAndroid Build Coastguard Worker : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
141*61c4878aSAndroid Build Coastguard Worker
ReadAllEntries()142*61c4878aSAndroid Build Coastguard Worker void ReadAllEntries() override {
143*61c4878aSAndroid Build Coastguard Worker do {
144*61c4878aSAndroid Build Coastguard Worker uint32_t drop_count = 0;
145*61c4878aSAndroid Build Coastguard Worker uint32_t ingress_drop_count = 0;
146*61c4878aSAndroid Build Coastguard Worker const Result<MultiSink::Drain::PeekedEntry> possible_entry =
147*61c4878aSAndroid Build Coastguard Worker drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
148*61c4878aSAndroid Build Coastguard Worker total_drop_count_ += drop_count + ingress_drop_count;
149*61c4878aSAndroid Build Coastguard Worker if (possible_entry.status().IsOutOfRange()) {
150*61c4878aSAndroid Build Coastguard Worker pw::this_thread::yield();
151*61c4878aSAndroid Build Coastguard Worker continue;
152*61c4878aSAndroid Build Coastguard Worker }
153*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(possible_entry.status(), OkStatus());
154*61c4878aSAndroid Build Coastguard Worker if (received_messages_.full()) {
155*61c4878aSAndroid Build Coastguard Worker return;
156*61c4878aSAndroid Build Coastguard Worker }
157*61c4878aSAndroid Build Coastguard Worker pw::this_thread::yield();
158*61c4878aSAndroid Build Coastguard Worker received_messages_.emplace_back();
159*61c4878aSAndroid Build Coastguard Worker received_messages_.back() << std::string_view(
160*61c4878aSAndroid Build Coastguard Worker reinterpret_cast<const char*>(possible_entry.value().entry().data()),
161*61c4878aSAndroid Build Coastguard Worker possible_entry.value().entry().size());
162*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
163*61c4878aSAndroid Build Coastguard Worker pw::this_thread::yield();
164*61c4878aSAndroid Build Coastguard Worker } while (total_drop_count_ + received_messages_.size() <
165*61c4878aSAndroid Build Coastguard Worker expected_message_and_drop_count_);
166*61c4878aSAndroid Build Coastguard Worker }
167*61c4878aSAndroid Build Coastguard Worker };
168*61c4878aSAndroid Build Coastguard Worker
169*61c4878aSAndroid Build Coastguard Worker // Adds the provided messages to the shared multisink.
170*61c4878aSAndroid Build Coastguard Worker class LogWriterThread : public thread::ThreadCore {
171*61c4878aSAndroid Build Coastguard Worker public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)172*61c4878aSAndroid Build Coastguard Worker LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
173*61c4878aSAndroid Build Coastguard Worker : multisink_(multisink), message_stack_(message_stack) {}
174*61c4878aSAndroid Build Coastguard Worker
Run()175*61c4878aSAndroid Build Coastguard Worker void Run() override {
176*61c4878aSAndroid Build Coastguard Worker for (const auto& message : message_stack_) {
177*61c4878aSAndroid Build Coastguard Worker multisink_.HandleEntry(as_bytes(span(std::string_view(message))));
178*61c4878aSAndroid Build Coastguard Worker pw::this_thread::yield();
179*61c4878aSAndroid Build Coastguard Worker }
180*61c4878aSAndroid Build Coastguard Worker }
181*61c4878aSAndroid Build Coastguard Worker
182*61c4878aSAndroid Build Coastguard Worker private:
183*61c4878aSAndroid Build Coastguard Worker MultiSink& multisink_;
184*61c4878aSAndroid Build Coastguard Worker const MessageSpan& message_stack_;
185*61c4878aSAndroid Build Coastguard Worker };
186*61c4878aSAndroid Build Coastguard Worker
187*61c4878aSAndroid Build Coastguard Worker class MultiSinkTest : public ::testing::Test {
188*61c4878aSAndroid Build Coastguard Worker protected:
MultiSinkTest()189*61c4878aSAndroid Build Coastguard Worker MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
190*61c4878aSAndroid Build Coastguard Worker
191*61c4878aSAndroid Build Coastguard Worker std::byte buffer_[kBufferSize];
192*61c4878aSAndroid Build Coastguard Worker MultiSink multisink_;
193*61c4878aSAndroid Build Coastguard Worker };
194*61c4878aSAndroid Build Coastguard Worker
195*61c4878aSAndroid Build Coastguard Worker #if PW_THREAD_JOINING_ENABLED
196*61c4878aSAndroid Build Coastguard Worker
// One writer and one PopEntry()-based reader: the reader must report every
// drop and receive all messages in the order they were sent.
TEST_F(MultiSinkTest, SingleWriterSingleReader) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedMessageAndDropCount = kLogCount + kDropCount;
  const auto sent_messages = MessagePool::Instance().GetMessages(kLogCount);

  // The reader thread is created before the writer thread.
  LogPopReaderThread reader_core(multisink_, kExpectedMessageAndDropCount);
  Thread reader(test::MultiSinkTestThreadOptions(), reader_core);

  LogWriterThread writer_core(multisink_, sent_messages);
  Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // Report the drops only once the writer has finished, then wait for the
  // reader to reach its expected count.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  reader.join();

  EXPECT_EQ(reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 reader_core.received_messages());
}
220*61c4878aSAndroid Build Coastguard Worker
// One writer and one peek-then-pop reader: the reader must report every drop
// and receive all messages in the order they were sent.
TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedMessageAndDropCount = kLogCount + kDropCount;
  const auto sent_messages = MessagePool::Instance().GetMessages(kLogCount);

  // The reader thread is created before the writer thread.
  LogPeekAndCommitReaderThread reader_core(multisink_,
                                           kExpectedMessageAndDropCount);
  Thread reader(test::MultiSinkTestThreadOptions(), reader_core);

  LogWriterThread writer_core(multisink_, sent_messages);
  Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // Report the drops only once the writer has finished, then wait for the
  // reader to reach its expected count.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  reader.join();

  EXPECT_EQ(reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 reader_core.received_messages());
}
244*61c4878aSAndroid Build Coastguard Worker
// One writer and three readers (two PopEntry()-based, one peek-then-pop):
// every reader must report every drop and receive all messages in order.
TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedMessageAndDropCount = kLogCount + kDropCount;
  const auto sent_messages = MessagePool::Instance().GetMessages(kLogCount);

  // The reader threads are created before the writer thread.
  LogPopReaderThread pop_reader_core_a(multisink_,
                                       kExpectedMessageAndDropCount);
  Thread pop_reader_a(test::MultiSinkTestThreadOptions(), pop_reader_core_a);
  LogPopReaderThread pop_reader_core_b(multisink_,
                                       kExpectedMessageAndDropCount);
  Thread pop_reader_b(test::MultiSinkTestThreadOptions(), pop_reader_core_b);
  LogPeekAndCommitReaderThread peek_reader_core(multisink_,
                                                kExpectedMessageAndDropCount);
  Thread peek_reader(test::MultiSinkTestThreadOptions(), peek_reader_core);

  LogWriterThread writer_core(multisink_, sent_messages);
  Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // Report the drops only once the writer has finished, then wait for all
  // readers to reach their expected count.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  pop_reader_a.join();
  pop_reader_b.join();
  peek_reader.join();

  EXPECT_EQ(pop_reader_core_a.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 pop_reader_core_a.received_messages());
  EXPECT_EQ(pop_reader_core_b.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 pop_reader_core_b.received_messages());
  EXPECT_EQ(peek_reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 peek_reader_core.received_messages());
}
285*61c4878aSAndroid Build Coastguard Worker
// Two writers and three readers: every reader must report every drop and
// receive the combined number of messages from both writers.
TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 7;
  // Both writers send the same kLogCount messages.
  constexpr uint32_t kExpectedMessageAndDropCount = 2 * kLogCount + kDropCount;
  const auto sent_messages = MessagePool::Instance().GetMessages(kLogCount);

  // The reader threads are created before the writer threads.
  LogPopReaderThread pop_reader_core_a(multisink_,
                                       kExpectedMessageAndDropCount);
  Thread pop_reader_a(test::MultiSinkTestThreadOptions(), pop_reader_core_a);
  LogPopReaderThread pop_reader_core_b(multisink_,
                                       kExpectedMessageAndDropCount);
  Thread pop_reader_b(test::MultiSinkTestThreadOptions(), pop_reader_core_b);
  LogPeekAndCommitReaderThread peek_reader_core(multisink_,
                                                kExpectedMessageAndDropCount);
  Thread peek_reader(test::MultiSinkTestThreadOptions(), peek_reader_core);

  LogWriterThread writer_core_a(multisink_, sent_messages);
  Thread writer_a(test::MultiSinkTestThreadOptions(), writer_core_a);
  LogWriterThread writer_core_b(multisink_, sent_messages);
  Thread writer_b(test::MultiSinkTestThreadOptions(), writer_core_b);

  // Report the drops only once both writers have finished, then wait for all
  // readers to reach their expected count.
  writer_a.join();
  writer_b.join();
  multisink_.HandleDropped(kDropCount);
  pop_reader_a.join();
  pop_reader_b.join();
  peek_reader.join();

  EXPECT_EQ(pop_reader_core_a.drop_count(), kDropCount);
  EXPECT_EQ(pop_reader_core_b.drop_count(), kDropCount);
  EXPECT_EQ(peek_reader_core.drop_count(), kDropCount);
  // Since we don't know the order that messages came in, we can't check their
  // contents; only the number of received messages is verified.
  EXPECT_EQ(pop_reader_core_a.received_messages().size(),
            kExpectedMessageAndDropCount - kDropCount);
  EXPECT_EQ(pop_reader_core_b.received_messages().size(),
            kExpectedMessageAndDropCount - kDropCount);
  EXPECT_EQ(peek_reader_core.received_messages().size(),
            kExpectedMessageAndDropCount - kDropCount);
}
332*61c4878aSAndroid Build Coastguard Worker
// Two writers push more entries than a deliberately small multisink can hold
// while two readers drain it.
TEST_F(MultiSinkTest, OverflowMultisink) {
  // Expect the multisink to overflow and readers to not fail when popping, or
  // peeking and committing entries.
  constexpr size_t kLogCount = kMaxMessageCount;
  constexpr size_t kMaxBufferEntryCount = 20;
  std::byte small_buffer[kMaxBufferEntryCount * kEntryBufferSize];
  MultiSink small_multisink(small_buffer);

  const auto sent_messages = MessagePool::Instance().GetMessages(kLogCount);

  // The reader threads are created before the writer threads.
  LogPeekAndCommitReaderThread peek_reader_core(small_multisink, kLogCount);
  Thread peek_reader(test::MultiSinkTestThreadOptions(), peek_reader_core);
  LogPopReaderThread pop_reader_core(small_multisink, kLogCount);
  Thread pop_reader(test::MultiSinkTestThreadOptions(), pop_reader_core);

  LogWriterThread writer_core_a(small_multisink, sent_messages);
  Thread writer_a(test::MultiSinkTestThreadOptions(), writer_core_a);
  LogWriterThread writer_core_b(small_multisink, sent_messages);
  Thread writer_b(test::MultiSinkTestThreadOptions(), writer_core_b);

  // Wait for the writer threads, then the reader threads, to end.
  writer_a.join();
  writer_b.join();
  peek_reader.join();
  pop_reader.join();

  // Verifying received messages and drop message counts is unreliable as we
  // can't control the order threads will operate.
}
368*61c4878aSAndroid Build Coastguard Worker
369*61c4878aSAndroid Build Coastguard Worker #endif // PW_THREAD_JOINING_ENABLED
370*61c4878aSAndroid Build Coastguard Worker
371*61c4878aSAndroid Build Coastguard Worker } // namespace pw::multisink
372