xref: /aosp_15_r20/system/chre/util/tests/atomic_spsc_queue_test.cc (revision 84e339476a462649f82315436d70fd732297a399)
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "chre/util/system/atomic_spsc_queue.h"
18 #include "chre/util/array_queue.h"
19 #include "gtest/gtest.h"
20 
21 #include <condition_variable>
22 #include <mutex>
23 #include <thread>
24 
25 using chre::ArrayQueue;
26 using chre::AtomicSpscQueue;
27 using chre::FixedSizeVector;
28 
namespace {

// Exclusive upper bound on the element values whose destruction is tracked
// individually in destructor_count.
constexpr int kMaxTestCapacity = 10;

// Number of destructor calls observed per element value; zeroed by the tests
// that inspect it.
int destructor_count[kMaxTestCapacity];
// Number of FakeElement() / FakeElement(int) invocations (implicitly generated
// copies are not counted).
int constructor_count;
// Number of destructor calls observed, regardless of the element's value.
int total_destructor_count;

/**
 * Instrumented element type that records its construction and destruction in
 * the file-level counters above, so tests can verify when a container creates
 * and destroys its elements.
 */
class FakeElement {
 public:
  FakeElement() {
    ++constructor_count;
  }

  FakeElement(int i) : val_(i) {
    ++constructor_count;
  }

  ~FakeElement() {
    ++total_destructor_count;

    // Only values that map to a slot in destructor_count are tallied
    // individually; anything else is reflected in the total alone.
    const bool tracked = (val_ >= 0) && (val_ < kMaxTestCapacity);
    if (tracked) {
      ++destructor_count[val_];
    }
  }

  // Changes the value under which this element's destruction is recorded.
  void setValue(int i) {
    val_ = i;
  }

 private:
  // Default-constructed elements are attributed to the last tracked slot.
  int val_ = kMaxTestCapacity - 1;
};

}  // namespace
63 
TEST(AtomicSpscQueueTest, IsEmptyInitially) {
  // A freshly constructed queue must report its capacity and no contents,
  // whichever view (consumer, producer, or the queue itself) is asked.
  AtomicSpscQueue<int, 4> q;
  auto c = q.consumer();
  auto p = q.producer();

  EXPECT_EQ(4, q.capacity());
  EXPECT_TRUE(c.empty());
  EXPECT_EQ(0, c.size());
  EXPECT_EQ(0, p.size());
  EXPECT_EQ(0, q.size());
}
72 
TEST(AtomicSpscQueueTest, SimplePushPop) {
  AtomicSpscQueue<int, 3> q;
  auto p = q.producer();
  auto c = q.consumer();

  // Two of the three slots in use: the first value pushed is at the front and
  // the producer still has room.
  p.push(1);
  p.push(2);
  EXPECT_EQ(c.front(), 1);
  EXPECT_FALSE(p.full());

  // Popping exposes the next-oldest value, even with a newer one pushed behind.
  c.pop();
  p.push(3);
  EXPECT_EQ(c.front(), 2);

  c.pop();
  EXPECT_EQ(c.front(), 3);
}
85 
TEST(AtomicSpscQueueTest, TestSize) {
  AtomicSpscQueue<int, 2> q;
  auto p = q.producer();
  auto c = q.consumer();

  EXPECT_EQ(0, q.size());

  // Size grows by one with every push...
  for (int pushed = 1; pushed <= 2; ++pushed) {
    p.push(pushed);
    EXPECT_EQ(pushed, q.size());
  }

  // ...and shrinks by one with every pop.
  for (int remaining = 1; remaining >= 0; --remaining) {
    c.pop();
    EXPECT_EQ(remaining, q.size());
  }
}
98 
TEST(AtomicSpscQueueTest, TestFront) {
  AtomicSpscQueue<int, 3> q;
  auto p = q.producer();
  auto c = q.consumer();

  p.emplace(1);
  EXPECT_EQ(1, c.front());
  c.pop();

  p.emplace(2);
  EXPECT_EQ(2, c.front());

  // Emplacing another element behind the front must not change the front.
  p.emplace(3);
  EXPECT_EQ(2, c.front());
}
109 
TEST(AtomicSpscQueueTest, DestructorCalledOnPop) {
  for (int &count : destructor_count) {
    count = 0;
  }

  AtomicSpscQueue<FakeElement, 3> q;
  FakeElement e;
  auto p = q.producer();
  auto c = q.consumer();
  p.push(e);
  p.push(e);

  // Tag the element at the front with a unique value right before popping it,
  // so that exactly one destructor call is attributed to that value.
  for (int tag = 0; tag < 2; ++tag) {
    c.front().setValue(tag);
    c.pop();
    EXPECT_EQ(1, destructor_count[tag]);
  }
}
128 
TEST(AtomicSpscQueueTest, ElementsDestructedWhenQueueDestructed) {
  constexpr size_t kNumElements = 3;

  for (int &count : destructor_count) {
    count = 0;
  }

  {
    // The queue goes out of scope while still holding kNumElements elements
    // with values 0..kNumElements-1.
    AtomicSpscQueue<FakeElement, 4> q;
    auto p = q.producer();

    for (size_t value = 0; value < kNumElements; ++value) {
      p.emplace(value);
    }
  }

  // Every element that was still queued must have been destroyed once...
  for (size_t value = 0; value < kNumElements; ++value) {
    EXPECT_EQ(1, destructor_count[value]);
  }

  // ...and the slot that was never populated must not have been touched.
  EXPECT_EQ(0, destructor_count[kNumElements]);
}
148 
TEST(AtomicSpscQueueTest, ExtractFull) {
  constexpr size_t kSize = 16;
  // Signed copy of kSize so loop indices and expected values share the
  // element type (avoids comparing int32_t against size_t).
  constexpr int32_t kCount = static_cast<int32_t>(kSize);
  // Sentinel stored just past the region extract() may write to. Converted to
  // the element type once, explicitly, so that both the store and the final
  // comparison are int32_t-to-int32_t.
  constexpr int32_t kCanary = static_cast<int32_t>(0xdeadbeef);

  AtomicSpscQueue<int32_t, kSize> q;

  // Fill the queue to capacity with 0..kSize-1.
  for (int32_t i = 0; i < kCount; i++) {
    q.producer().push(i);
  }

  // One extra slot holds the canary used to detect writes past the end.
  int32_t dest[kSize + 1] = {};
  dest[kSize] = kCanary;

  // Extracting the whole queue must yield every element, in push order.
  size_t extracted = q.consumer().extract(dest, kSize);
  EXPECT_EQ(extracted, kSize);
  for (int32_t i = 0; i < kCount; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(kCanary, dest[kSize]);
}
167 
TEST(AtomicSpscQueueTest, ExtractPartial) {
  constexpr size_t kSize = 16;
  constexpr size_t kHalf = kSize / 2;
  constexpr size_t kQuarter = kSize / 4;

  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  // Populate only half of the queue, with values 0..kHalf-1.
  for (int32_t value = 0; value < kHalf; value++) {
    p.push(value);
  }

  int32_t dest[kSize + 1] = {};

  // First quarter: only the requested number of elements may be written, and
  // the rest must remain queued.
  size_t extracted = c.extract(dest, kQuarter);
  EXPECT_EQ(extracted, kQuarter);
  for (int32_t idx = 0; idx < kQuarter; idx++) {
    EXPECT_EQ(dest[idx], idx);
  }
  EXPECT_EQ(0, dest[kQuarter]);
  EXPECT_EQ(kQuarter, q.size());

  // Second quarter: continues where the first extraction stopped and leaves
  // the queue empty.
  extracted = c.extract(dest + kQuarter, kQuarter);
  EXPECT_EQ(extracted, kQuarter);
  for (int32_t idx = kQuarter; idx < kHalf; idx++) {
    EXPECT_EQ(dest[idx], idx);
  }
  EXPECT_EQ(0, dest[kSize]);
  EXPECT_TRUE(c.empty());

  // The queue must remain usable after having been drained via extract().
  p.push(0xd00d);
  EXPECT_EQ(0xd00d, c.front());
  c.pop();
  EXPECT_TRUE(c.empty());
}
199 
TEST(AtomicSpscQueueTest, ExtractWraparound) {
  constexpr size_t kSize = 16;
  // Signed copy of kSize so loop indices and expected values share the
  // element type (avoids mixing int32_t with size_t in comparisons).
  constexpr int32_t kCount = static_cast<int32_t>(kSize);
  // Sentinel stored just past the region extract() may write to. Converted to
  // the element type once, explicitly, so that both the store and the final
  // comparison are int32_t-to-int32_t.
  constexpr int32_t kCanary = static_cast<int32_t>(0xdeadbeef);

  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  // Fill to capacity with 0..kSize-1...
  for (int32_t i = 0; i < kCount; i++) {
    p.push(i);
  }

  // ...then replace the oldest half one element at a time, leaving the queue
  // holding kSize/2 .. kSize+kSize/2-1.
  for (int32_t i = kCount; i < kCount + kCount / 2; i++) {
    c.pop();
    p.push(i);
  }

  // Now two copies will be needed to extract the data
  int32_t dest[kSize + 1] = {};
  dest[kSize] = kCanary;

  // Pull all except 1
  size_t extracted = c.extract(dest, kSize - 1);
  EXPECT_EQ(extracted, kSize - 1);

  // And now the last one (asking for more than we expect to get)
  EXPECT_EQ(1, q.size());
  extracted = c.extract(&dest[kSize - 1], 2);
  EXPECT_EQ(extracted, 1);

  // The two extractions together must reproduce the queue contents in order,
  // without touching the canary slot.
  for (int32_t i = 0; i < kCount; i++) {
    EXPECT_EQ(dest[i], i + kCount / 2);
  }
  EXPECT_EQ(kCanary, dest[kSize]);
}
234 
TEST(AtomicSpscQueueTest, PopWraparound) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  // Fill to capacity with 0..kSize-1.
  int32_t nextToPush = 0;
  while (nextToPush < kSize) {
    p.push(nextToPush++);
  }

  // Swap out the oldest half one element at a time: each pop frees a slot that
  // is immediately reused for the next value in the sequence.
  int32_t expectedFront = 0;
  for (; expectedFront < kSize / 2; expectedFront++) {
    EXPECT_EQ(c.front(), expectedFront);
    c.pop();
    p.push(nextToPush++);
  }

  // Drain what is left; the values must continue the sequence without gaps.
  for (; expectedFront < nextToPush; expectedFront++) {
    EXPECT_EQ(c.front(), expectedFront);
    c.pop();
  }
}
256 
TEST(AtomicSpscQueueTest, ExtractVector) {
  constexpr size_t kSize = 8;
  constexpr size_t kExtraSpace = 2;
  static_assert(kSize > kExtraSpace + 2, "Test assumption broken");

  AtomicSpscQueue<int, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();
  // Destination is deliberately a bit larger than the queue.
  FixedSizeVector<int, kSize + kExtraSpace> v;

  for (int value = 0; value < kSize; value++) {
    p.push(value);
  }

  // Case 1: the vector has more room than the queue has elements, so the
  // number of elements in the queue determines the output size.
  size_t numExtracted = c.extract(&v);
  EXPECT_EQ(numExtracted, kSize);
  EXPECT_EQ(kSize, v.size());
  for (int idx = 0; idx < kSize; idx++) {
    EXPECT_EQ(v[idx], idx);
  }

  // Queue up kExtraSpace values that continue the sequence, plus two more that
  // will not fit into what is left of the vector.
  for (int value = kSize; value < kSize + kExtraSpace; value++) {
    p.push(value);
  }
  p.push(1337);
  p.push(42);

  // Case 2: the queue has more elements than the vector has room, so the
  // space left in the vector determines the output size.
  numExtracted = c.extract(&v);
  EXPECT_EQ(numExtracted, kExtraSpace);
  EXPECT_EQ(v.capacity(), v.size());
  for (int idx = 0; idx < kSize + kExtraSpace; idx++) {
    EXPECT_EQ(v[idx], idx);
  }
  EXPECT_EQ(2, q.size());

  // Case 3: no space left in the vector, so nothing is extracted.
  numExtracted = c.extract(&v);
  EXPECT_EQ(0, numExtracted);
  EXPECT_EQ(2, q.size());

  // Case 4: once the vector has been reset, the leftovers come out.
  v.resize(0);
  numExtracted = c.extract(&v);
  EXPECT_EQ(2, numExtracted);
  EXPECT_EQ(2, v.size());
  EXPECT_EQ(v[0], 1337);
  EXPECT_EQ(v[1], 42);

  // Case 5: no elements left in the queue, so nothing is extracted.
  EXPECT_TRUE(c.empty());
  numExtracted = c.extract(&v);
  EXPECT_EQ(0, numExtracted);
}
312 
313 // If this test fails it's likely due to thread interleaving, so consider
314 // increasing kMaxCount (e.g. by a factor of 100 or more) and/or run the test in
315 // parallel on multiple processes to increase the likelihood of repro.
TEST(AtomicSpscQueueStressTest,ConcurrencyStress)316 TEST(AtomicSpscQueueStressTest, ConcurrencyStress) {
317   constexpr size_t kCapacity = 2048;
318   constexpr int64_t kMaxCount = 100 * kCapacity;
319   AtomicSpscQueue<int64_t, kCapacity> q;
320 
321   auto producer = q.producer();
322   std::thread producerThread = std::thread(
323       [](decltype(producer) p) {
324         int64_t count = 0;
325         while (count <= kMaxCount) {
326           if (p.full()) {
327             // Give the other thread a chance to be scheduled
328             std::this_thread::yield();
329             continue;
330           }
331 
332           p.push(count++);
333         }
334       },
335       producer);
336 
337   auto consumer = q.consumer();
338   std::thread consumerThread = std::thread(
339       [](decltype(consumer) c) {
340         int64_t last = -1;
341         do {
342           if (c.empty()) {
343             std::this_thread::yield();
344             continue;
345           }
346           int64_t next = c.front();
347           if (last != -1) {
348             EXPECT_EQ(last + 1, next);
349           }
350           last = next;
351           c.pop();
352         } while (last < kMaxCount);
353       },
354       consumer);
355 
356   producerThread.join();
357   consumerThread.join();
358 
359   EXPECT_EQ(0, q.size());
360 }
361 
// Bookkeeping types for SynchronizedConcurrencyStress

// Kind of queue operation captured in a HistoryEntry.
enum class Op {
  kPush,  // underlying value 0
  kPull,  // underlying value 1
};

// One recorded queue operation: what was done, how many elements it involved,
// and the last value that was pushed/pulled at that point.
struct HistoryEntry {
  Op op;
  int numElements;
  int64_t last;

  // Leaves all fields uninitialized.
  HistoryEntry() = default;

  HistoryEntry(Op op_, int numElements_, int64_t last_)
      : op{op_}, numElements{numElements_}, last{last_} {}
};

// Capacity of the operation history kept by SynchronizedConcurrencyStress.
constexpr size_t kHistorySize = 512;
378 
379 namespace chre {  // (PrintTo needs to be in the same namespace as ArrayQueue)
380 
PrintTo(const ArrayQueue<HistoryEntry,kHistorySize> & history,std::ostream * os)381 void PrintTo(const ArrayQueue<HistoryEntry, kHistorySize> &history,
382              std::ostream *os) {
383   *os << "Dumping history from oldest to newest:" << std::endl;
384   for (const HistoryEntry &entry : history) {
385     *os << "  " << ((entry.op == Op::kPush) ? "push " : "pull ") << std::setw(3)
386         << entry.numElements << " elements, last " << entry.last << std::endl;
387   }
388 }
389 
390 }  // namespace chre
391 
392 // If this test fails it's likely due to thread interleaving, so consider
393 // increasing kMaxCount (e.g. by a factor of 100 or more) and/or run the test in
394 // parallel on multiple processes to increase the likelihood of repro.
TEST(AtomicSpscQueueStressTest,SynchronizedConcurrencyStress)395 TEST(AtomicSpscQueueStressTest, SynchronizedConcurrencyStress) {
396   constexpr size_t kCapacity = 512;
397   constexpr int64_t kMaxCount = 2000 * kCapacity;
398   AtomicSpscQueue<int64_t, kCapacity> q;
399 
400   std::mutex m;
401   std::condition_variable cv;
402 
403   // Guarded by mutex m
404   ArrayQueue<HistoryEntry, kHistorySize> history;
405   int64_t totalOps = 0;
406 
407   auto lfsr = []() {
408     // 9-bit LFSR with feedback polynomial x^9 + x^5 + 1 gives us a
409     // pseudo-random sequence over all 511 possible values
410     static uint16_t lfsr = 1;
411     uint16_t nextBit = ((lfsr << 8) ^ (lfsr << 4)) & 0x100;
412     lfsr = nextBit | (lfsr >> 1);
413 
414     return lfsr;
415   };
416   bool pending = false;
417 
418   auto p = q.producer();
419   std::thread producerThread = std::thread([&]() {
420     int64_t count = 0;
421     while (count <= kMaxCount) {
422       // Push in a pseudo-random number of elements into the queue, then notify
423       // the consumer; yield if we can't push it all at once
424       uint16_t pushCount = lfsr();
425       while (p.capacity() - p.size() < pushCount) {
426         std::this_thread::yield();
427       }
428 
429       for (int i = 0; i < pushCount; i++) {
430         p.push(count++);
431         if (count > kMaxCount) {
432           break;
433         }
434       }
435 
436       m.lock();
437       history.kick_push(HistoryEntry(Op::kPush, pushCount, count - 1));
438       totalOps++;
439       pending = true;
440       m.unlock();
441       cv.notify_one();
442     }
443   });
444 
445   auto c = q.consumer();
446   std::thread consumerThread = std::thread([&]() {
447     int64_t last = -1;
448     size_t extracted = 0;
449     FixedSizeVector<int64_t, kCapacity> myBuf;
450     while (last < kMaxCount) {
451       {
452         std::unique_lock<std::mutex> lock(m);
453         if (last != -1) {
454           history.kick_push(HistoryEntry(Op::kPull, extracted, last));
455           totalOps++;
456         }
457         while (c.empty() && !pending) {
458           cv.wait(lock);
459           if (pending) {
460             pending = false;
461             break;
462           }
463         }
464       }
465 
466       extracted = c.extract(&myBuf);
467       EXPECT_LE(extracted, kCapacity);
468       for (int i = 0; i < extracted; i++) {
469         int64_t next = myBuf[i];
470         if (last != -1 && last + 1 != next) {
471           std::lock_guard<std::mutex> lock(m);
472           EXPECT_EQ(last + 1, next)
473               << "After pulling " << extracted << " elements, value at offset "
474               << i << " is incorrect: expected " << (last + 1) << " but got "
475               << next << "." << std::endl
476               << testing::PrintToString(history)
477               // totalOps + 1 because this call to extract() isn't counted yet
478               << " Total operations since start: " << (totalOps + 1)
479               << std::endl
480               << "Note: most recent push may not be included in the history, "
481               << "most recent pull is definitely not included (but indicated "
482               << "in the first sentence above)." << std::endl;
483           // The history is unlikely to have the most recent push operation
484           // because the consumer thread runs freely until it tries to acquire
485           // the mutex to add to the history. In other words, it may have pushed
486           // any time between after we unblock from wait() and reach here, but
487           // hasn't added it to the history yet.
488         }
489         last = next;
490       }
491       myBuf.resize(0);
492     }
493   });
494 
495   producerThread.join();
496   consumerThread.join();
497 
498   EXPECT_EQ(0, q.size());
499 }
500