/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "chre/util/system/atomic_spsc_queue.h"
#include "chre/util/array_queue.h"
#include "chre/util/fixed_size_vector.h"
#include "gtest/gtest.h"

#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <mutex>
#include <thread>

using chre::ArrayQueue;
using chre::AtomicSpscQueue;
using chre::FixedSizeVector;

namespace {

constexpr int kMaxTestCapacity = 10;
int destructor_count[kMaxTestCapacity];
int constructor_count;
int total_destructor_count;

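// Test helper that counts constructor and destructor invocations via the
// globals above, so tests can verify element lifecycle behavior.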
class FakeElement {
 public:
  FakeElement() {
    constructor_count++;
  }

  FakeElement(int i) {
    val_ = i;
    constructor_count++;
  }

  ~FakeElement() {
    total_destructor_count++;
    if (val_ >= 0 && val_ < kMaxTestCapacity) {
      destructor_count[val_]++;
    }
  }

  void setValue(int i) {
    val_ = i;
  }

 private:
  int val_ = kMaxTestCapacity - 1;
};

}  // namespace

TEST(AtomicSpscQueueTest, IsEmptyInitially) {
  AtomicSpscQueue<int, 4> q;
  EXPECT_EQ(4, q.capacity());
  EXPECT_TRUE(q.consumer().empty());
  EXPECT_EQ(0, q.consumer().size());
  EXPECT_EQ(0, q.producer().size());
  EXPECT_EQ(0, q.size());
}

TEST(AtomicSpscQueueTest, SimplePushPop) {
  AtomicSpscQueue<int, 3> q;
  q.producer().push(1);
  q.producer().push(2);
  EXPECT_EQ(q.consumer().front(), 1);
  EXPECT_FALSE(q.producer().full());
  q.consumer().pop();
  q.producer().push(3);
  EXPECT_EQ(q.consumer().front(), 2);
  q.consumer().pop();
  EXPECT_EQ(q.consumer().front(), 3);
}

TEST(AtomicSpscQueueTest, TestSize) {
  AtomicSpscQueue<int, 2> q;
  EXPECT_EQ(0, q.size());
  q.producer().push(1);
  EXPECT_EQ(1, q.size());
  q.producer().push(2);
  EXPECT_EQ(2, q.size());
  q.consumer().pop();
  EXPECT_EQ(1, q.size());
  q.consumer().pop();
  EXPECT_EQ(0, q.size());
}

TEST(AtomicSpscQueueTest, TestFront) {
  AtomicSpscQueue<int, 3> q;
  q.producer().emplace(1);
  EXPECT_EQ(1, q.consumer().front());
  q.consumer().pop();
  q.producer().emplace(2);
  EXPECT_EQ(2, q.consumer().front());
  q.producer().emplace(3);
  EXPECT_EQ(2, q.consumer().front());
}

TEST(AtomicSpscQueueTest, DestructorCalledOnPop) {
  for (size_t i = 0; i < kMaxTestCapacity; ++i) {
    destructor_count[i] = 0;
  }

  AtomicSpscQueue<FakeElement, 3> q;
  FakeElement e;
  q.producer().push(e);
  q.producer().push(e);

  q.consumer().front().setValue(0);
  q.consumer().pop();
  EXPECT_EQ(1, destructor_count[0]);

  q.consumer().front().setValue(1);
  q.consumer().pop();
  EXPECT_EQ(1, destructor_count[1]);
}

TEST(AtomicSpscQueueTest, ElementsDestructedWhenQueueDestructed) {
  for (size_t i = 0; i < kMaxTestCapacity; ++i) {
    destructor_count[i] = 0;
  }

  {
    AtomicSpscQueue<FakeElement, 4> q;

    for (size_t i = 0; i < 3; ++i) {
      q.producer().emplace(i);
    }
  }

  for (size_t i = 0; i < 3; ++i) {
    EXPECT_EQ(1, destructor_count[i]);
  }

  EXPECT_EQ(0, destructor_count[3]);
}

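// Fills the queue to capacity and extracts everything in a single call, using
// a guard value past the end of the destination buffer to confirm extract()
// does not write out of bounds.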
TEST(AtomicSpscQueueTest, ExtractFull) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;

  for (int32_t i = 0; i < kSize; i++) {
    q.producer().push(i);
  }

  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  dest[kSize] = 0xdeadbeef;
  size_t extracted = q.consumer().extract(dest, kSize);
  EXPECT_EQ(extracted, kSize);
  for (int32_t i = 0; i < kSize; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0xdeadbeef, dest[kSize]);
}

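// Extracts the queued elements in smaller chunks, verifying each chunk and
// confirming that the queue keeps operating normally afterwards.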
TEST(AtomicSpscQueueTest, ExtractPartial) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;

  for (int32_t i = 0; i < kSize / 2; i++) {
    q.producer().push(i);
  }

  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  size_t extracted = q.consumer().extract(dest, kSize / 4);
  EXPECT_EQ(extracted, kSize / 4);
  for (int32_t i = 0; i < kSize / 4; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0, dest[kSize / 4]);
  EXPECT_EQ(kSize / 4, q.size());

  extracted = q.consumer().extract(&dest[kSize / 4], kSize / 4);
  EXPECT_EQ(extracted, kSize / 4);
  for (int32_t i = kSize / 4; i < kSize / 2; i++) {
    EXPECT_EQ(dest[i], i);
  }
  EXPECT_EQ(0, dest[kSize]);
  EXPECT_TRUE(q.consumer().empty());

  q.producer().push(0xd00d);
  EXPECT_EQ(0xd00d, q.consumer().front());
  q.consumer().pop();
  EXPECT_TRUE(q.consumer().empty());
}

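// Exercises extract() when the stored elements wrap around the end of the
// underlying circular buffer, so the data must be copied out in two pieces.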
TEST(AtomicSpscQueueTest, ExtractWraparound) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  for (int32_t i = 0; i < kSize; i++) {
    p.push(i);
  }

  for (int32_t i = kSize; i < kSize + kSize / 2; i++) {
    c.pop();
    p.push(i);
  }

  // Now two copies will be needed to extract the data
  int32_t dest[kSize + 1];
  memset(dest, 0, sizeof(dest));
  dest[kSize] = 0xdeadbeef;

  // Pull all except 1
  size_t extracted = c.extract(dest, kSize - 1);
  EXPECT_EQ(extracted, kSize - 1);

  // And now the last one (asking for more than we expect to get)
  EXPECT_EQ(1, q.size());
  extracted = c.extract(&dest[kSize - 1], 2);
  EXPECT_EQ(extracted, 1);

  for (int32_t i = 0; i < kSize; i++) {
    EXPECT_EQ(dest[i], i + kSize / 2);
  }
  EXPECT_EQ(0xdeadbeef, dest[kSize]);
}

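// Verifies that front() and pop() stay consistent when the read position
// wraps around the end of the underlying circular buffer.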
TEST(AtomicSpscQueueTest, PopWraparound) {
  constexpr size_t kSize = 16;
  AtomicSpscQueue<int32_t, kSize> q;
  auto p = q.producer();
  auto c = q.consumer();

  for (int32_t i = 0; i < kSize; i++) {
    p.push(i);
  }

  for (int32_t i = kSize; i < kSize + kSize / 2; i++) {
    EXPECT_EQ(c.front(), i - kSize);
    c.pop();
    p.push(i);
  }

  for (int32_t i = kSize / 2; i < kSize + kSize / 2; i++) {
    EXPECT_EQ(c.front(), i);
    c.pop();
  }
}

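// Covers the FixedSizeVector overload of extract(), whose output size is
// bounded by both the number of queued elements and the free space remaining
// in the destination vector.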
TEST(AtomicSpscQueueTest, ExtractVector) {
  constexpr size_t kSize = 8;
  AtomicSpscQueue<int, kSize> q;

  auto p = q.producer();
  for (int i = 0; i < kSize; i++) {
    p.push(i);
  }

  auto c = q.consumer();
  constexpr size_t kExtraSpace = 2;
  static_assert(kSize > kExtraSpace + 2, "Test assumption broken");
  FixedSizeVector<int, kSize + kExtraSpace> v;

  // Output size dependent on elements available in queue
  size_t extracted = c.extract(&v);
  EXPECT_EQ(extracted, kSize);
  EXPECT_EQ(kSize, v.size());
  for (int i = 0; i < kSize; i++) {
    EXPECT_EQ(v[i], i);
  }

  for (int i = kSize; i < kSize + kExtraSpace; i++) {
    p.push(i);
  }
  p.push(1337);
  p.push(42);

  // Output size dependent on space available in vector
  extracted = c.extract(&v);
  EXPECT_EQ(extracted, kExtraSpace);
  EXPECT_EQ(v.capacity(), v.size());
  for (int i = 0; i < kSize + kExtraSpace; i++) {
    EXPECT_EQ(v[i], i);
  }
  EXPECT_EQ(2, q.size());

  // Output size 0 (no space left in vector)
  extracted = c.extract(&v);
  EXPECT_EQ(0, extracted);
  EXPECT_EQ(2, q.size());

  // Extract into reset vector
  v.resize(0);
  extracted = c.extract(&v);
  EXPECT_EQ(2, extracted);
  EXPECT_EQ(2, v.size());
  EXPECT_EQ(v[0], 1337);
  EXPECT_EQ(v[1], 42);

  // Output size 0 (no elements left in queue)
  EXPECT_TRUE(q.consumer().empty());
  extracted = c.extract(&v);
  EXPECT_EQ(0, extracted);
}

// If this test fails, it's likely due to thread interleaving, so consider
// increasing kMaxCount (e.g. by a factor of 100 or more) and/or running the
// test in parallel on multiple processes to increase the likelihood of repro.
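// The producer pushes a monotonically increasing sequence while the consumer
// concurrently pops and checks that the values arrive in order with none
// skipped.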
TEST(AtomicSpscQueueStressTest, ConcurrencyStress) {
  constexpr size_t kCapacity = 2048;
  constexpr int64_t kMaxCount = 100 * kCapacity;
  AtomicSpscQueue<int64_t, kCapacity> q;

  auto producer = q.producer();
  std::thread producerThread = std::thread(
      [](decltype(producer) p) {
        int64_t count = 0;
        while (count <= kMaxCount) {
          if (p.full()) {
            // Give the other thread a chance to be scheduled
            std::this_thread::yield();
            continue;
          }

          p.push(count++);
        }
      },
      producer);

  auto consumer = q.consumer();
  std::thread consumerThread = std::thread(
      [](decltype(consumer) c) {
        int64_t last = -1;
        do {
          if (c.empty()) {
            std::this_thread::yield();
            continue;
          }
          int64_t next = c.front();
          if (last != -1) {
            EXPECT_EQ(last + 1, next);
          }
          last = next;
          c.pop();
        } while (last < kMaxCount);
      },
      consumer);

  producerThread.join();
  consumerThread.join();

  EXPECT_EQ(0, q.size());
}

// Helpers for SynchronizedConcurrencyStress
enum class Op {
  kPush = 0,
  kPull = 1,
};
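// Records a single push or pull operation so that the most recent
// kHistorySize operations can be dumped when the consumer detects an
// out-of-sequence value.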
struct HistoryEntry {
  Op op;
  int numElements;
  int64_t last;

  HistoryEntry() = default;
  HistoryEntry(Op op_, int numElements_, int64_t last_)
      : op(op_), numElements(numElements_), last(last_) {}
};

constexpr size_t kHistorySize = 512;

namespace chre {  // (PrintTo needs to be in the same namespace as ArrayQueue)

void PrintTo(const ArrayQueue<HistoryEntry, kHistorySize> &history,
             std::ostream *os) {
  *os << "Dumping history from oldest to newest:" << std::endl;
  for (const HistoryEntry &entry : history) {
    *os << " " << ((entry.op == Op::kPush) ? "push " : "pull ") << std::setw(3)
        << entry.numElements << " elements, last " << entry.last << std::endl;
  }
}

}  // namespace chre

// If this test fails, it's likely due to thread interleaving, so consider
// increasing kMaxCount (e.g. by a factor of 100 or more) and/or running the
// test in parallel on multiple processes to increase the likelihood of repro.
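// The producer pushes batches of pseudo-random size and signals the consumer
// through a condition variable; the consumer extracts into a FixedSizeVector,
// verifies the sequence, and keeps a history of recent operations to dump on
// failure.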
TEST(AtomicSpscQueueStressTest, SynchronizedConcurrencyStress) {
  constexpr size_t kCapacity = 512;
  constexpr int64_t kMaxCount = 2000 * kCapacity;
  AtomicSpscQueue<int64_t, kCapacity> q;

  std::mutex m;
  std::condition_variable cv;

  // Guarded by mutex m
  ArrayQueue<HistoryEntry, kHistorySize> history;
  int64_t totalOps = 0;

  auto lfsr = []() {
    // 9-bit LFSR with feedback polynomial x^9 + x^5 + 1 gives us a
    // pseudo-random sequence over all 511 possible values
    static uint16_t lfsr = 1;
    uint16_t nextBit = ((lfsr << 8) ^ (lfsr << 4)) & 0x100;
    lfsr = nextBit | (lfsr >> 1);

    return lfsr;
  };
  bool pending = false;

  auto p = q.producer();
  std::thread producerThread = std::thread([&]() {
    int64_t count = 0;
    while (count <= kMaxCount) {
      // Push a pseudo-random number of elements into the queue, then notify
      // the consumer; yield if we can't push it all at once
      uint16_t pushCount = lfsr();
      while (p.capacity() - p.size() < pushCount) {
        std::this_thread::yield();
      }

      for (int i = 0; i < pushCount; i++) {
        p.push(count++);
        if (count > kMaxCount) {
          break;
        }
      }

      m.lock();
      history.kick_push(HistoryEntry(Op::kPush, pushCount, count - 1));
      totalOps++;
      pending = true;
      m.unlock();
      cv.notify_one();
    }
  });

  auto c = q.consumer();
  std::thread consumerThread = std::thread([&]() {
    int64_t last = -1;
    size_t extracted = 0;
    FixedSizeVector<int64_t, kCapacity> myBuf;
    while (last < kMaxCount) {
      {
        std::unique_lock<std::mutex> lock(m);
        if (last != -1) {
          history.kick_push(HistoryEntry(Op::kPull, extracted, last));
          totalOps++;
        }
        while (c.empty() && !pending) {
          cv.wait(lock);
          if (pending) {
            pending = false;
            break;
          }
        }
      }

      extracted = c.extract(&myBuf);
      EXPECT_LE(extracted, kCapacity);
      for (int i = 0; i < extracted; i++) {
        int64_t next = myBuf[i];
        if (last != -1 && last + 1 != next) {
          std::lock_guard<std::mutex> lock(m);
          EXPECT_EQ(last + 1, next)
              << "After pulling " << extracted << " elements, value at offset "
              << i << " is incorrect: expected " << (last + 1) << " but got "
              << next << "." << std::endl
              << testing::PrintToString(history)
              // totalOps + 1 because this call to extract() isn't counted yet
              << " Total operations since start: " << (totalOps + 1)
              << std::endl
              << "Note: the most recent push may not be included in the "
              << "history, and the most recent pull is definitely not "
              << "included (but it is indicated in the first sentence above)."
              << std::endl;
          // The history is unlikely to have the most recent push operation
          // because the producer thread runs freely until it tries to acquire
          // the mutex to add to the history. In other words, it may have
          // pushed at any time between when we unblock from wait() and when
          // we reach here, but hasn't added it to the history yet.
        }
        last = next;
      }
      myBuf.resize(0);
    }
  });

  producerThread.join();
  consumerThread.join();

  EXPECT_EQ(0, q.size());
}