1 //
2 // Copyright 2023 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // CircularBuffer_unittest:
7 // Tests of the CircularBuffer class
8 //
9
10 #include <gtest/gtest.h>
11
12 #include "common/FixedQueue.h"
13 #include "common/system_utils.h"
14
15 #include <chrono>
16 #include <thread>
17
18 namespace angle
19 {
20 // Make sure the various constructors compile and do basic checks
TEST(FixedQueue,Constructors)21 TEST(FixedQueue, Constructors)
22 {
23 FixedQueue<int> q(5);
24 EXPECT_EQ(0u, q.size());
25 EXPECT_EQ(true, q.empty());
26 }
27
28 // Make sure the destructor destroys all elements.
TEST(FixedQueue,Destructor)29 TEST(FixedQueue, Destructor)
30 {
31 struct s
32 {
33 s() : counter(nullptr) {}
34 s(int *c) : counter(c) {}
35 ~s()
36 {
37 if (counter)
38 {
39 ++*counter;
40 }
41 }
42
43 s(const s &) = default;
44 s &operator=(const s &) = default;
45
46 int *counter;
47 };
48
49 int destructorCount = 0;
50
51 {
52 FixedQueue<s> q(11);
53 q.push(s(&destructorCount));
54 // Destructor called once for the temporary above.
55 EXPECT_EQ(1, destructorCount);
56 }
57
58 // Destructor should be called one more time for the element we pushed.
59 EXPECT_EQ(2, destructorCount);
60 }
61
62 // Make sure the pop destroys the element.
TEST(FixedQueue,Pop)63 TEST(FixedQueue, Pop)
64 {
65 struct s
66 {
67 s() : counter(nullptr) {}
68 s(int *c) : counter(c) {}
69 ~s()
70 {
71 if (counter)
72 {
73 ++*counter;
74 }
75 }
76
77 s(const s &) = default;
78 s &operator=(const s &s)
79 {
80 // increment if we are overwriting the custom initialized object
81 if (counter)
82 {
83 ++*counter;
84 }
85 counter = s.counter;
86 return *this;
87 }
88
89 int *counter;
90 };
91
92 int destructorCount = 0;
93
94 FixedQueue<s> q(11);
95 q.push(s(&destructorCount));
96 // Destructor called once for the temporary above.
97 EXPECT_EQ(1, destructorCount);
98 q.pop();
99 // Copy assignment should be called for the element we popped.
100 EXPECT_EQ(2, destructorCount);
101 }
102
103 // Test circulating behavior.
TEST(FixedQueue,WrapAround)104 TEST(FixedQueue, WrapAround)
105 {
106 FixedQueue<int> q(7);
107
108 for (int i = 0; i < 7; ++i)
109 {
110 q.push(i);
111 }
112
113 EXPECT_EQ(0, q.front());
114 q.pop();
115 // This should wrap around
116 q.push(7);
117 for (int i = 0; i < 7; ++i)
118 {
119 EXPECT_EQ(i + 1, q.front());
120 q.pop();
121 }
122 }
123
124 // Test concurrent push and pop behavior.
TEST(FixedQueue,ConcurrentPushPop)125 TEST(FixedQueue, ConcurrentPushPop)
126 {
127 FixedQueue<uint64_t> q(7);
128 double timeOut = 1.0;
129 uint64_t kMaxLoop = 1000000ull;
130 std::atomic<bool> enqueueThreadFinished(false);
131 std::atomic<bool> dequeueThreadFinished(false);
132
133 std::thread enqueueThread = std::thread([&]() {
134 double t1 = angle::GetCurrentSystemTime();
135 double elapsed = 0.0;
136 uint64_t value = 0;
137 do
138 {
139 while (q.full() && !dequeueThreadFinished)
140 {
141 std::this_thread::sleep_for(std::chrono::microseconds(1));
142 }
143
144 // No point pushing new values once deque thread is finished.
145 if (dequeueThreadFinished)
146 {
147 break;
148 }
149 ASSERT(!q.full());
150
151 // test push
152 q.push(value);
153 value++;
154
155 elapsed = angle::GetCurrentSystemTime() - t1;
156 } while (elapsed < timeOut && value < kMaxLoop);
157 // Can exit if: timed out, all values are pushed, or dequeue thread is finished earlier.
158 ASSERT(elapsed >= timeOut || value == kMaxLoop || dequeueThreadFinished);
159 enqueueThreadFinished = true;
160 });
161
162 std::thread dequeueThread = std::thread([&]() {
163 double t1 = angle::GetCurrentSystemTime();
164 double elapsed = 0.0;
165 uint64_t expectedValue = 0;
166 do
167 {
168 while (q.empty() && !enqueueThreadFinished)
169 {
170 std::this_thread::sleep_for(std::chrono::microseconds(1));
171 }
172
173 // Let's continue processing the queue even if enqueue thread is already finished.
174 if (q.empty())
175 {
176 ASSERT(enqueueThreadFinished);
177 break;
178 }
179
180 EXPECT_EQ(expectedValue, q.front());
181 // test pop
182 q.pop();
183
184 ASSERT(expectedValue < kMaxLoop);
185 expectedValue++;
186
187 elapsed = angle::GetCurrentSystemTime() - t1;
188 } while (elapsed < timeOut);
189 // Can exit if: timed out, or queue is empty and will stay that way.
190 ASSERT(elapsed >= timeOut || (q.empty() && enqueueThreadFinished));
191 dequeueThreadFinished = true;
192 });
193
194 enqueueThread.join();
195 dequeueThread.join();
196 }
197
198 // Test concurrent push and pop behavior. When queue is full, instead of wait, it will try to
199 // increase capacity. At dequeue thread, it will also try to shrink the queue capacity when size
200 // fall under half of the capacity.
TEST(FixedQueue,ConcurrentPushPopWithResize)201 TEST(FixedQueue, ConcurrentPushPopWithResize)
202 {
203 static constexpr size_t kInitialQueueCapacity = 64;
204 static constexpr size_t kMaxQueueCapacity = 64 * 1024;
205 FixedQueue<uint64_t> q(kInitialQueueCapacity);
206 double timeOut = 1.0;
207 uint64_t kMaxLoop = 1000000ull;
208 std::atomic<bool> enqueueThreadFinished(false);
209 std::atomic<bool> dequeueThreadFinished(false);
210 std::mutex enqueueMutex;
211 std::mutex dequeueMutex;
212
213 std::thread enqueueThread = std::thread([&]() {
214 double t1 = angle::GetCurrentSystemTime();
215 double elapsed = 0.0;
216 uint64_t value = 0;
217 do
218 {
219 std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
220 if (q.full())
221 {
222 // Take both lock to ensure no one will access while we try to double the
223 // storage. Note that under a well balanced system, this should happen infrequently.
224 std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
225 // Check again to see if queue is still full after taking the dequeueMutex.
226 size_t newCapacity = q.capacity() * 2;
227 if (q.full() && newCapacity <= kMaxQueueCapacity)
228 {
229 // Double the storage size while we took the lock
230 q.updateCapacity(newCapacity);
231 }
232 }
233
234 // If queue is still full, lets wait for dequeue thread to make some progress
235 while (q.full() && !dequeueThreadFinished)
236 {
237 enqueueLock.unlock();
238 std::this_thread::sleep_for(std::chrono::microseconds(1));
239 enqueueLock.lock();
240 }
241
242 // No point pushing new values once deque thread is finished.
243 if (dequeueThreadFinished)
244 {
245 break;
246 }
247 ASSERT(!q.full());
248
249 // test push
250 q.push(value);
251 value++;
252
253 elapsed = angle::GetCurrentSystemTime() - t1;
254 } while (elapsed < timeOut && value < kMaxLoop);
255 // Can exit if: timed out, all values are pushed, or dequeue thread is finished earlier.
256 ASSERT(elapsed >= timeOut || value == kMaxLoop || dequeueThreadFinished);
257 enqueueThreadFinished = true;
258 });
259
260 std::thread dequeueThread = std::thread([&]() {
261 double t1 = angle::GetCurrentSystemTime();
262 double elapsed = 0.0;
263 uint64_t expectedValue = 0;
264 do
265 {
266 std::unique_lock<std::mutex> dequeueLock(dequeueMutex);
267 if (q.size() < q.capacity() / 10 && q.capacity() > kInitialQueueCapacity)
268 {
269 // Shrink the storage if we only used less than 10% of storage. We must take both
270 // lock to ensure no one is accessing it when we update storage. And the lock must
271 // take in the same order as other thread to avoid deadlock.
272 dequeueLock.unlock();
273 std::unique_lock<std::mutex> enqueueLock(enqueueMutex);
274 dequeueLock.lock();
275 // Figure out what the new capacity should be
276 size_t newCapacity = q.capacity() / 2;
277 while (q.size() < newCapacity)
278 {
279 newCapacity /= 2;
280 }
281 newCapacity *= 2;
282 newCapacity = std::max(newCapacity, kInitialQueueCapacity);
283
284 if (newCapacity < q.capacity())
285 {
286 q.updateCapacity(newCapacity);
287 }
288 }
289
290 while (q.empty() && !enqueueThreadFinished)
291 {
292 dequeueLock.unlock();
293 std::this_thread::sleep_for(std::chrono::microseconds(1));
294 dequeueLock.lock();
295 }
296
297 // Let's continue processing the queue even if enqueue thread is already finished.
298 if (q.empty())
299 {
300 ASSERT(enqueueThreadFinished);
301 break;
302 }
303
304 EXPECT_EQ(expectedValue, q.front());
305 // test pop
306 q.pop();
307
308 ASSERT(expectedValue < kMaxLoop);
309 expectedValue++;
310
311 elapsed = angle::GetCurrentSystemTime() - t1;
312 } while (elapsed < timeOut);
313 // Can exit if: timed out, or queue is empty and will stay that way.
314 ASSERT(elapsed >= timeOut || (q.empty() && enqueueThreadFinished));
315 dequeueThreadFinished = true;
316 });
317
318 enqueueThread.join();
319 dequeueThread.join();
320 }
321
322 // Test clearing the queue
TEST(FixedQueue,Clear)323 TEST(FixedQueue, Clear)
324 {
325 FixedQueue<int> q(5);
326 for (int i = 0; i < 5; ++i)
327 {
328 q.push(i);
329 }
330 q.clear();
331 EXPECT_EQ(0u, q.size());
332 EXPECT_EQ(true, q.empty());
333 }
334 } // namespace angle
335