1 //! Tests for the tick channel flavor.
2
#![cfg(not(miri))] // TODO: many assertions fail because Miri is slow
4
5 use std::sync::atomic::AtomicUsize;
6 use std::sync::atomic::Ordering;
7 use std::thread;
8 use std::time::{Duration, Instant};
9
10 use crossbeam_channel::{after, select, tick, Select, TryRecvError};
11 use crossbeam_utils::thread::scope;
12
/// Shorthand for building a [`Duration`] out of a whole number of milliseconds.
///
/// Keeps the timing constants in the tests below short and readable.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
16
/// A 50 ms ticker starts out empty, holds exactly one message after a 100 ms
/// sleep, and that message's instant lies at least 50 ms after the start and
/// at least 50 ms before the moment it is observed.
#[test]
fn fire() {
    let period = ms(50);
    let began = Instant::now();
    let ticker = tick(period);

    // Nothing can be ready right after creation.
    assert_eq!(ticker.try_recv(), Err(TryRecvError::Empty));
    thread::sleep(ms(100));

    let fired_at = ticker.try_recv().unwrap();
    assert!(fired_at > began);
    assert!(fired_at - began >= period);

    let observed_at = Instant::now();
    assert!(observed_at > fired_at);
    assert!(observed_at - fired_at >= period);

    // Only a single message was buffered during the sleep.
    assert_eq!(ticker.try_recv(), Err(TryRecvError::Empty));

    // A non-blocking select must fall through to `default`.
    select! {
        recv(ticker) -> _ => panic!(),
        default => {}
    }

    // The existing ticker must win against a fresh 200 ms ticker.
    select! {
        recv(ticker) -> _ => {}
        recv(tick(ms(200))) -> _ => panic!(),
    }
}
45
/// Pins down the instants a 50 ms ticker delivers when the receiver is idle
/// for a while: the first message lands in `[50, 100)` ms, the one read after
/// a 300 ms sleep in `[100, 150)` ms, and the next blocking read in
/// `[400, 450)` ms, all measured from the start of the test.
#[test]
fn intervals() {
    let start = Instant::now();
    let ticker = tick(ms(50));

    // Instant that lies `millis` milliseconds after the start of the test.
    let at = |millis: u64| start + ms(millis);

    let first = ticker.recv().unwrap();
    assert!(first >= at(50));
    assert!(first < at(100));

    thread::sleep(ms(300));
    let second = ticker.try_recv().unwrap();
    assert!(second >= at(100));
    assert!(second < at(150));

    // Only one message was waiting despite the long sleep.
    assert_eq!(ticker.try_recv(), Err(TryRecvError::Empty));
    let third = ticker.recv().unwrap();
    assert!(third >= at(400));
    assert!(third < at(450));

    assert_eq!(ticker.try_recv(), Err(TryRecvError::Empty));
}
67
/// Every ticker reports a capacity of exactly one, for periods of 0..10 ms.
#[test]
fn capacity() {
    const COUNT: usize = 10;

    // `map` is lazy, so each ticker is created, checked and dropped in turn.
    (0..COUNT)
        .map(|millis| tick(ms(millis as u64)))
        .for_each(|ticker| assert_eq!(ticker.capacity(), Some(1)));
}
77
/// `len`, `is_empty` and `is_full` must agree with each other before the first
/// tick, once a tick is pending, and again after that tick has been consumed.
#[test]
fn len_empty_full() {
    let ticker = tick(ms(50));

    // Checks all three queries against the expected number of pending
    // messages (only 0 or 1 is ever passed in).
    let expect_pending = |pending: usize| {
        assert_eq!(ticker.len(), pending);
        assert_eq!(ticker.is_empty(), pending == 0);
        assert_eq!(ticker.is_full(), pending == 1);
    };

    expect_pending(0);

    thread::sleep(ms(100));
    expect_pending(1);

    ticker.try_recv().unwrap();
    expect_pending(0);
}
98
/// `try_recv` on a 200 ms ticker fails until the period has elapsed, then
/// succeeds exactly once per period.
#[test]
fn try_recv() {
    let ticker = tick(ms(200));
    assert!(ticker.try_recv().is_err());

    // Half a period in: still nothing to receive.
    thread::sleep(ms(100));
    assert!(ticker.try_recv().is_err());

    // Each further 200 ms sleep makes one message available, and only one.
    for _ in 0..2 {
        thread::sleep(ms(200));
        assert!(ticker.try_recv().is_ok());
        assert!(ticker.try_recv().is_err());
    }
}
115
/// A blocking `recv` on a 50 ms ticker returns an instant at least 50 ms after
/// the start, and returns promptly: the time since the delivered instant is
/// shorter than the wait that preceded it.
#[test]
fn recv() {
    let start = Instant::now();
    let ticker = tick(ms(50));

    let fired_at = ticker.recv().unwrap();
    assert!(fired_at > start);
    let waited = fired_at - start;
    assert!(waited >= ms(50));

    let observed_at = Instant::now();
    assert!(observed_at > fired_at);
    assert!(observed_at - fired_at < waited);

    assert_eq!(ticker.try_recv(), Err(TryRecvError::Empty));
}
131
/// Alternates a 100 ms `recv_timeout` that must time out with a 200 ms one
/// that must succeed, checking each outcome against a 50 ms tolerance window
/// measured from the start of the test.
#[cfg(not(crossbeam_sanitize))] // TODO: assertions fail because tsan is slow
#[test]
fn recv_timeout() {
    let start = Instant::now();
    let ticker = tick(ms(200));

    // Two rounds; the second is shifted by one full 200 ms period.
    for &base in &[0u64, 200] {
        assert!(ticker.recv_timeout(ms(100)).is_err());
        let elapsed = Instant::now() - start;
        assert!(elapsed >= ms(base + 100));
        assert!(elapsed <= ms(base + 150));

        let fired_at = ticker.recv_timeout(ms(200)).unwrap();
        let offset = fired_at - start;
        assert!(offset >= ms(base + 200));
        assert!(offset <= ms(base + 250));
    }
}
156
/// Two threads each complete ten `select!` rounds over the same pair of 50 ms
/// tickers; the test passes when both threads finish without panicking.
#[test]
fn recv_two() {
    let r1 = tick(ms(50));
    let r2 = tick(ms(50));

    scope(|s| {
        // Both workers run the same receive loop on the shared tickers.
        for _ in 0..2 {
            s.spawn(|_| {
                for _ in 0..10 {
                    select! {
                        recv(r1) -> _ => {}
                        recv(r2) -> _ => {}
                    }
                }
            });
        }
    })
    .unwrap();
}
182
/// In a race between a 50 ms and a 100 ms ticker the 50 ms one must win,
/// whichever order the two arms are listed in.
#[test]
fn recv_race() {
    {
        let fast = tick(ms(50));
        let slow = tick(ms(100));
        select! {
            recv(fast) -> _ => {}
            recv(slow) -> _ => panic!(),
        }
    }

    // Same race with the arm order swapped.
    {
        let slow = tick(ms(100));
        let fast = tick(ms(50));
        select! {
            recv(slow) -> _ => panic!(),
            recv(fast) -> _ => {}
        }
    }
}
195
/// Repeats a non-blocking `select!` ten times per case: a zero-period ticker
/// must always be ready, and a fresh 100 ms ticker must never be.
#[test]
fn stress_default() {
    const COUNT: usize = 10;

    // Zero period: the `default` arm must never be taken.
    (0..COUNT).for_each(|_| {
        select! {
            recv(tick(ms(0))) -> _ => {}
            default => panic!(),
        }
    });

    // 100 ms period: the `default` arm must always be taken.
    (0..COUNT).for_each(|_| {
        select! {
            recv(tick(ms(100))) -> _ => panic!(),
            default => {}
        }
    });
}
214
/// Four threads share a 200 ms and a 300 ms ticker through the `Select` API
/// until each thread's own 1100 ms `after` channel fires. Exactly eight
/// messages must be received in total across all threads.
// NOTE(review): 8 presumably is 5 ticks of the 200 ms ticker plus 3 ticks of
// the 300 ms ticker inside the 1100 ms window; confirm against `tick` docs.
#[test]
fn select() {
    const THREADS: usize = 4;

    let hits = AtomicUsize::new(0);
    let r1 = tick(ms(200));
    let r2 = tick(ms(300));

    scope(|s| {
        for _ in 0..THREADS {
            s.spawn(|_| {
                let deadline = after(ms(1100));
                loop {
                    let mut sel = Select::new();
                    let idx_r1 = sel.recv(&r1);
                    let idx_r2 = sel.recv(&r2);
                    let idx_deadline = sel.recv(&deadline);

                    let chosen = sel.select();
                    let which = chosen.index();
                    if which == idx_r1 {
                        chosen.recv(&r1).unwrap();
                    } else if which == idx_r2 {
                        chosen.recv(&r2).unwrap();
                    } else if which == idx_deadline {
                        chosen.recv(&deadline).unwrap();
                        break;
                    } else {
                        unreachable!();
                    }

                    // Reached only when one of the two tickers delivered.
                    hits.fetch_add(1, Ordering::SeqCst);
                }
            });
        }
    })
    .unwrap();

    assert_eq!(hits.load(Ordering::SeqCst), 8);
}
256
/// Same scenario as the `select` test, but driven by `Select::ready`: a
/// reported operation index is followed by a `try_recv` that may fail, in
/// which case the thread asks for readiness again. Exactly eight messages
/// must be received in total across the four threads.
#[cfg(not(crossbeam_sanitize))] // TODO: assertions fail because tsan is slow
#[test]
fn ready() {
    const THREADS: usize = 4;

    let hits = AtomicUsize::new(0);
    let r1 = tick(ms(200));
    let r2 = tick(ms(300));

    scope(|s| {
        for _ in 0..THREADS {
            s.spawn(|_| {
                let deadline = after(ms(1100));
                'outer: loop {
                    let mut sel = Select::new();
                    sel.recv(&r1);
                    sel.recv(&r2);
                    sel.recv(&deadline);

                    loop {
                        // `true` only when a ticker message was actually taken.
                        let got_tick = match sel.ready() {
                            0 => r1.try_recv().is_ok(),
                            1 => r2.try_recv().is_ok(),
                            2 => {
                                if deadline.try_recv().is_ok() {
                                    break 'outer;
                                }
                                false
                            }
                            _ => unreachable!(),
                        };

                        if got_tick {
                            hits.fetch_add(1, Ordering::SeqCst);
                            break;
                        }
                    }
                }
            });
        }
    })
    .unwrap();

    assert_eq!(hits.load(Ordering::SeqCst), 8);
}
305
/// `select!` over two tickers with the same period must not starve either
/// arm. For periods of 0 ms and 1 ms, 30 fresh ticker pairs are each selected
/// on 30 times, and every arm must reach at least `COUNT / 2 / 2` hits.
#[test]
fn fairness() {
    const COUNT: usize = 30;

    for &period_ms in &[0, 1] {
        let mut hits = [0usize; 2];

        for _ in 0..COUNT {
            let first = tick(ms(period_ms));
            let second = tick(ms(period_ms));

            for _ in 0..COUNT {
                select! {
                    recv(first) -> _ => hits[0] += 1,
                    recv(second) -> _ => hits[1] += 1,
                }
            }
        }

        // Minimum number of wins every arm has to reach.
        let floor = COUNT / hits.len() / 2;
        assert!(hits.iter().all(|&wins| wins >= floor));
    }
}
328
/// `select!` with the same ticker listed in five arms must spread the wins
/// across all of them. For periods of 0 ms and 1 ms, 30 fresh tickers are each
/// selected on 30 times, and every arm must reach at least `COUNT / 5 / 2`
/// hits.
#[test]
fn fairness_duplicates() {
    const COUNT: usize = 30;

    for &period_ms in &[0, 1] {
        let mut hits = [0usize; 5];

        for _ in 0..COUNT {
            let ticker = tick(ms(period_ms));

            for _ in 0..COUNT {
                select! {
                    recv(ticker) -> _ => hits[0] += 1,
                    recv(ticker) -> _ => hits[1] += 1,
                    recv(ticker) -> _ => hits[2] += 1,
                    recv(ticker) -> _ => hits[3] += 1,
                    recv(ticker) -> _ => hits[4] += 1,
                }
            }
        }

        // Minimum number of wins every arm has to reach.
        let floor = COUNT / hits.len() / 2;
        assert!(hits.iter().all(|&wins| wins >= floor));
    }
}
353