xref: /aosp_15_r20/external/pigweed/pw_stream/mpsc_stream_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_stream/mpsc_stream.h"
16 
17 #include "pw_containers/vector.h"
18 #include "pw_fuzzer/fuzztest.h"
19 #include "pw_random/xor_shift.h"
20 #include "pw_thread/test_thread_context.h"
21 #include "pw_thread/thread.h"
22 #include "pw_unit_test/framework.h"
23 
24 // TODO: https://pwbug.dev/365161669 - Express joinability as a build-system
25 // constraint.
26 #if PW_THREAD_JOINING_ENABLED
27 
28 namespace pw::stream {
29 namespace {
30 
31 using namespace std::chrono_literals;
32 using namespace pw::fuzzer;
33 
34 ////////////////////////////////////////////////////////////////////////////////
35 // Test fixtures.
36 
/// Number of bytes in each fixed-size data buffer used by these tests.
constexpr size_t kBufSize{512};
39 
40 /// Fills a byte span with random data.
Fill(std::byte * buf,size_t len)41 void Fill(std::byte* buf, size_t len) {
42   ByteSpan data(buf, len);
43   random::XorShiftStarRng64 rng(1);
44   rng.Get(data);
45 }
46 
/// Initial hash value (offset basis) for 64-bit FNV-1a.
constexpr uint64_t kOffsetBasis{0xcbf29ce484222325ULL};
49 
/// Multiplier (prime) for 64-bit FNV-1a.
constexpr uint64_t kPrimeValue{0x100000001b3ULL};
52 
53 /// Quick implementation of public-domain Fowler-Noll-Vo hashing algorithm.
54 ///
55 /// This is used in the tests below to verify equality of two sequences of bytes
56 /// that are too large to compare directly.
57 ///
58 /// See http://www.isthe.com/chongo/tech/comp/fnv/index.html
fnv1a(ConstByteSpan bytes,uint64_t & hash)59 void fnv1a(ConstByteSpan bytes, uint64_t& hash) {
60   for (const auto& b : bytes) {
61     hash = (hash ^ static_cast<uint8_t>(b)) * kPrimeValue;
62   }
63 }
64 
65 /// MpscStream test context that uses a generic reader.
66 ///
67 /// This struct associates a reader and writer with their parameters and return
68 /// values. This is useful for communicating with threads spawned to call a
69 /// blocking method.
70 struct MpscTestContext {
71   MpscWriter writer;
72   MpscReader reader;
73 
74   ConstByteSpan data;
75   std::byte write_buffer[kBufSize];
76   uint64_t write_hash = kOffsetBasis;
77   Status write_status;
78 
79   ByteSpan destination;
80   std::byte read_buffer[kBufSize];
81   Result<ByteSpan> read_result;
82   uint64_t read_hash = kOffsetBasis;
83   size_t total_read = 0;
84 
MpscTestContextpw::stream::__anonff6ca3ed0111::MpscTestContext85   MpscTestContext() {
86     data = ConstByteSpan(write_buffer);
87     destination = ByteSpan(read_buffer);
88   }
89 
Connectpw::stream::__anonff6ca3ed0111::MpscTestContext90   void Connect() { CreateMpscStream(reader, writer); }
91 
92   // Fills a byte span with random data.
Fillpw::stream::__anonff6ca3ed0111::MpscTestContext93   void Fill() { pw::stream::Fill(write_buffer, sizeof(write_buffer)); }
94 
95   // Writes data using the writer.
Writepw::stream::__anonff6ca3ed0111::MpscTestContext96   void Write() {
97     fnv1a(data, write_hash);
98     write_status = writer.Write(data);
99   }
100 
101   // Writes data repeatedly up to the writer's limit.
WriteAllpw::stream::__anonff6ca3ed0111::MpscTestContext102   void WriteAll() {
103     size_t limit = writer.ConservativeWriteLimit();
104     ASSERT_NE(limit, 0U);
105     ASSERT_NE(limit, Stream::kUnlimited);
106     while (limit != 0) {
107       if (limit < kBufSize) {
108         data = data.subspan(0, limit);
109       }
110       Fill();
111       Write();
112       if (!write_status.ok()) {
113         break;
114       }
115       limit = writer.ConservativeWriteLimit();
116     }
117   }
118 
119   // Reads data using the reader.
Readpw::stream::__anonff6ca3ed0111::MpscTestContext120   void Read() {
121     read_result = reader.Read(destination);
122     if (read_result.ok()) {
123       fnv1a(*read_result, write_hash);
124       total_read += read_result->size();
125     }
126   }
127 
128   // Run the given function on a dedicated thread.
129   using ThreadBody = Function<void(MpscTestContext* ctx)>;
Spawnpw::stream::__anonff6ca3ed0111::MpscTestContext130   void Spawn(ThreadBody func) {
131     body_ = std::move(func);
132     thread_ = Thread(context_.options(), [this]() { body_(this); });
133   }
134 
135   // Waits for the spawned thread to complete.
Joinpw::stream::__anonff6ca3ed0111::MpscTestContext136   void Join() { thread_.join(); }
137 
138  private:
139   Thread thread_;
140   thread::test::TestThreadContext context_;
141   ThreadBody body_;
142 };
143 
144 ////////////////////////////////////////////////////////////////////////////////
145 // Unit tests.
146 
// Copies of a connected writer are themselves connected, and closing some of
// the copies leaves the reader and the remaining copy connected.
TEST(MpscStreamTest, CopyWriters) {
  MpscTestContext fixture;
  fixture.Connect();
  EXPECT_TRUE(fixture.reader.connected());
  EXPECT_TRUE(fixture.writer.connected());

  // Copy construction.
  MpscWriter second_writer(fixture.writer);
  EXPECT_TRUE(fixture.reader.connected());
  EXPECT_TRUE(fixture.writer.connected());
  EXPECT_TRUE(second_writer.connected());

  // Copy initialization from the copy.
  MpscWriter third_writer = second_writer;
  EXPECT_TRUE(fixture.reader.connected());
  EXPECT_TRUE(fixture.writer.connected());
  EXPECT_TRUE(second_writer.connected());
  EXPECT_TRUE(third_writer.connected());

  // Close two of the three writers; the last one keeps the stream alive.
  fixture.writer.Close();
  second_writer.Close();
  EXPECT_TRUE(fixture.reader.connected());
  EXPECT_FALSE(fixture.writer.connected());
  EXPECT_FALSE(second_writer.connected());
  EXPECT_TRUE(third_writer.connected());
}
171 
// Moving a connected writer yields a connected writer, and closing the final
// moved-to writer disconnects the reader.
TEST(MpscStreamTest, MoveWriters) {
  MpscTestContext fixture;
  fixture.Connect();
  EXPECT_TRUE(fixture.reader.connected());
  EXPECT_TRUE(fixture.writer.connected());

  // Move construction.
  MpscWriter second_writer(std::move(fixture.writer));
  EXPECT_TRUE(fixture.reader.connected());
  EXPECT_TRUE(second_writer.connected());

  // Move initialization from the moved-to writer.
  MpscWriter third_writer = std::move(second_writer);
  EXPECT_TRUE(fixture.reader.connected());
  EXPECT_TRUE(third_writer.connected());

  // Only the last writer should be connected, so closing it disconnects the
  // reader as well.
  third_writer.Close();
  EXPECT_FALSE(third_writer.connected());
  EXPECT_FALSE(fixture.reader.connected());
}
191 
// Reading after the only writer has closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, ReadFailsIfDisconnected) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.writer.Close();

  fixture.Read();

  EXPECT_EQ(fixture.read_result.status(), Status::OutOfRange());
}
200 
// With a 10ms reader timeout and nothing written, a read takes at least the
// timeout and then reports RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, ReadBlocksWhenEmpty) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.reader.SetTimeout(10ms);

  const auto began = chrono::SystemClock::now();
  fixture.Read();
  const auto waited = chrono::SystemClock::now() - began;

  EXPECT_EQ(fixture.read_result.status(), Status::ResourceExhausted());
  EXPECT_GE(waited, 10ms);
}
213 
// A read issued on another thread completes with OUT_OF_RANGE once the reader
// is closed.
TEST(MpscStreamTest, ReadReturnsAfterReaderClose) {
  MpscTestContext fixture;
  fixture.Connect();

  fixture.Spawn([](MpscTestContext* reading) { reading->Read(); });
  fixture.reader.Close();
  fixture.Join();

  EXPECT_EQ(fixture.read_result.status(), Status::OutOfRange());
}
224 
// With a 10ms writer timeout and nobody reading, a write takes at least the
// timeout and then reports RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, WriteBlocksUntilTimeout) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.writer.SetTimeout(10ms);
  fixture.Fill();

  const auto began = chrono::SystemClock::now();
  fixture.Write();
  const auto waited = chrono::SystemClock::now() - began;

  EXPECT_EQ(fixture.write_status, Status::ResourceExhausted());
  EXPECT_GE(waited, 10ms);
}
238 
// A write issued on another thread completes with OUT_OF_RANGE once the reader
// is closed.
TEST(MpscStreamTest, WriteReturnsAfterClose) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.Fill();

  fixture.Spawn([](MpscTestContext* writing) { writing->Write(); });
  fixture.reader.Close();
  fixture.Join();

  EXPECT_EQ(fixture.write_status, Status::OutOfRange());
}
250 
VerifyRoundtripImpl(const Vector<std::byte> & data,ByteSpan buffer)251 void VerifyRoundtripImpl(const Vector<std::byte>& data, ByteSpan buffer) {
252   MpscTestContext ctx;
253   ctx.Connect();
254 
255   ctx.reader.SetBuffer(buffer);
256   ctx.data = ConstByteSpan(data.data(), data.size());
257   ctx.Spawn([](MpscTestContext* inner) { inner->Write(); });
258   size_t offset = 0;
259   while (offset < data.size()) {
260     ctx.Read();
261     ASSERT_EQ(ctx.read_result.status(), OkStatus());
262     size_t num_read = ctx.read_result->size();
263     EXPECT_EQ(memcmp(ctx.read_buffer, &data[offset], num_read), 0);
264     offset += num_read;
265   }
266   ctx.Join();
267 }
268 
269 template <size_t kCapacity>
FillAndVerifyRoundtripImpl(ByteSpan buffer)270 void FillAndVerifyRoundtripImpl(ByteSpan buffer) {
271   Vector<std::byte, kCapacity> data;
272   Fill(data.data(), data.size());
273   VerifyRoundtripImpl(data, buffer);
274 }
275 
// Round trip with no reader buffer and a data capacity of half `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithoutBufferSmall) {
  constexpr size_t kDataCapacity = kBufSize / 2;
  FillAndVerifyRoundtripImpl<kDataCapacity>(ByteSpan());
}
279 
// Round trip with no reader buffer and a data capacity of twice `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithoutBufferLarge) {
  constexpr size_t kDataCapacity = kBufSize * 2;
  FillAndVerifyRoundtripImpl<kDataCapacity>(ByteSpan());
}
283 
VerifyRoundtripWithoutBuffer(const Vector<std::byte> & data)284 void VerifyRoundtripWithoutBuffer(const Vector<std::byte>& data) {
285   VerifyRoundtripImpl(data, ByteSpan());
286 }
287 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithoutBuffer)
288     .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
289 
// Round trip with a `kBufSize` reader buffer and a data capacity of half
// `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithBufferSmall) {
  constexpr size_t kDataCapacity = kBufSize / 2;
  std::byte reader_storage[kBufSize];
  FillAndVerifyRoundtripImpl<kDataCapacity>(reader_storage);
}
294 
// Round trip with a `kBufSize` reader buffer and a data capacity of twice
// `kBufSize`.
TEST(MpscStreamTest, VerifyRoundtripWithBufferLarge) {
  constexpr size_t kDataCapacity = kBufSize * 2;
  std::byte reader_storage[kBufSize];
  FillAndVerifyRoundtripImpl<kDataCapacity>(reader_storage);
}
299 
VerifyRoundtripWithBuffer(const Vector<std::byte> & data)300 void VerifyRoundtripWithBuffer(const Vector<std::byte>& data) {
301   std::byte buffer[kBufSize];
302   VerifyRoundtripImpl(data, buffer);
303 }
304 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithBuffer)
305     .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
306 
// A write that times out after being only partly consumed can be resumed by
// writing the remaining bytes.
TEST(MpscStreamTest, CanRetryAfterPartialWrite) {
  constexpr size_t kChunk = kBufSize - 4;
  MpscTestContext fixture;
  fixture.Connect();
  fixture.writer.SetTimeout(10ms);
  ByteSpan whole_destination = fixture.destination;

  // First pass: the reader only accepts `kChunk` bytes, so the writer is left
  // with 4 bytes outstanding when it times out.
  fixture.Spawn([](MpscTestContext* writing) {
    writing->Fill();
    writing->Write();
  });
  fixture.destination = whole_destination.subspan(0, kChunk);
  fixture.Read();
  fixture.Join();
  EXPECT_EQ(fixture.read_result.status(), OkStatus());
  EXPECT_EQ(fixture.read_result->size(), kChunk);
  EXPECT_EQ(fixture.write_status, Status::ResourceExhausted());
  EXPECT_EQ(fixture.writer.last_write(), kChunk);

  // Second pass: write and read only the remaining tail.
  fixture.Spawn([](MpscTestContext* writing) {
    writing->data = writing->data.subspan(kChunk);
    writing->Write();
  });
  fixture.destination = whole_destination.subspan(kChunk);
  fixture.Read();
  fixture.Join();
  EXPECT_EQ(fixture.read_result.status(), OkStatus());
  EXPECT_EQ(fixture.read_result->size(), 4U);
  EXPECT_EQ(fixture.write_status, OkStatus());
  EXPECT_EQ(fixture.writer.last_write(), 4U);

  // Both passes together must reproduce the full write buffer.
  EXPECT_EQ(memcmp(fixture.write_buffer, fixture.read_buffer, kBufSize), 0);
}
340 
// Reading from a reader that has been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotReadAfterReaderClose) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.reader.Close();

  fixture.Read();

  EXPECT_EQ(fixture.read_result.status(), Status::OutOfRange());
}
348 
// Data written into the reader's buffer remains readable after the writer has
// closed.
TEST(MpscStreamTest, CanReadAfterWriterCloses) {
  MpscTestContext fixture;
  fixture.Connect();
  std::byte reader_storage[kBufSize];
  fixture.reader.SetBuffer(reader_storage);

  fixture.Fill();
  fixture.Write();
  EXPECT_EQ(fixture.write_status, OkStatus());
  fixture.writer.Close();

  fixture.Read();
  ASSERT_EQ(fixture.read_result.status(), OkStatus());
  ASSERT_EQ(fixture.read_result->size(), kBufSize);
  EXPECT_EQ(memcmp(fixture.write_buffer, fixture.read_buffer, kBufSize), 0);
}
364 
// Writing with a writer that has been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotWriteAfterWriterClose) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.Fill();
  fixture.writer.Close();

  fixture.Write();

  EXPECT_EQ(fixture.write_status, Status::OutOfRange());
}
373 
// Writing after the reader has been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotWriteAfterReaderClose) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.Fill();
  fixture.reader.Close();

  fixture.Write();

  EXPECT_EQ(fixture.write_status, Status::OutOfRange());
}
382 
// Three writers, each sending a run of a distinct byte value, write
// concurrently; the reader must see each run whole.
TEST(MpscStreamTest, MultipleWriters) {
  MpscTestContext first;
  first.Connect();
  Vector<std::byte, kBufSize + 1> ones(kBufSize + 1, std::byte(1));
  first.data = ByteSpan(ones.data(), ones.size());

  MpscTestContext second;
  second.writer = first.writer;
  Vector<std::byte, kBufSize / 2> twos(kBufSize / 2, std::byte(2));
  second.data = ByteSpan(twos.data(), twos.size());

  MpscTestContext third;
  third.writer = first.writer;
  Vector<std::byte, kBufSize * 3> threes(kBufSize * 3, std::byte(3));
  third.data = ByteSpan(threes.data(), threes.size());

  // Start all threads.
  first.Spawn([](MpscTestContext* writing) { writing->Write(); });
  second.Spawn([](MpscTestContext* writing) { writing->Write(); });
  third.Spawn([](MpscTestContext* writing) { writing->Write(); });

  // The loop below keeps track of how many contiguous values are read, in order
  // to verify that writes are not split or interleaved. Both arrays are indexed
  // by byte value; index 0 is the initial "no data yet" run.
  size_t expected[4] = {0, ones.size(), twos.size(), threes.size()};
  size_t actual[4] = {0};

  const size_t total_expected = ones.size() + twos.size() + threes.size();
  size_t total_read = 0;
  auto run_value = std::byte(0);
  size_t run_length = 0;
  while (total_read < total_expected) {
    first.Read();
    if (!first.read_result.ok()) {
      break;
    }
    const size_t num_read = first.read_result->size();
    for (size_t i = 0; i < num_read; ++i) {
      const std::byte value = first.read_buffer[i];
      if (value != run_value) {
        // A new run begins; record the length of the one that just ended.
        actual[static_cast<size_t>(run_value)] = run_length;
        run_value = value;
        run_length = 1;
      } else {
        ++run_length;
      }
    }
    // Record the run still in progress, which may continue in the next read.
    actual[static_cast<size_t>(run_value)] = run_length;
    total_read += num_read;
  }
  first.reader.Close();
  first.Join();
  second.Join();
  third.Join();
  ASSERT_EQ(first.read_result.status(), OkStatus());
  for (size_t value = 0; value < 4; ++value) {
    EXPECT_EQ(actual[value], expected[value]);
  }
}
439 
// Checks the conservative read/write limits before connecting, after
// connecting, after setting a limit, and after closing the writer.
TEST(MpscStreamTest, GetAndSetLimits) {
  // Unconnected endpoints report a limit of zero.
  MpscReader rx;
  EXPECT_EQ(rx.ConservativeReadLimit(), 0U);

  MpscWriter tx;
  EXPECT_EQ(tx.ConservativeWriteLimit(), 0U);

  // Once connected, both ends are unlimited.
  CreateMpscStream(rx, tx);
  EXPECT_EQ(rx.ConservativeReadLimit(), Stream::kUnlimited);
  EXPECT_EQ(tx.ConservativeWriteLimit(), Stream::kUnlimited);

  // A limit set on the writer is visible from both ends.
  tx.SetLimit(10);
  EXPECT_EQ(rx.ConservativeReadLimit(), 10U);
  EXPECT_EQ(tx.ConservativeWriteLimit(), 10U);

  // Closing the writer drops both limits back to zero.
  tx.Close();
  EXPECT_EQ(rx.ConservativeReadLimit(), 0U);
  EXPECT_EQ(tx.ConservativeWriteLimit(), 0U);
}
459 
// The reader's limit is the sum of its writers' limits, and is unlimited
// whenever any writer is unlimited.
TEST(MpscStreamTest, ReaderAggregatesLimit) {
  MpscTestContext fixture;
  fixture.Connect();
  fixture.writer.SetLimit(10);

  MpscWriter second_writer = fixture.writer;
  second_writer.SetLimit(20);

  // 10 + 20.
  EXPECT_EQ(fixture.reader.ConservativeReadLimit(), 30U);

  // One unlimited writer makes the reader unlimited.
  fixture.writer.SetLimit(Stream::kUnlimited);
  EXPECT_EQ(fixture.reader.ConservativeReadLimit(), Stream::kUnlimited);

  // Changing the other writer's limit does not alter that.
  second_writer.SetLimit(40);
  EXPECT_EQ(fixture.reader.ConservativeReadLimit(), Stream::kUnlimited);

  // With the first writer at zero, only the second writer's limit is left.
  fixture.writer.SetLimit(0);
  EXPECT_EQ(fixture.reader.ConservativeReadLimit(), 40U);
}
479 
// After a partial read, the reader's limit equals the number of bytes that
// were written but not yet read.
TEST(MpscStreamTest, ReadingUpdatesLimit) {
  MpscTestContext fixture;
  fixture.Connect();

  constexpr size_t kChunk = kBufSize - 4;
  std::byte reader_storage[kBufSize];
  fixture.reader.SetBuffer(reader_storage);
  fixture.Fill();
  fixture.writer.SetLimit(kBufSize);
  fixture.Write();
  EXPECT_EQ(fixture.write_status, OkStatus());

  // Read all but the last 4 bytes.
  fixture.destination = ByteSpan(fixture.read_buffer, kChunk);
  fixture.Read();
  EXPECT_EQ(fixture.read_result.status(), OkStatus());
  EXPECT_EQ(fixture.read_result->size(), kChunk);
  EXPECT_EQ(fixture.reader.ConservativeReadLimit(), kBufSize - kChunk);
}
498 
// Writing `kBufSize` bytes with a limit of `kBufSize - 1` reports
// RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, CannotWriteMoreThanLimit) {
  MpscTestContext fixture;
  fixture.Connect();

  std::byte reader_storage[kBufSize];
  fixture.reader.SetBuffer(reader_storage);
  fixture.writer.SetLimit(kBufSize - 1);

  fixture.Fill();
  fixture.Write();

  EXPECT_EQ(fixture.write_status, Status::ResourceExhausted());
}
510 
// Two writers, each limited to exactly the amount of data it sends, end up
// disconnected (as does the reader) once all of the data has been read.
TEST(MpscStreamTest, WritersCanCloseAutomatically) {
  MpscTestContext first;
  first.Connect();
  Vector<std::byte, kBufSize + 1> ones(kBufSize + 1, std::byte(1));
  first.writer.SetLimit(ones.size());
  first.data = ByteSpan(ones.data(), ones.size());

  MpscTestContext second;
  second.writer = first.writer;
  Vector<std::byte, kBufSize / 2> twos(kBufSize / 2, std::byte(2));
  second.writer.SetLimit(twos.size());
  second.data = ByteSpan(twos.data(), twos.size());

  // Everything is connected before any thread starts.
  EXPECT_TRUE(first.reader.connected());
  EXPECT_TRUE(first.writer.connected());
  EXPECT_TRUE(second.writer.connected());

  // Start all threads.
  first.Spawn([](MpscTestContext* writing) { writing->Write(); });
  second.Spawn([](MpscTestContext* writing) { writing->Write(); });

  // Drain the stream until the reader reports nothing further to read.
  size_t bytes_received = 0;
  while (first.reader.ConservativeReadLimit() != 0) {
    first.Read();
    EXPECT_EQ(first.read_result.status(), OkStatus());
    if (!first.read_result.ok()) {
      first.reader.Close();
      break;
    }
    bytes_received += first.read_result->size();
  }
  EXPECT_EQ(bytes_received, ones.size() + twos.size());
  first.Join();
  second.Join();
  EXPECT_FALSE(first.reader.connected());
  EXPECT_FALSE(first.writer.connected());
  EXPECT_FALSE(second.writer.connected());
}
549 
// `ReadAll` on a reader that was never connected or given a buffer reports
// FAILED_PRECONDITION.
TEST(MpscStreamTest, ReadAllWithoutBuffer) {
  MpscTestContext fixture;
  auto accept_everything = [](ConstByteSpan) { return OkStatus(); };
  Status outcome = fixture.reader.ReadAll(accept_everything);
  EXPECT_EQ(outcome, Status::FailedPrecondition());
}
555 
// `ReadAll` delivers every byte that a writer thread sends up to its limit,
// and the hash of the received bytes matches the hash of the sent bytes.
TEST(MpscStreamTest, ReadAll) {
  MpscTestContext fixture;
  fixture.Connect();

  std::byte reader_storage[kBufSize];
  fixture.reader.SetBuffer(reader_storage);
  fixture.writer.SetLimit(kBufSize * 100);
  fixture.Spawn([](MpscTestContext* writing) { writing->WriteAll(); });

  // Count and hash each chunk as it is delivered.
  Status outcome = fixture.reader.ReadAll([&fixture](ConstByteSpan chunk) {
    fixture.total_read += chunk.size();
    fnv1a(chunk, fixture.read_hash);
    return OkStatus();
  });
  fixture.Join();

  EXPECT_EQ(outcome, OkStatus());
  EXPECT_FALSE(fixture.reader.connected());
  EXPECT_EQ(fixture.total_read, kBufSize * 100);
  EXPECT_EQ(fixture.read_hash, fixture.write_hash);
}
577 
// A `BufferedMpscReader<kBufSize>` accepts `kBufSize` single-byte writes and
// returns them all, in order, from a single read.
TEST(MpscStreamTest, BufferedMpscReader) {
  BufferedMpscReader<kBufSize> rx;
  MpscWriter tx;
  CreateMpscStream(rx, tx);

  // `kBufSize` writes of 1 byte each should fit without blocking.
  for (size_t index = 0; index < kBufSize; ++index) {
    std::byte value{static_cast<uint8_t>(index)};
    EXPECT_EQ(tx.Write(ConstByteSpan(&value, 1)), OkStatus());
  }

  std::byte received[kBufSize];
  auto outcome = rx.Read(ByteSpan(received));
  ASSERT_EQ(outcome.status(), OkStatus());
  ASSERT_EQ(outcome->size(), kBufSize);
  for (size_t index = 0; index < kBufSize; ++index) {
    EXPECT_EQ(received[index], std::byte(index));
  }
}
597 
598 }  // namespace
599 }  // namespace pw::stream
600 
601 #endif  // PW_THREAD_JOINING_ENABLED
602