1 use std::sync::atomic::{AtomicUsize, Ordering};
2 
3 use crossbeam_queue::ArrayQueue;
4 use crossbeam_utils::thread::scope;
5 use rand::{thread_rng, Rng};
6 
7 #[test]
smoke()8 fn smoke() {
9     let q = ArrayQueue::new(1);
10 
11     q.push(7).unwrap();
12     assert_eq!(q.pop(), Some(7));
13 
14     q.push(8).unwrap();
15     assert_eq!(q.pop(), Some(8));
16     assert!(q.pop().is_none());
17 }
18 
19 #[test]
capacity()20 fn capacity() {
21     for i in 1..10 {
22         let q = ArrayQueue::<i32>::new(i);
23         assert_eq!(q.capacity(), i);
24     }
25 }
26 
27 #[test]
28 #[should_panic(expected = "capacity must be non-zero")]
zero_capacity()29 fn zero_capacity() {
30     let _ = ArrayQueue::<i32>::new(0);
31 }
32 
33 #[test]
len_empty_full()34 fn len_empty_full() {
35     let q = ArrayQueue::new(2);
36 
37     assert_eq!(q.len(), 0);
38     assert!(q.is_empty());
39     assert!(!q.is_full());
40 
41     q.push(()).unwrap();
42 
43     assert_eq!(q.len(), 1);
44     assert!(!q.is_empty());
45     assert!(!q.is_full());
46 
47     q.push(()).unwrap();
48 
49     assert_eq!(q.len(), 2);
50     assert!(!q.is_empty());
51     assert!(q.is_full());
52 
53     q.pop().unwrap();
54 
55     assert_eq!(q.len(), 1);
56     assert!(!q.is_empty());
57     assert!(!q.is_full());
58 }
59 
60 #[test]
len()61 fn len() {
62     #[cfg(miri)]
63     const COUNT: usize = 30;
64     #[cfg(not(miri))]
65     const COUNT: usize = 25_000;
66     #[cfg(miri)]
67     const CAP: usize = 40;
68     #[cfg(not(miri))]
69     const CAP: usize = 1000;
70     const ITERS: usize = CAP / 20;
71 
72     let q = ArrayQueue::new(CAP);
73     assert_eq!(q.len(), 0);
74 
75     for _ in 0..CAP / 10 {
76         for i in 0..ITERS {
77             q.push(i).unwrap();
78             assert_eq!(q.len(), i + 1);
79         }
80 
81         for i in 0..ITERS {
82             q.pop().unwrap();
83             assert_eq!(q.len(), ITERS - i - 1);
84         }
85     }
86     assert_eq!(q.len(), 0);
87 
88     for i in 0..CAP {
89         q.push(i).unwrap();
90         assert_eq!(q.len(), i + 1);
91     }
92 
93     for _ in 0..CAP {
94         q.pop().unwrap();
95     }
96     assert_eq!(q.len(), 0);
97 
98     scope(|scope| {
99         scope.spawn(|_| {
100             for i in 0..COUNT {
101                 loop {
102                     if let Some(x) = q.pop() {
103                         assert_eq!(x, i);
104                         break;
105                     }
106                 }
107                 let len = q.len();
108                 assert!(len <= CAP);
109             }
110         });
111 
112         scope.spawn(|_| {
113             for i in 0..COUNT {
114                 while q.push(i).is_err() {}
115                 let len = q.len();
116                 assert!(len <= CAP);
117             }
118         });
119     })
120     .unwrap();
121     assert_eq!(q.len(), 0);
122 }
123 
124 #[test]
spsc()125 fn spsc() {
126     #[cfg(miri)]
127     const COUNT: usize = 50;
128     #[cfg(not(miri))]
129     const COUNT: usize = 100_000;
130 
131     let q = ArrayQueue::new(3);
132 
133     scope(|scope| {
134         scope.spawn(|_| {
135             for i in 0..COUNT {
136                 loop {
137                     if let Some(x) = q.pop() {
138                         assert_eq!(x, i);
139                         break;
140                     }
141                 }
142             }
143             assert!(q.pop().is_none());
144         });
145 
146         scope.spawn(|_| {
147             for i in 0..COUNT {
148                 while q.push(i).is_err() {}
149             }
150         });
151     })
152     .unwrap();
153 }
154 
155 #[test]
spsc_ring_buffer()156 fn spsc_ring_buffer() {
157     #[cfg(miri)]
158     const COUNT: usize = 50;
159     #[cfg(not(miri))]
160     const COUNT: usize = 100_000;
161 
162     let t = AtomicUsize::new(1);
163     let q = ArrayQueue::<usize>::new(3);
164     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
165 
166     scope(|scope| {
167         scope.spawn(|_| loop {
168             match t.load(Ordering::SeqCst) {
169                 0 if q.is_empty() => break,
170 
171                 _ => {
172                     while let Some(n) = q.pop() {
173                         v[n].fetch_add(1, Ordering::SeqCst);
174                     }
175                 }
176             }
177         });
178 
179         scope.spawn(|_| {
180             for i in 0..COUNT {
181                 if let Some(n) = q.force_push(i) {
182                     v[n].fetch_add(1, Ordering::SeqCst);
183                 }
184             }
185 
186             t.fetch_sub(1, Ordering::SeqCst);
187         });
188     })
189     .unwrap();
190 
191     for c in v {
192         assert_eq!(c.load(Ordering::SeqCst), 1);
193     }
194 }
195 
196 #[test]
mpmc()197 fn mpmc() {
198     #[cfg(miri)]
199     const COUNT: usize = 50;
200     #[cfg(not(miri))]
201     const COUNT: usize = 25_000;
202     const THREADS: usize = 4;
203 
204     let q = ArrayQueue::<usize>::new(3);
205     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
206 
207     scope(|scope| {
208         for _ in 0..THREADS {
209             scope.spawn(|_| {
210                 for _ in 0..COUNT {
211                     let n = loop {
212                         if let Some(x) = q.pop() {
213                             break x;
214                         }
215                     };
216                     v[n].fetch_add(1, Ordering::SeqCst);
217                 }
218             });
219         }
220         for _ in 0..THREADS {
221             scope.spawn(|_| {
222                 for i in 0..COUNT {
223                     while q.push(i).is_err() {}
224                 }
225             });
226         }
227     })
228     .unwrap();
229 
230     for c in v {
231         assert_eq!(c.load(Ordering::SeqCst), THREADS);
232     }
233 }
234 
235 #[test]
mpmc_ring_buffer()236 fn mpmc_ring_buffer() {
237     #[cfg(miri)]
238     const COUNT: usize = 50;
239     #[cfg(not(miri))]
240     const COUNT: usize = 25_000;
241     const THREADS: usize = 4;
242 
243     let t = AtomicUsize::new(THREADS);
244     let q = ArrayQueue::<usize>::new(3);
245     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
246 
247     scope(|scope| {
248         for _ in 0..THREADS {
249             scope.spawn(|_| loop {
250                 match t.load(Ordering::SeqCst) {
251                     0 if q.is_empty() => break,
252 
253                     _ => {
254                         while let Some(n) = q.pop() {
255                             v[n].fetch_add(1, Ordering::SeqCst);
256                         }
257                     }
258                 }
259             });
260         }
261 
262         for _ in 0..THREADS {
263             scope.spawn(|_| {
264                 for i in 0..COUNT {
265                     if let Some(n) = q.force_push(i) {
266                         v[n].fetch_add(1, Ordering::SeqCst);
267                     }
268                 }
269 
270                 t.fetch_sub(1, Ordering::SeqCst);
271             });
272         }
273     })
274     .unwrap();
275 
276     for c in v {
277         assert_eq!(c.load(Ordering::SeqCst), THREADS);
278     }
279 }
280 
281 #[test]
drops()282 fn drops() {
283     let runs: usize = if cfg!(miri) { 3 } else { 100 };
284     let steps: usize = if cfg!(miri) { 50 } else { 10_000 };
285     let additional: usize = if cfg!(miri) { 10 } else { 50 };
286 
287     static DROPS: AtomicUsize = AtomicUsize::new(0);
288 
289     #[derive(Debug, PartialEq)]
290     struct DropCounter;
291 
292     impl Drop for DropCounter {
293         fn drop(&mut self) {
294             DROPS.fetch_add(1, Ordering::SeqCst);
295         }
296     }
297 
298     let mut rng = thread_rng();
299 
300     for _ in 0..runs {
301         let steps = rng.gen_range(0..steps);
302         let additional = rng.gen_range(0..additional);
303 
304         DROPS.store(0, Ordering::SeqCst);
305         let q = ArrayQueue::new(50);
306 
307         scope(|scope| {
308             scope.spawn(|_| {
309                 for _ in 0..steps {
310                     while q.pop().is_none() {}
311                 }
312             });
313 
314             scope.spawn(|_| {
315                 for _ in 0..steps {
316                     while q.push(DropCounter).is_err() {
317                         DROPS.fetch_sub(1, Ordering::SeqCst);
318                     }
319                 }
320             });
321         })
322         .unwrap();
323 
324         for _ in 0..additional {
325             q.push(DropCounter).unwrap();
326         }
327 
328         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
329         drop(q);
330         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
331     }
332 }
333 
334 #[test]
linearizable()335 fn linearizable() {
336     #[cfg(miri)]
337     const COUNT: usize = 100;
338     #[cfg(not(miri))]
339     const COUNT: usize = 25_000;
340     const THREADS: usize = 4;
341 
342     let q = ArrayQueue::new(THREADS);
343 
344     scope(|scope| {
345         for _ in 0..THREADS / 2 {
346             scope.spawn(|_| {
347                 for _ in 0..COUNT {
348                     while q.push(0).is_err() {}
349                     q.pop().unwrap();
350                 }
351             });
352 
353             scope.spawn(|_| {
354                 for _ in 0..COUNT {
355                     if q.force_push(0).is_none() {
356                         q.pop().unwrap();
357                     }
358                 }
359             });
360         }
361     })
362     .unwrap();
363 }
364 
365 #[test]
into_iter()366 fn into_iter() {
367     let q = ArrayQueue::new(100);
368     for i in 0..100 {
369         q.push(i).unwrap();
370     }
371     for (i, j) in q.into_iter().enumerate() {
372         assert_eq!(i, j);
373     }
374 }
375