1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_stream/mpsc_stream.h"
16
17 #include "pw_containers/vector.h"
18 #include "pw_fuzzer/fuzztest.h"
19 #include "pw_random/xor_shift.h"
20 #include "pw_thread/test_thread_context.h"
21 #include "pw_thread/thread.h"
22 #include "pw_unit_test/framework.h"
23
24 // TODO: https://pwbug.dev/365161669 - Express joinability as a build-system
25 // constraint.
26 #if PW_THREAD_JOINING_ENABLED
27
28 namespace pw::stream {
29 namespace {
30
31 using namespace std::chrono_literals;
32 using namespace pw::fuzzer;
33
34 ////////////////////////////////////////////////////////////////////////////////
35 // Test fixtures.
36
37 /// Capacity in bytes for data buffers.
38 constexpr size_t kBufSize = 512;
39
40 /// Fills a byte span with random data.
Fill(std::byte * buf,size_t len)41 void Fill(std::byte* buf, size_t len) {
42 ByteSpan data(buf, len);
43 random::XorShiftStarRng64 rng(1);
44 rng.Get(data);
45 }
46
47 /// FNV-1a offset basis.
48 constexpr uint64_t kOffsetBasis = 0xcbf29ce484222325ULL;
49
50 /// FNV-1a prime value.
51 constexpr uint64_t kPrimeValue = 0x100000001b3ULL;
52
53 /// Quick implementation of public-domain Fowler-Noll-Vo hashing algorithm.
54 ///
55 /// This is used in the tests below to verify equality of two sequences of bytes
56 /// that are too large to compare directly.
57 ///
58 /// See http://www.isthe.com/chongo/tech/comp/fnv/index.html
fnv1a(ConstByteSpan bytes,uint64_t & hash)59 void fnv1a(ConstByteSpan bytes, uint64_t& hash) {
60 for (const auto& b : bytes) {
61 hash = (hash ^ static_cast<uint8_t>(b)) * kPrimeValue;
62 }
63 }
64
65 /// MpscStream test context that uses a generic reader.
66 ///
67 /// This struct associates a reader and writer with their parameters and return
68 /// values. This is useful for communicating with threads spawned to call a
69 /// blocking method.
70 struct MpscTestContext {
71 MpscWriter writer;
72 MpscReader reader;
73
74 ConstByteSpan data;
75 std::byte write_buffer[kBufSize];
76 uint64_t write_hash = kOffsetBasis;
77 Status write_status;
78
79 ByteSpan destination;
80 std::byte read_buffer[kBufSize];
81 Result<ByteSpan> read_result;
82 uint64_t read_hash = kOffsetBasis;
83 size_t total_read = 0;
84
MpscTestContextpw::stream::__anonff6ca3ed0111::MpscTestContext85 MpscTestContext() {
86 data = ConstByteSpan(write_buffer);
87 destination = ByteSpan(read_buffer);
88 }
89
Connectpw::stream::__anonff6ca3ed0111::MpscTestContext90 void Connect() { CreateMpscStream(reader, writer); }
91
92 // Fills a byte span with random data.
Fillpw::stream::__anonff6ca3ed0111::MpscTestContext93 void Fill() { pw::stream::Fill(write_buffer, sizeof(write_buffer)); }
94
95 // Writes data using the writer.
Writepw::stream::__anonff6ca3ed0111::MpscTestContext96 void Write() {
97 fnv1a(data, write_hash);
98 write_status = writer.Write(data);
99 }
100
101 // Writes data repeatedly up to the writer's limit.
WriteAllpw::stream::__anonff6ca3ed0111::MpscTestContext102 void WriteAll() {
103 size_t limit = writer.ConservativeWriteLimit();
104 ASSERT_NE(limit, 0U);
105 ASSERT_NE(limit, Stream::kUnlimited);
106 while (limit != 0) {
107 if (limit < kBufSize) {
108 data = data.subspan(0, limit);
109 }
110 Fill();
111 Write();
112 if (!write_status.ok()) {
113 break;
114 }
115 limit = writer.ConservativeWriteLimit();
116 }
117 }
118
119 // Reads data using the reader.
Readpw::stream::__anonff6ca3ed0111::MpscTestContext120 void Read() {
121 read_result = reader.Read(destination);
122 if (read_result.ok()) {
123 fnv1a(*read_result, write_hash);
124 total_read += read_result->size();
125 }
126 }
127
128 // Run the given function on a dedicated thread.
129 using ThreadBody = Function<void(MpscTestContext* ctx)>;
Spawnpw::stream::__anonff6ca3ed0111::MpscTestContext130 void Spawn(ThreadBody func) {
131 body_ = std::move(func);
132 thread_ = Thread(context_.options(), [this]() { body_(this); });
133 }
134
135 // Waits for the spawned thread to complete.
Joinpw::stream::__anonff6ca3ed0111::MpscTestContext136 void Join() { thread_.join(); }
137
138 private:
139 Thread thread_;
140 thread::test::TestThreadContext context_;
141 ThreadBody body_;
142 };
143
144 ////////////////////////////////////////////////////////////////////////////////
145 // Unit tests.
146
TEST(MpscStreamTest,CopyWriters)147 TEST(MpscStreamTest, CopyWriters) {
148 MpscTestContext ctx;
149 ctx.Connect();
150 EXPECT_TRUE(ctx.reader.connected());
151 EXPECT_TRUE(ctx.writer.connected());
152
153 MpscWriter writer2(ctx.writer);
154 EXPECT_TRUE(ctx.reader.connected());
155 EXPECT_TRUE(ctx.writer.connected());
156 EXPECT_TRUE(writer2.connected());
157
158 MpscWriter writer3 = writer2;
159 EXPECT_TRUE(ctx.reader.connected());
160 EXPECT_TRUE(ctx.writer.connected());
161 EXPECT_TRUE(writer2.connected());
162 EXPECT_TRUE(writer3.connected());
163
164 ctx.writer.Close();
165 writer2.Close();
166 EXPECT_TRUE(ctx.reader.connected());
167 EXPECT_FALSE(ctx.writer.connected());
168 EXPECT_FALSE(writer2.connected());
169 EXPECT_TRUE(writer3.connected());
170 }
171
TEST(MpscStreamTest,MoveWriters)172 TEST(MpscStreamTest, MoveWriters) {
173 MpscTestContext ctx;
174 ctx.Connect();
175 EXPECT_TRUE(ctx.reader.connected());
176 EXPECT_TRUE(ctx.writer.connected());
177
178 MpscWriter writer2(std::move(ctx.writer));
179 EXPECT_TRUE(ctx.reader.connected());
180 EXPECT_TRUE(writer2.connected());
181
182 MpscWriter writer3 = std::move(writer2);
183 EXPECT_TRUE(ctx.reader.connected());
184 EXPECT_TRUE(writer3.connected());
185
186 // Only writer3 should be connected.
187 writer3.Close();
188 EXPECT_FALSE(writer3.connected());
189 EXPECT_FALSE(ctx.reader.connected());
190 }
191
TEST(MpscStreamTest,ReadFailsIfDisconnected)192 TEST(MpscStreamTest, ReadFailsIfDisconnected) {
193 MpscTestContext ctx;
194 ctx.Connect();
195
196 ctx.writer.Close();
197 ctx.Read();
198 EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
199 }
200
TEST(MpscStreamTest,ReadBlocksWhenEmpty)201 TEST(MpscStreamTest, ReadBlocksWhenEmpty) {
202 MpscTestContext ctx;
203 ctx.Connect();
204 ctx.reader.SetTimeout(10ms);
205
206 auto start = chrono::SystemClock::now();
207 ctx.Read();
208 auto elapsed = chrono::SystemClock::now() - start;
209
210 EXPECT_EQ(ctx.read_result.status(), Status::ResourceExhausted());
211 EXPECT_GE(elapsed, 10ms);
212 }
213
TEST(MpscStreamTest,ReadReturnsAfterReaderClose)214 TEST(MpscStreamTest, ReadReturnsAfterReaderClose) {
215 MpscTestContext ctx;
216 ctx.Connect();
217
218 ctx.Spawn([](MpscTestContext* inner) { inner->Read(); });
219 ctx.reader.Close();
220 ctx.Join();
221
222 EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
223 }
224
TEST(MpscStreamTest,WriteBlocksUntilTimeout)225 TEST(MpscStreamTest, WriteBlocksUntilTimeout) {
226 MpscTestContext ctx;
227 ctx.Connect();
228 ctx.writer.SetTimeout(10ms);
229 ctx.Fill();
230
231 auto start = chrono::SystemClock::now();
232 ctx.Write();
233 auto elapsed = chrono::SystemClock::now() - start;
234
235 EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
236 EXPECT_GE(elapsed, 10ms);
237 }
238
TEST(MpscStreamTest,WriteReturnsAfterClose)239 TEST(MpscStreamTest, WriteReturnsAfterClose) {
240 MpscTestContext ctx;
241 ctx.Connect();
242
243 ctx.Fill();
244 ctx.Spawn([](MpscTestContext* inner) { inner->Write(); });
245 ctx.reader.Close();
246 ctx.Join();
247
248 EXPECT_EQ(ctx.write_status, Status::OutOfRange());
249 }
250
VerifyRoundtripImpl(const Vector<std::byte> & data,ByteSpan buffer)251 void VerifyRoundtripImpl(const Vector<std::byte>& data, ByteSpan buffer) {
252 MpscTestContext ctx;
253 ctx.Connect();
254
255 ctx.reader.SetBuffer(buffer);
256 ctx.data = ConstByteSpan(data.data(), data.size());
257 ctx.Spawn([](MpscTestContext* inner) { inner->Write(); });
258 size_t offset = 0;
259 while (offset < data.size()) {
260 ctx.Read();
261 ASSERT_EQ(ctx.read_result.status(), OkStatus());
262 size_t num_read = ctx.read_result->size();
263 EXPECT_EQ(memcmp(ctx.read_buffer, &data[offset], num_read), 0);
264 offset += num_read;
265 }
266 ctx.Join();
267 }
268
269 template <size_t kCapacity>
FillAndVerifyRoundtripImpl(ByteSpan buffer)270 void FillAndVerifyRoundtripImpl(ByteSpan buffer) {
271 Vector<std::byte, kCapacity> data;
272 Fill(data.data(), data.size());
273 VerifyRoundtripImpl(data, buffer);
274 }
275
TEST(MpscStreamTest,VerifyRoundtripWithoutBufferSmall)276 TEST(MpscStreamTest, VerifyRoundtripWithoutBufferSmall) {
277 FillAndVerifyRoundtripImpl<kBufSize / 2>(ByteSpan());
278 }
279
TEST(MpscStreamTest,VerifyRoundtripWithoutBufferLarge)280 TEST(MpscStreamTest, VerifyRoundtripWithoutBufferLarge) {
281 FillAndVerifyRoundtripImpl<kBufSize * 2>(ByteSpan());
282 }
283
VerifyRoundtripWithoutBuffer(const Vector<std::byte> & data)284 void VerifyRoundtripWithoutBuffer(const Vector<std::byte>& data) {
285 VerifyRoundtripImpl(data, ByteSpan());
286 }
287 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithoutBuffer)
288 .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
289
TEST(MpscStreamTest,VerifyRoundtripWithBufferSmall)290 TEST(MpscStreamTest, VerifyRoundtripWithBufferSmall) {
291 std::byte buffer[kBufSize];
292 FillAndVerifyRoundtripImpl<kBufSize / 2>(buffer);
293 }
294
TEST(MpscStreamTest,VerifyRoundtripWithBufferLarge)295 TEST(MpscStreamTest, VerifyRoundtripWithBufferLarge) {
296 std::byte buffer[kBufSize];
297 FillAndVerifyRoundtripImpl<kBufSize * 2>(buffer);
298 }
299
VerifyRoundtripWithBuffer(const Vector<std::byte> & data)300 void VerifyRoundtripWithBuffer(const Vector<std::byte>& data) {
301 std::byte buffer[kBufSize];
302 VerifyRoundtripImpl(data, buffer);
303 }
304 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithBuffer)
305 .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
306
TEST(MpscStreamTest,CanRetryAfterPartialWrite)307 TEST(MpscStreamTest, CanRetryAfterPartialWrite) {
308 constexpr size_t kChunk = kBufSize - 4;
309 MpscTestContext ctx;
310 ctx.Connect();
311 ctx.writer.SetTimeout(10ms);
312 ByteSpan destination = ctx.destination;
313
314 ctx.Spawn([](MpscTestContext* inner) {
315 inner->Fill();
316 inner->Write();
317 });
318 ctx.destination = destination.subspan(0, kChunk);
319 ctx.Read();
320 ctx.Join();
321 EXPECT_EQ(ctx.read_result.status(), OkStatus());
322 EXPECT_EQ(ctx.read_result->size(), kChunk);
323 EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
324 EXPECT_EQ(ctx.writer.last_write(), kChunk);
325
326 ctx.Spawn([](MpscTestContext* inner) {
327 inner->data = inner->data.subspan(kChunk);
328 inner->Write();
329 });
330 ctx.destination = destination.subspan(kChunk);
331 ctx.Read();
332 ctx.Join();
333 EXPECT_EQ(ctx.read_result.status(), OkStatus());
334 EXPECT_EQ(ctx.read_result->size(), 4U);
335 EXPECT_EQ(ctx.write_status, OkStatus());
336 EXPECT_EQ(ctx.writer.last_write(), 4U);
337
338 EXPECT_EQ(memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize), 0);
339 }
340
TEST(MpscStreamTest,CannotReadAfterReaderClose)341 TEST(MpscStreamTest, CannotReadAfterReaderClose) {
342 MpscTestContext ctx;
343 ctx.Connect();
344 ctx.reader.Close();
345 ctx.Read();
346 EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
347 }
348
TEST(MpscStreamTest,CanReadAfterWriterCloses)349 TEST(MpscStreamTest, CanReadAfterWriterCloses) {
350 MpscTestContext ctx;
351 ctx.Connect();
352 std::byte buffer[kBufSize];
353 ctx.reader.SetBuffer(buffer);
354 ctx.Fill();
355 ctx.Write();
356 EXPECT_EQ(ctx.write_status, OkStatus());
357 ctx.writer.Close();
358
359 ctx.Read();
360 ASSERT_EQ(ctx.read_result.status(), OkStatus());
361 ASSERT_EQ(ctx.read_result->size(), kBufSize);
362 EXPECT_EQ(memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize), 0);
363 }
364
TEST(MpscStreamTest,CannotWriteAfterWriterClose)365 TEST(MpscStreamTest, CannotWriteAfterWriterClose) {
366 MpscTestContext ctx;
367 ctx.Connect();
368 ctx.Fill();
369 ctx.writer.Close();
370 ctx.Write();
371 EXPECT_EQ(ctx.write_status, Status::OutOfRange());
372 }
373
TEST(MpscStreamTest,CannotWriteAfterReaderClose)374 TEST(MpscStreamTest, CannotWriteAfterReaderClose) {
375 MpscTestContext ctx;
376 ctx.Connect();
377 ctx.Fill();
378 ctx.reader.Close();
379 ctx.Write();
380 EXPECT_EQ(ctx.write_status, Status::OutOfRange());
381 }
382
TEST(MpscStreamTest,MultipleWriters)383 TEST(MpscStreamTest, MultipleWriters) {
384 MpscTestContext ctx1;
385 ctx1.Connect();
386 Vector<std::byte, kBufSize + 1> data1(kBufSize + 1, std::byte(1));
387 ctx1.data = ByteSpan(data1.data(), data1.size());
388
389 MpscTestContext ctx2;
390 ctx2.writer = ctx1.writer;
391 Vector<std::byte, kBufSize / 2> data2(kBufSize / 2, std::byte(2));
392 ctx2.data = ByteSpan(data2.data(), data2.size());
393
394 MpscTestContext ctx3;
395 ctx3.writer = ctx1.writer;
396 Vector<std::byte, kBufSize * 3> data3(kBufSize * 3, std::byte(3));
397 ctx3.data = ByteSpan(data3.data(), data3.size());
398
399 // Start all threads.
400 ctx1.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
401 ctx2.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
402 ctx3.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
403
404 // The loop below keeps track of how many contiguous values are read, in order
405 // to verify that writes are not split or interleaved.
406 size_t expected[4] = {0, data1.size(), data2.size(), data3.size()};
407 size_t actual[4] = {0};
408
409 size_t total_read = 0;
410 auto current = std::byte(0);
411 size_t num_current = 0;
412 while (total_read < data1.size() + data2.size() + data3.size()) {
413 ctx1.Read();
414 if (!ctx1.read_result.ok()) {
415 break;
416 }
417 size_t num_read = ctx1.read_result->size();
418 for (size_t i = 0; i < num_read; ++i) {
419 if (current == ctx1.read_buffer[i]) {
420 ++num_current;
421 continue;
422 }
423 actual[size_t(current)] = num_current;
424 current = ctx1.read_buffer[i];
425 num_current = 1;
426 }
427 actual[size_t(current)] = num_current;
428 total_read += num_read;
429 }
430 ctx1.reader.Close();
431 ctx1.Join();
432 ctx2.Join();
433 ctx3.Join();
434 ASSERT_EQ(ctx1.read_result.status(), OkStatus());
435 for (size_t i = 0; i < 4; ++i) {
436 EXPECT_EQ(actual[i], expected[i]);
437 }
438 }
439
TEST(MpscStreamTest,GetAndSetLimits)440 TEST(MpscStreamTest, GetAndSetLimits) {
441 MpscReader reader;
442 EXPECT_EQ(reader.ConservativeReadLimit(), 0U);
443
444 MpscWriter writer;
445 EXPECT_EQ(writer.ConservativeWriteLimit(), 0U);
446
447 CreateMpscStream(reader, writer);
448 EXPECT_EQ(reader.ConservativeReadLimit(), Stream::kUnlimited);
449 EXPECT_EQ(writer.ConservativeWriteLimit(), Stream::kUnlimited);
450
451 writer.SetLimit(10);
452 EXPECT_EQ(reader.ConservativeReadLimit(), 10U);
453 EXPECT_EQ(writer.ConservativeWriteLimit(), 10U);
454
455 writer.Close();
456 EXPECT_EQ(reader.ConservativeReadLimit(), 0U);
457 EXPECT_EQ(writer.ConservativeWriteLimit(), 0U);
458 }
459
TEST(MpscStreamTest,ReaderAggregatesLimit)460 TEST(MpscStreamTest, ReaderAggregatesLimit) {
461 MpscTestContext ctx;
462 ctx.Connect();
463 ctx.writer.SetLimit(10);
464
465 MpscWriter writer2 = ctx.writer;
466 writer2.SetLimit(20);
467
468 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), 30U);
469
470 ctx.writer.SetLimit(Stream::kUnlimited);
471 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), Stream::kUnlimited);
472
473 writer2.SetLimit(40);
474 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), Stream::kUnlimited);
475
476 ctx.writer.SetLimit(0);
477 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), 40U);
478 }
479
TEST(MpscStreamTest,ReadingUpdatesLimit)480 TEST(MpscStreamTest, ReadingUpdatesLimit) {
481 MpscTestContext ctx;
482 ctx.Connect();
483
484 constexpr size_t kChunk = kBufSize - 4;
485 std::byte buffer[kBufSize];
486 ctx.reader.SetBuffer(buffer);
487 ctx.Fill();
488 ctx.writer.SetLimit(kBufSize);
489 ctx.Write();
490 EXPECT_EQ(ctx.write_status, OkStatus());
491
492 ctx.destination = ByteSpan(ctx.read_buffer, kChunk);
493 ctx.Read();
494 EXPECT_EQ(ctx.read_result.status(), OkStatus());
495 EXPECT_EQ(ctx.read_result->size(), kChunk);
496 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), kBufSize - kChunk);
497 }
498
TEST(MpscStreamTest,CannotWriteMoreThanLimit)499 TEST(MpscStreamTest, CannotWriteMoreThanLimit) {
500 MpscTestContext ctx;
501 ctx.Connect();
502
503 std::byte buffer[kBufSize];
504 ctx.reader.SetBuffer(buffer);
505 ctx.writer.SetLimit(kBufSize - 1);
506 ctx.Fill();
507 ctx.Write();
508 EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
509 }
510
TEST(MpscStreamTest,WritersCanCloseAutomatically)511 TEST(MpscStreamTest, WritersCanCloseAutomatically) {
512 MpscTestContext ctx1;
513 ctx1.Connect();
514 Vector<std::byte, kBufSize + 1> data1(kBufSize + 1, std::byte(1));
515 ctx1.writer.SetLimit(data1.size());
516 ctx1.data = ByteSpan(data1.data(), data1.size());
517
518 MpscTestContext ctx2;
519 ctx2.writer = ctx1.writer;
520 Vector<std::byte, kBufSize / 2> data2(kBufSize / 2, std::byte(2));
521 ctx2.writer.SetLimit(data2.size());
522 ctx2.data = ByteSpan(data2.data(), data2.size());
523
524 // Start all threads.
525 EXPECT_TRUE(ctx1.reader.connected());
526 EXPECT_TRUE(ctx1.writer.connected());
527 EXPECT_TRUE(ctx2.writer.connected());
528
529 ctx1.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
530 ctx2.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
531
532 size_t total = 0;
533 while (ctx1.reader.ConservativeReadLimit() != 0) {
534 ctx1.Read();
535 EXPECT_EQ(ctx1.read_result.status(), OkStatus());
536 if (!ctx1.read_result.ok()) {
537 ctx1.reader.Close();
538 break;
539 }
540 total += ctx1.read_result->size();
541 }
542 EXPECT_EQ(total, data1.size() + data2.size());
543 ctx1.Join();
544 ctx2.Join();
545 EXPECT_FALSE(ctx1.reader.connected());
546 EXPECT_FALSE(ctx1.writer.connected());
547 EXPECT_FALSE(ctx2.writer.connected());
548 }
549
TEST(MpscStreamTest,ReadAllWithoutBuffer)550 TEST(MpscStreamTest, ReadAllWithoutBuffer) {
551 MpscTestContext ctx;
552 Status status = ctx.reader.ReadAll([](ConstByteSpan) { return OkStatus(); });
553 EXPECT_EQ(status, Status::FailedPrecondition());
554 }
555
TEST(MpscStreamTest,ReadAll)556 TEST(MpscStreamTest, ReadAll) {
557 MpscTestContext ctx;
558 ctx.Connect();
559
560 std::byte buffer[kBufSize];
561 ctx.reader.SetBuffer(buffer);
562 ctx.writer.SetLimit(kBufSize * 100);
563 ctx.Spawn([](MpscTestContext* inner) { inner->WriteAll(); });
564
565 Status status = ctx.reader.ReadAll([&ctx](ConstByteSpan data) {
566 ctx.total_read += data.size();
567 fnv1a(data, ctx.read_hash);
568 return OkStatus();
569 });
570 ctx.Join();
571
572 EXPECT_EQ(status, OkStatus());
573 EXPECT_FALSE(ctx.reader.connected());
574 EXPECT_EQ(ctx.total_read, kBufSize * 100);
575 EXPECT_EQ(ctx.read_hash, ctx.write_hash);
576 }
577
TEST(MpscStreamTest,BufferedMpscReader)578 TEST(MpscStreamTest, BufferedMpscReader) {
579 BufferedMpscReader<kBufSize> reader;
580 MpscWriter writer;
581 CreateMpscStream(reader, writer);
582
583 // `kBufSize` writes of 1 byte each should fit without blocking.
584 for (size_t i = 0; i < kBufSize; ++i) {
585 std::byte b{static_cast<uint8_t>(i)};
586 EXPECT_EQ(writer.Write(ConstByteSpan(&b, 1)), OkStatus());
587 }
588
589 std::byte rx_buffer[kBufSize];
590 auto result = reader.Read(ByteSpan(rx_buffer));
591 ASSERT_EQ(result.status(), OkStatus());
592 ASSERT_EQ(result->size(), kBufSize);
593 for (size_t i = 0; i < kBufSize; ++i) {
594 EXPECT_EQ(rx_buffer[i], std::byte(i));
595 }
596 }
597
598 } // namespace
599 } // namespace pw::stream
600
601 #endif // PW_THREAD_JOINING_ENABLED
602