1 use std::sync::atomic::{AtomicUsize, Ordering};
2
3 use crossbeam_queue::ArrayQueue;
4 use crossbeam_utils::thread::scope;
5 use rand::{thread_rng, Rng};
6
/// Single-threaded sanity check: a capacity-1 queue returns each pushed value
/// and reports nothing left afterwards.
#[test]
fn smoke() {
    let queue = ArrayQueue::new(1);

    // Push and immediately pop two values in turn through the one-slot queue.
    for &value in &[7, 8] {
        queue.push(value).unwrap();
        assert_eq!(queue.pop(), Some(value));
    }

    // Both values were popped, so a further pop must find nothing.
    assert!(queue.pop().is_none());
}
18
/// `capacity()` reports exactly the size the queue was constructed with,
/// for every size from 1 through 9.
#[test]
fn capacity() {
    (1..10).for_each(|cap| {
        let queue = ArrayQueue::<i32>::new(cap);
        assert_eq!(queue.capacity(), cap);
    });
}
26
/// Constructing a queue with capacity zero must panic with the message
/// named in the `should_panic` attribute.
#[test]
#[should_panic(expected = "capacity must be non-zero")]
fn zero_capacity() {
    // The constructor is expected to panic before a value is ever produced.
    drop(ArrayQueue::<i32>::new(0));
}
32
/// Walks a capacity-2 queue through empty -> one element -> full -> one
/// element and checks `len`, `is_empty` and `is_full` at every step.
#[test]
fn len_empty_full() {
    let queue = ArrayQueue::new(2);

    // Asserts the three observable size properties in one go.
    let expect_state = |len: usize, empty: bool, full: bool| {
        assert_eq!(queue.len(), len);
        assert_eq!(queue.is_empty(), empty);
        assert_eq!(queue.is_full(), full);
    };

    expect_state(0, true, false);

    queue.push(()).unwrap();
    expect_state(1, false, false);

    queue.push(()).unwrap();
    expect_state(2, false, true);

    queue.pop().unwrap();
    expect_state(1, false, false);
}
59
/// Checks `len()` in three phases: repeated partial fill/drain cycles, one
/// complete fill/drain, and finally a concurrent producer/consumer pair where
/// the length may fluctuate but must never exceed the capacity.
#[test]
fn len() {
    // Workload sizes are much smaller when running under Miri.
    #[cfg(miri)]
    const COUNT: usize = 30;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    #[cfg(miri)]
    const CAP: usize = 40;
    #[cfg(not(miri))]
    const CAP: usize = 1000;
    const ITERS: usize = CAP / 20;

    let queue = ArrayQueue::new(CAP);
    assert_eq!(queue.len(), 0);

    // Phase 1: push ITERS items, then pop them all, CAP / 10 times over.
    for _ in 0..CAP / 10 {
        for pushed in 1..=ITERS {
            queue.push(pushed - 1).unwrap();
            assert_eq!(queue.len(), pushed);
        }

        for remaining in (0..ITERS).rev() {
            queue.pop().unwrap();
            assert_eq!(queue.len(), remaining);
        }
    }
    assert_eq!(queue.len(), 0);

    // Phase 2: fill the queue completely, checking the length after each
    // push, then drain it.
    for item in 0..CAP {
        queue.push(item).unwrap();
        assert_eq!(queue.len(), item + 1);
    }

    for _ in 0..CAP {
        queue.pop().unwrap();
    }
    assert_eq!(queue.len(), 0);

    // Phase 3: one consumer and one producer run concurrently.
    scope(|s| {
        // Consumer: spin until the next item arrives and check its order.
        s.spawn(|_| {
            for expected in 0..COUNT {
                let received = loop {
                    match queue.pop() {
                        Some(value) => break value,
                        None => continue,
                    }
                };
                assert_eq!(received, expected);

                let observed = queue.len();
                assert!(observed <= CAP);
            }
        });

        // Producer: spin until each item has been accepted.
        s.spawn(|_| {
            for item in 0..COUNT {
                while queue.push(item).is_err() {}

                let observed = queue.len();
                assert!(observed <= CAP);
            }
        });
    })
    .unwrap();
    assert_eq!(queue.len(), 0);
}
123
/// Single producer, single consumer over a capacity-3 queue: the consumer
/// must receive `0..COUNT` in order and then find nothing left.
#[test]
fn spsc() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    let queue = ArrayQueue::new(3);

    scope(|s| {
        // Consumer: spin on `pop` until a value shows up, then check it is
        // the next one in sequence.
        s.spawn(|_| {
            for expected in 0..COUNT {
                let received = loop {
                    match queue.pop() {
                        Some(value) => break value,
                        None => continue,
                    }
                };
                assert_eq!(received, expected);
            }
            assert!(queue.pop().is_none());
        });

        // Producer: retry each push until the queue accepts it.
        s.spawn(|_| {
            for item in 0..COUNT {
                while queue.push(item).is_err() {}
            }
        });
    })
    .unwrap();
}
154
/// One producer `force_push`es `0..COUNT` into a capacity-3 queue while one
/// consumer pops. Each value is tallied either when the consumer pops it or
/// when `force_push` hands it back to the producer; at the end every value
/// must have been tallied exactly once.
#[test]
fn spsc_ring_buffer() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    // Number of producers that have not finished yet.
    let producers_left = AtomicUsize::new(1);
    let queue = ArrayQueue::<usize>::new(3);
    // Per-value tally of how many times the value was observed.
    let seen = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

    scope(|s| {
        // Consumer: keep popping until the producer is done and the queue is
        // observed empty.
        s.spawn(|_| loop {
            if producers_left.load(Ordering::SeqCst) == 0 && queue.is_empty() {
                break;
            }

            while let Some(n) = queue.pop() {
                seen[n].fetch_add(1, Ordering::SeqCst);
            }
        });

        // Producer: tally any value that `force_push` returns.
        s.spawn(|_| {
            for item in 0..COUNT {
                if let Some(n) = queue.force_push(item) {
                    seen[n].fetch_add(1, Ordering::SeqCst);
                }
            }

            producers_left.fetch_sub(1, Ordering::SeqCst);
        });
    })
    .unwrap();

    for counter in seen {
        assert_eq!(counter.into_inner(), 1);
    }
}
195
/// `THREADS` producers each push `0..COUNT` and `THREADS` consumers each pop
/// `COUNT` values; afterwards every value must have been popped exactly
/// `THREADS` times in total.
#[test]
fn mpmc() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let queue = ArrayQueue::<usize>::new(3);
    // Per-value tally of how many times the value was popped.
    let seen = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

    scope(|s| {
        // Consumers: spin until a value is available, then tally it.
        for _ in 0..THREADS {
            s.spawn(|_| {
                for _ in 0..COUNT {
                    let n = loop {
                        match queue.pop() {
                            Some(value) => break value,
                            None => continue,
                        }
                    };
                    seen[n].fetch_add(1, Ordering::SeqCst);
                }
            });
        }

        // Producers: retry each push until the queue accepts it.
        for _ in 0..THREADS {
            s.spawn(|_| {
                for item in 0..COUNT {
                    while queue.push(item).is_err() {}
                }
            });
        }
    })
    .unwrap();

    for counter in seen {
        assert_eq!(counter.into_inner(), THREADS);
    }
}
234
/// `THREADS` producers each `force_push` `0..COUNT` into a capacity-3 queue
/// while `THREADS` consumers pop. A value is tallied when it is popped or when
/// `force_push` hands it back; at the end every value must have been tallied
/// exactly `THREADS` times.
#[test]
fn mpmc_ring_buffer() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    // Number of producers that have not finished yet.
    let producers_left = AtomicUsize::new(THREADS);
    let queue = ArrayQueue::<usize>::new(3);
    // Per-value tally of how many times the value was observed.
    let seen = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

    scope(|s| {
        // Consumers: keep popping until all producers are done and the queue
        // is observed empty.
        for _ in 0..THREADS {
            s.spawn(|_| loop {
                if producers_left.load(Ordering::SeqCst) == 0 && queue.is_empty() {
                    break;
                }

                while let Some(n) = queue.pop() {
                    seen[n].fetch_add(1, Ordering::SeqCst);
                }
            });
        }

        // Producers: tally any value that `force_push` returns, then sign off.
        for _ in 0..THREADS {
            s.spawn(|_| {
                for item in 0..COUNT {
                    if let Some(n) = queue.force_push(item) {
                        seen[n].fetch_add(1, Ordering::SeqCst);
                    }
                }

                producers_left.fetch_sub(1, Ordering::SeqCst);
            });
        }
    })
    .unwrap();

    for counter in seen {
        assert_eq!(counter.into_inner(), THREADS);
    }
}
280
/// Verifies that elements are dropped exactly once: popped elements are
/// dropped when popped, and elements still queued are dropped when the queue
/// itself is dropped.
#[test]
fn drops() {
    // Upper bounds for the randomized workload; much smaller under Miri.
    let (max_runs, max_steps, max_additional): (usize, usize, usize) = if cfg!(miri) {
        (3, 50, 10)
    } else {
        (100, 10_000, 50)
    };

    // Global count of `DropCounter` values dropped so far in the current run.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..max_runs {
        let steps = rng.gen_range(0..max_steps);
        let additional = rng.gen_range(0..max_additional);

        DROPS.store(0, Ordering::SeqCst);
        let queue = ArrayQueue::new(50);

        scope(|s| {
            // Consumer: pop (and thereby drop) `steps` elements.
            s.spawn(|_| {
                for _ in 0..steps {
                    loop {
                        if queue.pop().is_some() {
                            break;
                        }
                    }
                }
            });

            // Producer: push `steps` elements. A rejected push returns the
            // element, which is dropped right away; undo that count so only
            // elements that went through the queue are tallied.
            s.spawn(|_| {
                for _ in 0..steps {
                    loop {
                        if queue.push(DropCounter).is_ok() {
                            break;
                        }
                        DROPS.fetch_sub(1, Ordering::SeqCst);
                    }
                }
            });
        })
        .unwrap();

        // Leave some elements in the queue so that dropping it has work to do.
        (0..additional).for_each(|_| queue.push(DropCounter).unwrap());

        assert_eq!(DROPS.load(Ordering::SeqCst), steps);
        drop(queue);
        assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
    }
}
333
/// Runs `THREADS` threads against a queue of capacity `THREADS`. Half of them
/// `push` then `pop`; the other half `force_push` and pop only when nothing
/// was handed back. Every `pop` in this test is unwrapped, so the test fails
/// if any of them finds the queue empty.
#[test]
fn linearizable() {
    // Fewer iterations under Miri.
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let queue = ArrayQueue::new(THREADS);

    scope(|s| {
        // Each iteration spawns one thread of each kind.
        for _ in 0..THREADS / 2 {
            // Plain push (retried until accepted) followed by a pop.
            s.spawn(|_| {
                (0..COUNT).for_each(|_| {
                    while queue.push(0).is_err() {}
                    queue.pop().unwrap();
                });
            });

            // Forced push; pop only if no element was returned by it.
            s.spawn(|_| {
                (0..COUNT).for_each(|_| {
                    let displaced = queue.force_push(0);
                    if displaced.is_none() {
                        queue.pop().unwrap();
                    }
                });
            });
        }
    })
    .unwrap();
}
364
/// Consuming iteration must yield every queued element exactly once, in the
/// order the elements were pushed.
#[test]
fn into_iter() {
    const LEN: usize = 100;

    let q = ArrayQueue::new(LEN);
    for i in 0..LEN {
        q.push(i).unwrap();
    }

    // Count the yielded items as well as checking their order: comparing
    // against the `enumerate()` index alone would pass vacuously if the
    // iterator stopped early or yielded nothing at all.
    let mut yielded = 0;
    for (i, j) in q.into_iter().enumerate() {
        assert_eq!(i, j);
        yielded += 1;
    }
    assert_eq!(yielded, LEN);
}
375