1 use std::sync::atomic::{AtomicUsize, Ordering};
2
3 use crossbeam_queue::SegQueue;
4 use crossbeam_utils::thread::scope;
5 use rand::{thread_rng, Rng};
6
/// Basic sanity check: values pushed one at a time come straight back out,
/// and the queue reports nothing left afterwards.
#[test]
fn smoke() {
    let queue = SegQueue::new();

    // Each value is pushed and immediately popped again.
    for &value in &[7, 8] {
        queue.push(value);
        assert_eq!(queue.pop(), Some(value));
    }

    // Both values have been taken out, so another pop must yield nothing.
    assert!(queue.pop().is_none());
}
17
/// Checks that `len` and `is_empty` agree with each other before a push,
/// after the push, and after the matching pop.
#[test]
fn len_empty_full() {
    let queue = SegQueue::new();

    // Asserts the reported length and that `is_empty` is consistent with it.
    let expect_len = |expected: usize| {
        assert_eq!(queue.len(), expected);
        assert_eq!(queue.is_empty(), expected == 0);
    };

    expect_len(0);

    queue.push(());
    expect_len(1);

    queue.pop().unwrap();
    expect_len(0);
}
35
/// Tracks `len` while 50 elements are pushed one by one and then popped
/// one by one.
#[test]
fn len() {
    const TOTAL: usize = 50;

    let queue = SegQueue::new();
    assert_eq!(queue.len(), 0);

    // Growing: after the n-th push the queue must report exactly n elements.
    for pushed in 1..=TOTAL {
        queue.push(pushed - 1);
        assert_eq!(queue.len(), pushed);
    }

    // Shrinking: every pop lowers the reported length by one, down to zero.
    for remaining in (0..TOTAL).rev() {
        queue.pop().unwrap();
        assert_eq!(queue.len(), remaining);
    }

    assert_eq!(queue.len(), 0);
}
54
/// Single-producer / single-consumer check: one thread pushes `0..COUNT`
/// while another pops and asserts the values arrive in exactly that order.
#[test]
fn spsc() {
    // The workload is much smaller when compiled for Miri
    // (presumably because interpreted execution is slow).
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    let q = SegQueue::new();

    scope(|scope| {
        // Consumer: retries `pop` until the next value shows up.
        scope.spawn(|_| {
            for i in 0..COUNT {
                loop {
                    if let Some(x) = q.pop() {
                        assert_eq!(x, i);
                        break;
                    }
                    // The queue was empty: signal the busy-wait instead of
                    // retrying blindly. Gated on Miri so native runs keep
                    // hammering the queue exactly as before.
                    // NOTE(review): Miri is understood to treat this hint as
                    // a chance to switch threads -- confirm against its docs.
                    #[cfg(miri)]
                    std::hint::spin_loop();
                }
            }
            // Everything pushed has been consumed, so nothing may be left.
            assert!(q.pop().is_none());
        });
        // Producer: pushes the values in ascending order.
        scope.spawn(|_| {
            for i in 0..COUNT {
                q.push(i);
            }
        });
    })
    .unwrap();
}
84
/// Multi-producer / multi-consumer check: `THREADS` producers each push
/// `0..COUNT` while `THREADS` consumers each pop `COUNT` values; afterwards
/// every value must have been popped exactly `THREADS` times in total.
#[test]
fn mpmc() {
    // The workload is much smaller when compiled for Miri
    // (presumably because interpreted execution is slow).
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let q = SegQueue::<usize>::new();
    // v[n] counts how many times the value `n` has been popped.
    let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

    scope(|scope| {
        // Consumers: each one pops COUNT values and tallies what it received.
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    let n = loop {
                        if let Some(x) = q.pop() {
                            break x;
                        }
                        // The queue was empty: signal the busy-wait instead
                        // of retrying blindly. Gated on Miri so native runs
                        // keep the same contention as before.
                        // NOTE(review): Miri is understood to treat this hint
                        // as a chance to switch threads -- confirm.
                        #[cfg(miri)]
                        std::hint::spin_loop();
                    };
                    v[n].fetch_add(1, Ordering::SeqCst);
                }
            });
        }
        // Producers: each one pushes every value in 0..COUNT once.
        for _ in 0..THREADS {
            scope.spawn(|_| {
                for i in 0..COUNT {
                    q.push(i);
                }
            });
        }
    })
    .unwrap();

    // Each value was pushed once per producer, so it must have been popped
    // THREADS times overall.
    for c in v {
        assert_eq!(c.load(Ordering::SeqCst), THREADS);
    }
}
123
/// Verifies that every element is dropped exactly once: elements popped by
/// the consumer are dropped as they are popped, and elements still stored in
/// the queue are dropped when the queue itself is dropped.
#[test]
fn drops() {
    // Upper bounds for the randomized workload; smaller when built for Miri.
    let runs: usize = if cfg!(miri) { 5 } else { 100 };
    let max_steps: usize = if cfg!(miri) { 50 } else { 10_000 };
    let max_additional: usize = if cfg!(miri) { 100 } else { 1_000 };

    // Number of `DropCounter` values dropped during the current run.
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut rng = thread_rng();

    for _ in 0..runs {
        // `steps` elements travel through the queue concurrently;
        // `additional` elements are pushed afterwards and never popped.
        let steps = rng.gen_range(0..max_steps);
        let additional = rng.gen_range(0..max_additional);

        DROPS.store(0, Ordering::SeqCst);
        let q = SegQueue::new();

        scope(|scope| {
            // Consumer: pops (and thereby drops) `steps` elements.
            scope.spawn(|_| {
                for _ in 0..steps {
                    while q.pop().is_none() {
                        // The queue was empty: signal the busy-wait instead
                        // of retrying blindly. Gated on Miri so native runs
                        // keep the same contention as before.
                        // NOTE(review): Miri is understood to treat this hint
                        // as a chance to switch threads -- confirm.
                        #[cfg(miri)]
                        std::hint::spin_loop();
                    }
                }
            });

            // Producer: pushes `steps` elements.
            scope.spawn(|_| {
                for _ in 0..steps {
                    q.push(DropCounter);
                }
            });
        })
        .unwrap();

        for _ in 0..additional {
            q.push(DropCounter);
        }

        // Only the popped elements have been dropped so far ...
        assert_eq!(DROPS.load(Ordering::SeqCst), steps);
        drop(q);
        // ... and dropping the queue drops the elements it still held.
        assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
    }
}
174
/// Checks that consuming the queue with `into_iter` yields every pushed
/// element, in the order it was pushed.
#[test]
fn into_iter() {
    let q = SegQueue::new();
    for i in 0..100 {
        q.push(i);
    }

    let mut yielded = 0;
    for (i, j) in q.into_iter().enumerate() {
        assert_eq!(i, j);
        yielded += 1;
    }

    // Without this check the loop above would pass vacuously if the iterator
    // stopped early or yielded nothing at all.
    assert_eq!(yielded, 100);
}
185
/// Consumes only the first 50 of 100 elements through `into_iter`, so the
/// iterator is dropped while the rest have not been yielded (presumably to
/// exercise the cleanup of a partially consumed iterator).
#[test]
fn into_iter_drop() {
    let q = SegQueue::new();
    for i in 0..100 {
        q.push(i);
    }

    let mut consumed = 0;
    // The iterator lives only for the duration of this loop and is dropped
    // at its end, after at most 50 items have been taken from it.
    for (i, j) in q.into_iter().enumerate().take(50) {
        assert_eq!(i, j);
        consumed += 1;
    }

    // Make sure the iterator really was dropped half-way rather than having
    // ended early on its own.
    assert_eq!(consumed, 50);
}
196