xref: /aosp_15_r20/external/angle/src/common/FixedQueue_unittest.cpp (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1 //
2 // Copyright 2023 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
// FixedQueue_unittest:
//   Tests of the FixedQueue class
8 //
9 
#include <gtest/gtest.h>

#include "common/FixedQueue.h"
#include "common/system_utils.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>
17 
18 namespace angle
19 {
20 // Make sure the various constructors compile and do basic checks
TEST(FixedQueue,Constructors)21 TEST(FixedQueue, Constructors)
22 {
23     FixedQueue<int> q(5);
24     EXPECT_EQ(0u, q.size());
25     EXPECT_EQ(true, q.empty());
26 }
27 
28 // Make sure the destructor destroys all elements.
TEST(FixedQueue,Destructor)29 TEST(FixedQueue, Destructor)
30 {
31     struct s
32     {
33         s() : counter(nullptr) {}
34         s(int *c) : counter(c) {}
35         ~s()
36         {
37             if (counter)
38             {
39                 ++*counter;
40             }
41         }
42 
43         s(const s &)            = default;
44         s &operator=(const s &) = default;
45 
46         int *counter;
47     };
48 
49     int destructorCount = 0;
50 
51     {
52         FixedQueue<s> q(11);
53         q.push(s(&destructorCount));
54         // Destructor called once for the temporary above.
55         EXPECT_EQ(1, destructorCount);
56     }
57 
58     // Destructor should be called one more time for the element we pushed.
59     EXPECT_EQ(2, destructorCount);
60 }
61 
62 // Make sure the pop destroys the element.
TEST(FixedQueue,Pop)63 TEST(FixedQueue, Pop)
64 {
65     struct s
66     {
67         s() : counter(nullptr) {}
68         s(int *c) : counter(c) {}
69         ~s()
70         {
71             if (counter)
72             {
73                 ++*counter;
74             }
75         }
76 
77         s(const s &) = default;
78         s &operator=(const s &s)
79         {
80             // increment if we are overwriting the custom initialized object
81             if (counter)
82             {
83                 ++*counter;
84             }
85             counter = s.counter;
86             return *this;
87         }
88 
89         int *counter;
90     };
91 
92     int destructorCount = 0;
93 
94     FixedQueue<s> q(11);
95     q.push(s(&destructorCount));
96     // Destructor called once for the temporary above.
97     EXPECT_EQ(1, destructorCount);
98     q.pop();
99     // Copy assignment should be called for the element we popped.
100     EXPECT_EQ(2, destructorCount);
101 }
102 
103 // Test circulating behavior.
TEST(FixedQueue,WrapAround)104 TEST(FixedQueue, WrapAround)
105 {
106     FixedQueue<int> q(7);
107 
108     for (int i = 0; i < 7; ++i)
109     {
110         q.push(i);
111     }
112 
113     EXPECT_EQ(0, q.front());
114     q.pop();
115     // This should wrap around
116     q.push(7);
117     for (int i = 0; i < 7; ++i)
118     {
119         EXPECT_EQ(i + 1, q.front());
120         q.pop();
121     }
122 }
123 
124 // Test concurrent push and pop behavior.
TEST(FixedQueue,ConcurrentPushPop)125 TEST(FixedQueue, ConcurrentPushPop)
126 {
127     FixedQueue<uint64_t> q(7);
128     double timeOut    = 1.0;
129     uint64_t kMaxLoop = 1000000ull;
130     std::atomic<bool> enqueueThreadFinished(false);
131     std::atomic<bool> dequeueThreadFinished(false);
132 
133     std::thread enqueueThread = std::thread([&]() {
134         double t1      = angle::GetCurrentSystemTime();
135         double elapsed = 0.0;
136         uint64_t value = 0;
137         do
138         {
139             while (q.full() && !dequeueThreadFinished)
140             {
141                 std::this_thread::sleep_for(std::chrono::microseconds(1));
142             }
143 
144             // No point pushing new values once deque thread is finished.
145             if (dequeueThreadFinished)
146             {
147                 break;
148             }
149             ASSERT(!q.full());
150 
151             // test push
152             q.push(value);
153             value++;
154 
155             elapsed = angle::GetCurrentSystemTime() - t1;
156         } while (elapsed < timeOut && value < kMaxLoop);
157         // Can exit if: timed out, all values are pushed, or dequeue thread is finished earlier.
158         ASSERT(elapsed >= timeOut || value == kMaxLoop || dequeueThreadFinished);
159         enqueueThreadFinished = true;
160     });
161 
162     std::thread dequeueThread = std::thread([&]() {
163         double t1              = angle::GetCurrentSystemTime();
164         double elapsed         = 0.0;
165         uint64_t expectedValue = 0;
166         do
167         {
168             while (q.empty() && !enqueueThreadFinished)
169             {
170                 std::this_thread::sleep_for(std::chrono::microseconds(1));
171             }
172 
173             // Let's continue processing the queue even if enqueue thread is already finished.
174             if (q.empty())
175             {
176                 ASSERT(enqueueThreadFinished);
177                 break;
178             }
179 
180             EXPECT_EQ(expectedValue, q.front());
181             // test pop
182             q.pop();
183 
184             ASSERT(expectedValue < kMaxLoop);
185             expectedValue++;
186 
187             elapsed = angle::GetCurrentSystemTime() - t1;
188         } while (elapsed < timeOut);
189         // Can exit if: timed out, or queue is empty and will stay that way.
190         ASSERT(elapsed >= timeOut || (q.empty() && enqueueThreadFinished));
191         dequeueThreadFinished = true;
192     });
193 
194     enqueueThread.join();
195     dequeueThread.join();
196 }
197 
198 // Test concurrent push and pop behavior. When queue is full, instead of wait, it will try to
199 // increase capacity. At dequeue thread, it will also try to shrink the queue capacity when size
200 // fall under half of the capacity.
TEST(FixedQueue,ConcurrentPushPopWithResize)201 TEST(FixedQueue, ConcurrentPushPopWithResize)
202 {
203     static constexpr size_t kInitialQueueCapacity = 64;
204     static constexpr size_t kMaxQueueCapacity     = 64 * 1024;
205     FixedQueue<uint64_t> q(kInitialQueueCapacity);
206     double timeOut    = 1.0;
207     uint64_t kMaxLoop = 1000000ull;
208     std::atomic<bool> enqueueThreadFinished(false);
209     std::atomic<bool> dequeueThreadFinished(false);
210     std::mutex enqueueMutex;
211     std::mutex dequeueMutex;
212 
213     std::thread enqueueThread = std::thread([&]() {
214         double t1      = angle::GetCurrentSystemTime();
215         double elapsed = 0.0;
216         uint64_t value = 0;
217         do
218         {
219             std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
220             if (q.full())
221             {
222                 // Take both lock to ensure no one will access while we try to double the
223                 // storage. Note that under a well balanced system, this should happen infrequently.
224                 std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
225                 // Check again to see if queue is still full after taking the dequeueMutex.
226                 size_t newCapacity = q.capacity() * 2;
227                 if (q.full() && newCapacity <= kMaxQueueCapacity)
228                 {
229                     // Double the storage size while we took the lock
230                     q.updateCapacity(newCapacity);
231                 }
232             }
233 
234             // If queue is still full, lets wait for dequeue thread to make some progress
235             while (q.full() && !dequeueThreadFinished)
236             {
237                 enqueueLock.unlock();
238                 std::this_thread::sleep_for(std::chrono::microseconds(1));
239                 enqueueLock.lock();
240             }
241 
242             // No point pushing new values once deque thread is finished.
243             if (dequeueThreadFinished)
244             {
245                 break;
246             }
247             ASSERT(!q.full());
248 
249             // test push
250             q.push(value);
251             value++;
252 
253             elapsed = angle::GetCurrentSystemTime() - t1;
254         } while (elapsed < timeOut && value < kMaxLoop);
255         // Can exit if: timed out, all values are pushed, or dequeue thread is finished earlier.
256         ASSERT(elapsed >= timeOut || value == kMaxLoop || dequeueThreadFinished);
257         enqueueThreadFinished = true;
258     });
259 
260     std::thread dequeueThread = std::thread([&]() {
261         double t1              = angle::GetCurrentSystemTime();
262         double elapsed         = 0.0;
263         uint64_t expectedValue = 0;
264         do
265         {
266             std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
267             if (q.size() < q.capacity() / 10 && q.capacity() > kInitialQueueCapacity)
268             {
269                 // Shrink the storage if we only used less than 10% of storage. We must take both
270                 // lock to ensure no one is accessing it when we update storage. And the lock must
271                 // take in the same order as other thread to avoid deadlock.
272                 dequeueLock.unlock();
273                 std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
274                 dequeueLock.lock();
275                 // Figure out what the new capacity should be
276                 size_t newCapacity = q.capacity() / 2;
277                 while (q.size() < newCapacity)
278                 {
279                     newCapacity /= 2;
280                 }
281                 newCapacity *= 2;
282                 newCapacity = std::max(newCapacity, kInitialQueueCapacity);
283 
284                 if (newCapacity < q.capacity())
285                 {
286                     q.updateCapacity(newCapacity);
287                 }
288             }
289 
290             while (q.empty() && !enqueueThreadFinished)
291             {
292                 dequeueLock.unlock();
293                 std::this_thread::sleep_for(std::chrono::microseconds(1));
294                 dequeueLock.lock();
295             }
296 
297             // Let's continue processing the queue even if enqueue thread is already finished.
298             if (q.empty())
299             {
300                 ASSERT(enqueueThreadFinished);
301                 break;
302             }
303 
304             EXPECT_EQ(expectedValue, q.front());
305             // test pop
306             q.pop();
307 
308             ASSERT(expectedValue < kMaxLoop);
309             expectedValue++;
310 
311             elapsed = angle::GetCurrentSystemTime() - t1;
312         } while (elapsed < timeOut);
313         // Can exit if: timed out, or queue is empty and will stay that way.
314         ASSERT(elapsed >= timeOut || (q.empty() && enqueueThreadFinished));
315         dequeueThreadFinished = true;
316     });
317 
318     enqueueThread.join();
319     dequeueThread.join();
320 }
321 
322 // Test clearing the queue
TEST(FixedQueue,Clear)323 TEST(FixedQueue, Clear)
324 {
325     FixedQueue<int> q(5);
326     for (int i = 0; i < 5; ++i)
327     {
328         q.push(i);
329     }
330     q.clear();
331     EXPECT_EQ(0u, q.size());
332     EXPECT_EQ(true, q.empty());
333 }
334 }  // namespace angle
335