1*61c4878aSAndroid Build Coastguard Worker // Copyright 2023 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #include "pw_stream/mpsc_stream.h"
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard Worker #include "pw_containers/vector.h"
18*61c4878aSAndroid Build Coastguard Worker #include "pw_fuzzer/fuzztest.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_random/xor_shift.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/test_thread_context.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_thread/thread.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_unit_test/framework.h"
23*61c4878aSAndroid Build Coastguard Worker
24*61c4878aSAndroid Build Coastguard Worker // TODO: https://pwbug.dev/365161669 - Express joinability as a build-system
25*61c4878aSAndroid Build Coastguard Worker // constraint.
26*61c4878aSAndroid Build Coastguard Worker #if PW_THREAD_JOINING_ENABLED
27*61c4878aSAndroid Build Coastguard Worker
28*61c4878aSAndroid Build Coastguard Worker namespace pw::stream {
29*61c4878aSAndroid Build Coastguard Worker namespace {
30*61c4878aSAndroid Build Coastguard Worker
31*61c4878aSAndroid Build Coastguard Worker using namespace std::chrono_literals;
32*61c4878aSAndroid Build Coastguard Worker using namespace pw::fuzzer;
33*61c4878aSAndroid Build Coastguard Worker
34*61c4878aSAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
35*61c4878aSAndroid Build Coastguard Worker // Test fixtures.
36*61c4878aSAndroid Build Coastguard Worker
/// Number of bytes in each fixed-size data buffer used by these tests.
constexpr size_t kBufSize{512};
39*61c4878aSAndroid Build Coastguard Worker
40*61c4878aSAndroid Build Coastguard Worker /// Fills a byte span with random data.
Fill(std::byte * buf,size_t len)41*61c4878aSAndroid Build Coastguard Worker void Fill(std::byte* buf, size_t len) {
42*61c4878aSAndroid Build Coastguard Worker ByteSpan data(buf, len);
43*61c4878aSAndroid Build Coastguard Worker random::XorShiftStarRng64 rng(1);
44*61c4878aSAndroid Build Coastguard Worker rng.Get(data);
45*61c4878aSAndroid Build Coastguard Worker }
46*61c4878aSAndroid Build Coastguard Worker
/// Initial hash value (offset basis) for 64-bit FNV-1a.
constexpr uint64_t kOffsetBasis{0xcbf2'9ce4'8422'2325ULL};
49*61c4878aSAndroid Build Coastguard Worker
/// Multiplier (prime) for 64-bit FNV-1a.
constexpr uint64_t kPrimeValue{0x100'0000'01b3ULL};
52*61c4878aSAndroid Build Coastguard Worker
53*61c4878aSAndroid Build Coastguard Worker /// Quick implementation of public-domain Fowler-Noll-Vo hashing algorithm.
54*61c4878aSAndroid Build Coastguard Worker ///
55*61c4878aSAndroid Build Coastguard Worker /// This is used in the tests below to verify equality of two sequences of bytes
56*61c4878aSAndroid Build Coastguard Worker /// that are too large to compare directly.
57*61c4878aSAndroid Build Coastguard Worker ///
58*61c4878aSAndroid Build Coastguard Worker /// See http://www.isthe.com/chongo/tech/comp/fnv/index.html
fnv1a(ConstByteSpan bytes,uint64_t & hash)59*61c4878aSAndroid Build Coastguard Worker void fnv1a(ConstByteSpan bytes, uint64_t& hash) {
60*61c4878aSAndroid Build Coastguard Worker for (const auto& b : bytes) {
61*61c4878aSAndroid Build Coastguard Worker hash = (hash ^ static_cast<uint8_t>(b)) * kPrimeValue;
62*61c4878aSAndroid Build Coastguard Worker }
63*61c4878aSAndroid Build Coastguard Worker }
64*61c4878aSAndroid Build Coastguard Worker
65*61c4878aSAndroid Build Coastguard Worker /// MpscStream test context that uses a generic reader.
66*61c4878aSAndroid Build Coastguard Worker ///
67*61c4878aSAndroid Build Coastguard Worker /// This struct associates a reader and writer with their parameters and return
68*61c4878aSAndroid Build Coastguard Worker /// values. This is useful for communicating with threads spawned to call a
69*61c4878aSAndroid Build Coastguard Worker /// blocking method.
70*61c4878aSAndroid Build Coastguard Worker struct MpscTestContext {
71*61c4878aSAndroid Build Coastguard Worker MpscWriter writer;
72*61c4878aSAndroid Build Coastguard Worker MpscReader reader;
73*61c4878aSAndroid Build Coastguard Worker
74*61c4878aSAndroid Build Coastguard Worker ConstByteSpan data;
75*61c4878aSAndroid Build Coastguard Worker std::byte write_buffer[kBufSize];
76*61c4878aSAndroid Build Coastguard Worker uint64_t write_hash = kOffsetBasis;
77*61c4878aSAndroid Build Coastguard Worker Status write_status;
78*61c4878aSAndroid Build Coastguard Worker
79*61c4878aSAndroid Build Coastguard Worker ByteSpan destination;
80*61c4878aSAndroid Build Coastguard Worker std::byte read_buffer[kBufSize];
81*61c4878aSAndroid Build Coastguard Worker Result<ByteSpan> read_result;
82*61c4878aSAndroid Build Coastguard Worker uint64_t read_hash = kOffsetBasis;
83*61c4878aSAndroid Build Coastguard Worker size_t total_read = 0;
84*61c4878aSAndroid Build Coastguard Worker
MpscTestContextpw::stream::__anonff6ca3ed0111::MpscTestContext85*61c4878aSAndroid Build Coastguard Worker MpscTestContext() {
86*61c4878aSAndroid Build Coastguard Worker data = ConstByteSpan(write_buffer);
87*61c4878aSAndroid Build Coastguard Worker destination = ByteSpan(read_buffer);
88*61c4878aSAndroid Build Coastguard Worker }
89*61c4878aSAndroid Build Coastguard Worker
Connectpw::stream::__anonff6ca3ed0111::MpscTestContext90*61c4878aSAndroid Build Coastguard Worker void Connect() { CreateMpscStream(reader, writer); }
91*61c4878aSAndroid Build Coastguard Worker
92*61c4878aSAndroid Build Coastguard Worker // Fills a byte span with random data.
Fillpw::stream::__anonff6ca3ed0111::MpscTestContext93*61c4878aSAndroid Build Coastguard Worker void Fill() { pw::stream::Fill(write_buffer, sizeof(write_buffer)); }
94*61c4878aSAndroid Build Coastguard Worker
95*61c4878aSAndroid Build Coastguard Worker // Writes data using the writer.
Writepw::stream::__anonff6ca3ed0111::MpscTestContext96*61c4878aSAndroid Build Coastguard Worker void Write() {
97*61c4878aSAndroid Build Coastguard Worker fnv1a(data, write_hash);
98*61c4878aSAndroid Build Coastguard Worker write_status = writer.Write(data);
99*61c4878aSAndroid Build Coastguard Worker }
100*61c4878aSAndroid Build Coastguard Worker
101*61c4878aSAndroid Build Coastguard Worker // Writes data repeatedly up to the writer's limit.
WriteAllpw::stream::__anonff6ca3ed0111::MpscTestContext102*61c4878aSAndroid Build Coastguard Worker void WriteAll() {
103*61c4878aSAndroid Build Coastguard Worker size_t limit = writer.ConservativeWriteLimit();
104*61c4878aSAndroid Build Coastguard Worker ASSERT_NE(limit, 0U);
105*61c4878aSAndroid Build Coastguard Worker ASSERT_NE(limit, Stream::kUnlimited);
106*61c4878aSAndroid Build Coastguard Worker while (limit != 0) {
107*61c4878aSAndroid Build Coastguard Worker if (limit < kBufSize) {
108*61c4878aSAndroid Build Coastguard Worker data = data.subspan(0, limit);
109*61c4878aSAndroid Build Coastguard Worker }
110*61c4878aSAndroid Build Coastguard Worker Fill();
111*61c4878aSAndroid Build Coastguard Worker Write();
112*61c4878aSAndroid Build Coastguard Worker if (!write_status.ok()) {
113*61c4878aSAndroid Build Coastguard Worker break;
114*61c4878aSAndroid Build Coastguard Worker }
115*61c4878aSAndroid Build Coastguard Worker limit = writer.ConservativeWriteLimit();
116*61c4878aSAndroid Build Coastguard Worker }
117*61c4878aSAndroid Build Coastguard Worker }
118*61c4878aSAndroid Build Coastguard Worker
119*61c4878aSAndroid Build Coastguard Worker // Reads data using the reader.
Readpw::stream::__anonff6ca3ed0111::MpscTestContext120*61c4878aSAndroid Build Coastguard Worker void Read() {
121*61c4878aSAndroid Build Coastguard Worker read_result = reader.Read(destination);
122*61c4878aSAndroid Build Coastguard Worker if (read_result.ok()) {
123*61c4878aSAndroid Build Coastguard Worker fnv1a(*read_result, write_hash);
124*61c4878aSAndroid Build Coastguard Worker total_read += read_result->size();
125*61c4878aSAndroid Build Coastguard Worker }
126*61c4878aSAndroid Build Coastguard Worker }
127*61c4878aSAndroid Build Coastguard Worker
128*61c4878aSAndroid Build Coastguard Worker // Run the given function on a dedicated thread.
129*61c4878aSAndroid Build Coastguard Worker using ThreadBody = Function<void(MpscTestContext* ctx)>;
Spawnpw::stream::__anonff6ca3ed0111::MpscTestContext130*61c4878aSAndroid Build Coastguard Worker void Spawn(ThreadBody func) {
131*61c4878aSAndroid Build Coastguard Worker body_ = std::move(func);
132*61c4878aSAndroid Build Coastguard Worker thread_ = Thread(context_.options(), [this]() { body_(this); });
133*61c4878aSAndroid Build Coastguard Worker }
134*61c4878aSAndroid Build Coastguard Worker
135*61c4878aSAndroid Build Coastguard Worker // Waits for the spawned thread to complete.
Joinpw::stream::__anonff6ca3ed0111::MpscTestContext136*61c4878aSAndroid Build Coastguard Worker void Join() { thread_.join(); }
137*61c4878aSAndroid Build Coastguard Worker
138*61c4878aSAndroid Build Coastguard Worker private:
139*61c4878aSAndroid Build Coastguard Worker Thread thread_;
140*61c4878aSAndroid Build Coastguard Worker thread::test::TestThreadContext context_;
141*61c4878aSAndroid Build Coastguard Worker ThreadBody body_;
142*61c4878aSAndroid Build Coastguard Worker };
143*61c4878aSAndroid Build Coastguard Worker
144*61c4878aSAndroid Build Coastguard Worker ////////////////////////////////////////////////////////////////////////////////
145*61c4878aSAndroid Build Coastguard Worker // Unit tests.
146*61c4878aSAndroid Build Coastguard Worker
// Copies of a writer are connected, and closing some of them leaves the
// remaining copy and the reader connected.
TEST(MpscStreamTest, CopyWriters) {
  MpscTestContext ctx;
  ctx.Connect();
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());

  // Copy via direct initialization.
  MpscWriter first_copy(ctx.writer);
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());
  EXPECT_TRUE(first_copy.connected());

  // Copy via copy initialization.
  MpscWriter second_copy = first_copy;
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());
  EXPECT_TRUE(first_copy.connected());
  EXPECT_TRUE(second_copy.connected());

  // Close two of the three writers; the last one keeps the stream alive.
  ctx.writer.Close();
  first_copy.Close();
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_FALSE(ctx.writer.connected());
  EXPECT_FALSE(first_copy.connected());
  EXPECT_TRUE(second_copy.connected());
}
171*61c4878aSAndroid Build Coastguard Worker
// Moving a writer transfers its connection; closing the final owner
// disconnects the reader.
TEST(MpscStreamTest, MoveWriters) {
  MpscTestContext ctx;
  ctx.Connect();
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());

  // Move via direct initialization.
  MpscWriter moved_once(std::move(ctx.writer));
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(moved_once.connected());

  // Move via copy initialization from an rvalue.
  MpscWriter moved_twice = std::move(moved_once);
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(moved_twice.connected());

  // Only the last writer should be connected, so closing it ends the stream.
  moved_twice.Close();
  EXPECT_FALSE(moved_twice.connected());
  EXPECT_FALSE(ctx.reader.connected());
}
191*61c4878aSAndroid Build Coastguard Worker
// Reading after the only writer has closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, ReadFailsIfDisconnected) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.writer.Close();

  ctx.Read();

  const Status outcome = ctx.read_result.status();
  EXPECT_EQ(outcome, Status::OutOfRange());
}
200*61c4878aSAndroid Build Coastguard Worker
// With nothing written, a read waits for the reader's timeout and then
// reports RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, ReadBlocksWhenEmpty) {
  constexpr auto kTimeout = 10ms;

  MpscTestContext ctx;
  ctx.Connect();
  ctx.reader.SetTimeout(kTimeout);

  const auto before = chrono::SystemClock::now();
  ctx.Read();
  const auto waited = chrono::SystemClock::now() - before;

  EXPECT_EQ(ctx.read_result.status(), Status::ResourceExhausted());
  EXPECT_GE(waited, kTimeout);
}
213*61c4878aSAndroid Build Coastguard Worker
// A read running on another thread returns OUT_OF_RANGE once the reader is
// closed.
TEST(MpscStreamTest, ReadReturnsAfterReaderClose) {
  MpscTestContext ctx;
  ctx.Connect();

  // Issue the read on a separate thread, then close the reader from here.
  ctx.Spawn([](MpscTestContext* self) { self->Read(); });
  ctx.reader.Close();
  ctx.Join();

  const Status outcome = ctx.read_result.status();
  EXPECT_EQ(outcome, Status::OutOfRange());
}
224*61c4878aSAndroid Build Coastguard Worker
// With no read in progress, a write waits for the writer's timeout and then
// reports RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, WriteBlocksUntilTimeout) {
  constexpr auto kTimeout = 10ms;

  MpscTestContext ctx;
  ctx.Connect();
  ctx.writer.SetTimeout(kTimeout);
  ctx.Fill();

  const auto before = chrono::SystemClock::now();
  ctx.Write();
  const auto waited = chrono::SystemClock::now() - before;

  EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
  EXPECT_GE(waited, kTimeout);
}
238*61c4878aSAndroid Build Coastguard Worker
// A write running on another thread returns OUT_OF_RANGE once the reader is
// closed.
TEST(MpscStreamTest, WriteReturnsAfterClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.Fill();

  // Issue the write on a separate thread, then close the reader from here.
  ctx.Spawn([](MpscTestContext* self) { self->Write(); });
  ctx.reader.Close();
  ctx.Join();

  const Status outcome = ctx.write_status;
  EXPECT_EQ(outcome, Status::OutOfRange());
}
250*61c4878aSAndroid Build Coastguard Worker
VerifyRoundtripImpl(const Vector<std::byte> & data,ByteSpan buffer)251*61c4878aSAndroid Build Coastguard Worker void VerifyRoundtripImpl(const Vector<std::byte>& data, ByteSpan buffer) {
252*61c4878aSAndroid Build Coastguard Worker MpscTestContext ctx;
253*61c4878aSAndroid Build Coastguard Worker ctx.Connect();
254*61c4878aSAndroid Build Coastguard Worker
255*61c4878aSAndroid Build Coastguard Worker ctx.reader.SetBuffer(buffer);
256*61c4878aSAndroid Build Coastguard Worker ctx.data = ConstByteSpan(data.data(), data.size());
257*61c4878aSAndroid Build Coastguard Worker ctx.Spawn([](MpscTestContext* inner) { inner->Write(); });
258*61c4878aSAndroid Build Coastguard Worker size_t offset = 0;
259*61c4878aSAndroid Build Coastguard Worker while (offset < data.size()) {
260*61c4878aSAndroid Build Coastguard Worker ctx.Read();
261*61c4878aSAndroid Build Coastguard Worker ASSERT_EQ(ctx.read_result.status(), OkStatus());
262*61c4878aSAndroid Build Coastguard Worker size_t num_read = ctx.read_result->size();
263*61c4878aSAndroid Build Coastguard Worker EXPECT_EQ(memcmp(ctx.read_buffer, &data[offset], num_read), 0);
264*61c4878aSAndroid Build Coastguard Worker offset += num_read;
265*61c4878aSAndroid Build Coastguard Worker }
266*61c4878aSAndroid Build Coastguard Worker ctx.Join();
267*61c4878aSAndroid Build Coastguard Worker }
268*61c4878aSAndroid Build Coastguard Worker
269*61c4878aSAndroid Build Coastguard Worker template <size_t kCapacity>
FillAndVerifyRoundtripImpl(ByteSpan buffer)270*61c4878aSAndroid Build Coastguard Worker void FillAndVerifyRoundtripImpl(ByteSpan buffer) {
271*61c4878aSAndroid Build Coastguard Worker Vector<std::byte, kCapacity> data;
272*61c4878aSAndroid Build Coastguard Worker Fill(data.data(), data.size());
273*61c4878aSAndroid Build Coastguard Worker VerifyRoundtripImpl(data, buffer);
274*61c4878aSAndroid Build Coastguard Worker }
275*61c4878aSAndroid Build Coastguard Worker
// Round trip with no reader buffer and a payload capacity of half `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithoutBufferSmall) {
  const ByteSpan no_buffer;
  FillAndVerifyRoundtripImpl<kBufSize / 2>(no_buffer);
}
279*61c4878aSAndroid Build Coastguard Worker
// Round trip with no reader buffer and a payload capacity of twice `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithoutBufferLarge) {
  const ByteSpan no_buffer;
  FillAndVerifyRoundtripImpl<kBufSize * 2>(no_buffer);
}
283*61c4878aSAndroid Build Coastguard Worker
VerifyRoundtripWithoutBuffer(const Vector<std::byte> & data)284*61c4878aSAndroid Build Coastguard Worker void VerifyRoundtripWithoutBuffer(const Vector<std::byte>& data) {
285*61c4878aSAndroid Build Coastguard Worker VerifyRoundtripImpl(data, ByteSpan());
286*61c4878aSAndroid Build Coastguard Worker }
287*61c4878aSAndroid Build Coastguard Worker FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithoutBuffer)
288*61c4878aSAndroid Build Coastguard Worker .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
289*61c4878aSAndroid Build Coastguard Worker
// Round trip with a `kBufSize` reader buffer and a payload capacity of half
// `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithBufferSmall) {
  std::byte storage[kBufSize];
  FillAndVerifyRoundtripImpl<kBufSize / 2>(ByteSpan(storage));
}
294*61c4878aSAndroid Build Coastguard Worker
// Round trip with a `kBufSize` reader buffer and a payload capacity of twice
// `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithBufferLarge) {
  std::byte storage[kBufSize];
  FillAndVerifyRoundtripImpl<kBufSize * 2>(ByteSpan(storage));
}
299*61c4878aSAndroid Build Coastguard Worker
VerifyRoundtripWithBuffer(const Vector<std::byte> & data)300*61c4878aSAndroid Build Coastguard Worker void VerifyRoundtripWithBuffer(const Vector<std::byte>& data) {
301*61c4878aSAndroid Build Coastguard Worker std::byte buffer[kBufSize];
302*61c4878aSAndroid Build Coastguard Worker VerifyRoundtripImpl(data, buffer);
303*61c4878aSAndroid Build Coastguard Worker }
304*61c4878aSAndroid Build Coastguard Worker FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithBuffer)
305*61c4878aSAndroid Build Coastguard Worker .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
306*61c4878aSAndroid Build Coastguard Worker
// A write that times out after being only partially consumed can be resumed
// from where it stopped.
//
// The first read accepts `kChunk` bytes, so the full-buffer write is expected
// to end with RESOURCE_EXHAUSTED and `last_write() == kChunk`. The second write
// then submits the remaining bytes.
TEST(MpscStreamTest, CanRetryAfterPartialWrite) {
  constexpr size_t kChunk = kBufSize - 4;
  MpscTestContext ctx;
  ctx.Connect();
  ctx.writer.SetTimeout(10ms);
  ByteSpan destination = ctx.destination;

  ctx.Spawn([](MpscTestContext* inner) {
    inner->Fill();
    inner->Write();
  });
  ctx.destination = destination.subspan(0, kChunk);
  ctx.Read();
  ctx.Join();
  // The status check must be fatal: `read_result` is dereferenced right after.
  // The writer thread has already been joined, so returning early is safe.
  ASSERT_EQ(ctx.read_result.status(), OkStatus());
  EXPECT_EQ(ctx.read_result->size(), kChunk);
  EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
  EXPECT_EQ(ctx.writer.last_write(), kChunk);

  // Retry with only the bytes that were not consumed the first time.
  ctx.Spawn([](MpscTestContext* inner) {
    inner->data = inner->data.subspan(kChunk);
    inner->Write();
  });
  ctx.destination = destination.subspan(kChunk);
  ctx.Read();
  ctx.Join();
  ASSERT_EQ(ctx.read_result.status(), OkStatus());
  EXPECT_EQ(ctx.read_result->size(), 4U);
  EXPECT_EQ(ctx.write_status, OkStatus());
  EXPECT_EQ(ctx.writer.last_write(), 4U);

  // Both reads together must reproduce the whole write buffer.
  EXPECT_EQ(memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize), 0);
}
340*61c4878aSAndroid Build Coastguard Worker
// Reading from a reader that has already been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotReadAfterReaderClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.reader.Close();

  ctx.Read();

  const Status outcome = ctx.read_result.status();
  EXPECT_EQ(outcome, Status::OutOfRange());
}
348*61c4878aSAndroid Build Coastguard Worker
// Data written into the reader's buffer can still be read after the writer
// has closed.
TEST(MpscStreamTest, CanReadAfterWriterCloses) {
  MpscTestContext ctx;
  ctx.Connect();
  std::byte storage[kBufSize];
  ctx.reader.SetBuffer(storage);

  // Write one full buffer, then close the writer before any read happens.
  ctx.Fill();
  ctx.Write();
  EXPECT_EQ(ctx.write_status, OkStatus());
  ctx.writer.Close();

  ctx.Read();
  ASSERT_EQ(ctx.read_result.status(), OkStatus());
  ASSERT_EQ(ctx.read_result->size(), kBufSize);
  const int difference = memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize);
  EXPECT_EQ(difference, 0);
}
364*61c4878aSAndroid Build Coastguard Worker
// Writing with a writer that has already been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotWriteAfterWriterClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.Fill();
  ctx.writer.Close();

  ctx.Write();

  const Status outcome = ctx.write_status;
  EXPECT_EQ(outcome, Status::OutOfRange());
}
373*61c4878aSAndroid Build Coastguard Worker
// Writing after the reader has been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotWriteAfterReaderClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.Fill();
  ctx.reader.Close();

  ctx.Write();

  const Status outcome = ctx.write_status;
  EXPECT_EQ(outcome, Status::OutOfRange());
}
382*61c4878aSAndroid Build Coastguard Worker
// Three writers on separate threads each submit a payload made of a single
// distinct byte value (1, 2 or 3). The reader then checks that each value
// arrives as one contiguous run of the expected length.
TEST(MpscStreamTest, MultipleWriters) {
  MpscTestContext ctx1;
  ctx1.Connect();
  Vector<std::byte, kBufSize + 1> data1(kBufSize + 1, std::byte(1));
  ctx1.data = ByteSpan(data1.data(), data1.size());

  // The second and third contexts only contribute a copy of the first writer.
  MpscTestContext ctx2;
  ctx2.writer = ctx1.writer;
  Vector<std::byte, kBufSize / 2> data2(kBufSize / 2, std::byte(2));
  ctx2.data = ByteSpan(data2.data(), data2.size());

  MpscTestContext ctx3;
  ctx3.writer = ctx1.writer;
  Vector<std::byte, kBufSize * 3> data3(kBufSize * 3, std::byte(3));
  ctx3.data = ByteSpan(data3.data(), data3.size());

  // Start all threads.
  ctx1.Spawn([](MpscTestContext* self) { self->Write(); });
  ctx2.Spawn([](MpscTestContext* self) { self->Write(); });
  ctx3.Spawn([](MpscTestContext* self) { self->Write(); });

  // Run lengths are indexed by byte value; index 0 is the initial "no data
  // yet" value and is expected to stay at zero. Tracking contiguous runs
  // verifies that writes are not split or interleaved.
  size_t expected[4] = {0, data1.size(), data2.size(), data3.size()};
  size_t actual[4] = {0};

  size_t received = 0;
  auto run_value = std::byte(0);
  size_t run_length = 0;
  while (received < data1.size() + data2.size() + data3.size()) {
    ctx1.Read();
    if (!ctx1.read_result.ok()) {
      break;
    }
    const size_t chunk_size = ctx1.read_result->size();
    for (size_t i = 0; i < chunk_size; ++i) {
      const std::byte value = ctx1.read_buffer[i];
      if (value != run_value) {
        // A new run starts: record the finished one and reset the counter.
        actual[static_cast<size_t>(run_value)] = run_length;
        run_value = value;
        run_length = 0;
      }
      ++run_length;
    }
    // Record the run still in progress; it may continue in the next chunk.
    actual[static_cast<size_t>(run_value)] = run_length;
    received += chunk_size;
  }

  // Closing the reader before joining lets writer threads return.
  ctx1.reader.Close();
  ctx1.Join();
  ctx2.Join();
  ctx3.Join();
  ASSERT_EQ(ctx1.read_result.status(), OkStatus());
  for (size_t value = 0; value < 4; ++value) {
    EXPECT_EQ(actual[value], expected[value]);
  }
}
439*61c4878aSAndroid Build Coastguard Worker
// Checks the limits reported by each end of a stream while unconnected, once
// connected, after an explicit limit is set, and after the writer is closed.
TEST(MpscStreamTest, GetAndSetLimits) {
  constexpr size_t kLimit = 10;

  // Endpoints that have not been connected yet report a limit of zero.
  MpscReader rx;
  EXPECT_EQ(rx.ConservativeReadLimit(), 0U);

  MpscWriter tx;
  EXPECT_EQ(tx.ConservativeWriteLimit(), 0U);

  // A freshly connected pair is expected to be unlimited on both ends.
  CreateMpscStream(rx, tx);
  EXPECT_EQ(rx.ConservativeReadLimit(), Stream::kUnlimited);
  EXPECT_EQ(tx.ConservativeWriteLimit(), Stream::kUnlimited);

  // A limit set on the writer is expected to be visible from the reader too.
  tx.SetLimit(kLimit);
  EXPECT_EQ(rx.ConservativeReadLimit(), kLimit);
  EXPECT_EQ(tx.ConservativeWriteLimit(), kLimit);

  // Closing the only writer is expected to drop both limits back to zero.
  tx.Close();
  EXPECT_EQ(rx.ConservativeReadLimit(), 0U);
  EXPECT_EQ(tx.ConservativeWriteLimit(), 0U);
}
459*61c4878aSAndroid Build Coastguard Worker
// Checks how the reader's conservative read limit combines the limits of two
// writers attached to the same stream.
TEST(MpscStreamTest, ReaderAggregatesLimit) {
  MpscTestContext test;
  test.Connect();
  test.writer.SetLimit(10);

  // A copy of the connected writer acts as a second writer on the stream.
  MpscWriter other_writer(test.writer);
  other_writer.SetLimit(20);

  // With two finite limits, the reader is expected to report their sum.
  EXPECT_EQ(test.reader.ConservativeReadLimit(), 30U);

  // One unlimited writer is expected to make the reader unlimited...
  test.writer.SetLimit(Stream::kUnlimited);
  EXPECT_EQ(test.reader.ConservativeReadLimit(), Stream::kUnlimited);

  // ...regardless of what the other writer's finite limit is.
  other_writer.SetLimit(40);
  EXPECT_EQ(test.reader.ConservativeReadLimit(), Stream::kUnlimited);

  // Once the first writer's limit is zero, only the second one is counted.
  test.writer.SetLimit(0);
  EXPECT_EQ(test.reader.ConservativeReadLimit(), 40U);
}
479*61c4878aSAndroid Build Coastguard Worker
// Verifies that reading data lowers the reader's conservative read limit.
//
// The writer is limited to `kBufSize` bytes and performs one write; the test
// then reads `kChunk` bytes and expects `kBufSize - kChunk` to remain.
// NOTE(review): assumes `Fill()` followed by `Write()` sends `kBufSize` bytes;
// confirm against `MpscTestContext`.
TEST(MpscStreamTest, ReadingUpdatesLimit) {
  MpscTestContext ctx;
  ctx.Connect();

  constexpr size_t kChunk = kBufSize - 4;
  std::byte buffer[kBufSize];
  ctx.reader.SetBuffer(buffer);
  ctx.Fill();
  ctx.writer.SetLimit(kBufSize);
  ctx.Write();
  // Fatal check: if the write failed, the read below has nothing to consume
  // (and presumably would block), so there is no point in continuing.
  ASSERT_EQ(ctx.write_status, OkStatus());

  // Shrink the destination so that only `kChunk` bytes are read back.
  ctx.destination = ByteSpan(ctx.read_buffer, kChunk);
  ctx.Read();
  // Fatal check: `read_result` is dereferenced on the next line, which is only
  // valid when the result holds a value.
  ASSERT_EQ(ctx.read_result.status(), OkStatus());
  EXPECT_EQ(ctx.read_result->size(), kChunk);
  EXPECT_EQ(ctx.reader.ConservativeReadLimit(), kBufSize - kChunk);
}
498*61c4878aSAndroid Build Coastguard Worker
// Checks that a write is rejected with `ResourceExhausted` when the writer's
// limit is set to `kBufSize - 1`.
// NOTE(review): assumes `Fill()` prepares more than `kBufSize - 1` bytes to
// write; confirm against `MpscTestContext`.
TEST(MpscStreamTest, CannotWriteMoreThanLimit) {
  MpscTestContext context;
  context.Connect();

  std::byte storage[kBufSize];
  context.reader.SetBuffer(storage);
  context.writer.SetLimit(kBufSize - 1);

  context.Fill();
  context.Write();
  EXPECT_EQ(context.write_status, Status::ResourceExhausted());
}
510*61c4878aSAndroid Build Coastguard Worker
// Checks that writers whose limits equal the amount of data they send end up
// disconnected, together with the reader, once everything has been
// transferred.
TEST(MpscStreamTest, WritersCanCloseAutomatically) {
  constexpr auto kFirstSize = kBufSize + 1;
  constexpr auto kSecondSize = kBufSize / 2;

  // First writer: limited to exactly the bytes it is about to send.
  MpscTestContext first;
  first.Connect();
  Vector<std::byte, kFirstSize> first_payload(kFirstSize, std::byte(1));
  first.writer.SetLimit(first_payload.size());
  first.data = ByteSpan(first_payload.data(), first_payload.size());

  // Second writer: shares the first context's stream, with its own limit.
  MpscTestContext second;
  second.writer = first.writer;
  Vector<std::byte, kSecondSize> second_payload(kSecondSize, std::byte(2));
  second.writer.SetLimit(second_payload.size());
  second.data = ByteSpan(second_payload.data(), second_payload.size());

  // Everything is connected before any data flows.
  EXPECT_TRUE(first.reader.connected());
  EXPECT_TRUE(first.writer.connected());
  EXPECT_TRUE(second.writer.connected());

  // Start both writer threads.
  first.Spawn([](MpscTestContext* self) { self->Write(); });
  second.Spawn([](MpscTestContext* self) { self->Write(); });

  // Drain the stream on this thread until the reader reports nothing left.
  size_t bytes_read = 0;
  while (first.reader.ConservativeReadLimit() != 0) {
    first.Read();
    EXPECT_EQ(first.read_result.status(), OkStatus());
    if (first.read_result.ok()) {
      bytes_read += first.read_result->size();
      continue;
    }
    // A failed read ends the loop; close the reader before bailing out.
    first.reader.Close();
    break;
  }
  EXPECT_EQ(bytes_read, first_payload.size() + second_payload.size());

  first.Join();
  second.Join();
  EXPECT_FALSE(first.reader.connected());
  EXPECT_FALSE(first.writer.connected());
  EXPECT_FALSE(second.writer.connected());
}
549*61c4878aSAndroid Build Coastguard Worker
// Checks that `ReadAll` on a reader with no buffer set (and never connected)
// is rejected with `FailedPrecondition`.
TEST(MpscStreamTest, ReadAllWithoutBuffer) {
  MpscTestContext context;
  const Status outcome =
      context.reader.ReadAll([](ConstByteSpan) { return OkStatus(); });
  EXPECT_EQ(outcome, Status::FailedPrecondition());
}
555*61c4878aSAndroid Build Coastguard Worker
// Checks that `ReadAll` delivers everything a writer thread sends: the byte
// count and the running hash seen by the callback are compared against the
// writer's side.
TEST(MpscStreamTest, ReadAll) {
  constexpr auto kTotalBytes = kBufSize * 100;

  MpscTestContext context;
  context.Connect();

  std::byte storage[kBufSize];
  context.reader.SetBuffer(storage);
  context.writer.SetLimit(kTotalBytes);

  // The writer runs on its own thread while this thread consumes the data.
  context.Spawn([](MpscTestContext* writer_side) { writer_side->WriteAll(); });

  // Accumulate the size and hash of every chunk handed to the callback.
  Status status = context.reader.ReadAll([&context](ConstByteSpan chunk) {
    context.total_read += chunk.size();
    fnv1a(chunk, context.read_hash);
    return OkStatus();
  });
  context.Join();

  EXPECT_EQ(status, OkStatus());
  EXPECT_FALSE(context.reader.connected());
  EXPECT_EQ(context.total_read, kTotalBytes);
  EXPECT_EQ(context.read_hash, context.write_hash);
}
577*61c4878aSAndroid Build Coastguard Worker
// Checks that a `BufferedMpscReader<kBufSize>` accepts `kBufSize` one-byte
// writes up front and returns them, in order, from a single read.
TEST(MpscStreamTest, BufferedMpscReader) {
  BufferedMpscReader<kBufSize> rx;
  MpscWriter tx;
  CreateMpscStream(rx, tx);

  // Each of the `kBufSize` single-byte writes should fit without blocking.
  for (size_t n = 0; n < kBufSize; ++n) {
    const std::byte value{static_cast<uint8_t>(n)};
    EXPECT_EQ(tx.Write(ConstByteSpan(&value, 1)), OkStatus());
  }

  // One read must return the full sequence; stop early if it does not, since
  // the per-byte comparison below would be meaningless.
  std::byte received[kBufSize];
  auto outcome = rx.Read(ByteSpan(received));
  ASSERT_EQ(outcome.status(), OkStatus());
  ASSERT_EQ(outcome->size(), kBufSize);

  for (size_t n = 0; n < kBufSize; ++n) {
    EXPECT_EQ(received[n], std::byte(n));
  }
}
597*61c4878aSAndroid Build Coastguard Worker
598*61c4878aSAndroid Build Coastguard Worker } // namespace
599*61c4878aSAndroid Build Coastguard Worker } // namespace pw::stream
600*61c4878aSAndroid Build Coastguard Worker
601*61c4878aSAndroid Build Coastguard Worker #endif // PW_THREAD_JOINING_ENABLED
602