1 //! Tests for the tick channel flavor.
2 
3 #![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
4 
5 use std::sync::atomic::AtomicUsize;
6 use std::sync::atomic::Ordering;
7 use std::thread;
8 use std::time::{Duration, Instant};
9 
10 use crossbeam_channel::{after, select, tick, Select, TryRecvError};
11 use crossbeam_utils::thread::scope;
12 
ms(ms: u64) -> Duration13 fn ms(ms: u64) -> Duration {
14     Duration::from_millis(ms)
15 }
16 
17 #[test]
fire()18 fn fire() {
19     let start = Instant::now();
20     let r = tick(ms(50));
21 
22     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
23     thread::sleep(ms(100));
24 
25     let fired = r.try_recv().unwrap();
26     assert!(start < fired);
27     assert!(fired - start >= ms(50));
28 
29     let now = Instant::now();
30     assert!(fired < now);
31     assert!(now - fired >= ms(50));
32 
33     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
34 
35     select! {
36         recv(r) -> _ => panic!(),
37         default => {}
38     }
39 
40     select! {
41         recv(r) -> _ => {}
42         recv(tick(ms(200))) -> _ => panic!(),
43     }
44 }
45 
46 #[test]
intervals()47 fn intervals() {
48     let start = Instant::now();
49     let r = tick(ms(50));
50 
51     let t1 = r.recv().unwrap();
52     assert!(start + ms(50) <= t1);
53     assert!(start + ms(100) > t1);
54 
55     thread::sleep(ms(300));
56     let t2 = r.try_recv().unwrap();
57     assert!(start + ms(100) <= t2);
58     assert!(start + ms(150) > t2);
59 
60     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
61     let t3 = r.recv().unwrap();
62     assert!(start + ms(400) <= t3);
63     assert!(start + ms(450) > t3);
64 
65     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
66 }
67 
68 #[test]
capacity()69 fn capacity() {
70     const COUNT: usize = 10;
71 
72     for i in 0..COUNT {
73         let r = tick(ms(i as u64));
74         assert_eq!(r.capacity(), Some(1));
75     }
76 }
77 
78 #[test]
len_empty_full()79 fn len_empty_full() {
80     let r = tick(ms(50));
81 
82     assert_eq!(r.len(), 0);
83     assert!(r.is_empty());
84     assert!(!r.is_full());
85 
86     thread::sleep(ms(100));
87 
88     assert_eq!(r.len(), 1);
89     assert!(!r.is_empty());
90     assert!(r.is_full());
91 
92     r.try_recv().unwrap();
93 
94     assert_eq!(r.len(), 0);
95     assert!(r.is_empty());
96     assert!(!r.is_full());
97 }
98 
99 #[test]
try_recv()100 fn try_recv() {
101     let r = tick(ms(200));
102     assert!(r.try_recv().is_err());
103 
104     thread::sleep(ms(100));
105     assert!(r.try_recv().is_err());
106 
107     thread::sleep(ms(200));
108     assert!(r.try_recv().is_ok());
109     assert!(r.try_recv().is_err());
110 
111     thread::sleep(ms(200));
112     assert!(r.try_recv().is_ok());
113     assert!(r.try_recv().is_err());
114 }
115 
116 #[test]
recv()117 fn recv() {
118     let start = Instant::now();
119     let r = tick(ms(50));
120 
121     let fired = r.recv().unwrap();
122     assert!(start < fired);
123     assert!(fired - start >= ms(50));
124 
125     let now = Instant::now();
126     assert!(fired < now);
127     assert!(now - fired < fired - start);
128 
129     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
130 }
131 
132 #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow
133 #[test]
recv_timeout()134 fn recv_timeout() {
135     let start = Instant::now();
136     let r = tick(ms(200));
137 
138     assert!(r.recv_timeout(ms(100)).is_err());
139     let now = Instant::now();
140     assert!(now - start >= ms(100));
141     assert!(now - start <= ms(150));
142 
143     let fired = r.recv_timeout(ms(200)).unwrap();
144     assert!(fired - start >= ms(200));
145     assert!(fired - start <= ms(250));
146 
147     assert!(r.recv_timeout(ms(100)).is_err());
148     let now = Instant::now();
149     assert!(now - start >= ms(300));
150     assert!(now - start <= ms(350));
151 
152     let fired = r.recv_timeout(ms(200)).unwrap();
153     assert!(fired - start >= ms(400));
154     assert!(fired - start <= ms(450));
155 }
156 
157 #[test]
recv_two()158 fn recv_two() {
159     let r1 = tick(ms(50));
160     let r2 = tick(ms(50));
161 
162     scope(|scope| {
163         scope.spawn(|_| {
164             for _ in 0..10 {
165                 select! {
166                     recv(r1) -> _ => {}
167                     recv(r2) -> _ => {}
168                 }
169             }
170         });
171         scope.spawn(|_| {
172             for _ in 0..10 {
173                 select! {
174                     recv(r1) -> _ => {}
175                     recv(r2) -> _ => {}
176                 }
177             }
178         });
179     })
180     .unwrap();
181 }
182 
183 #[test]
recv_race()184 fn recv_race() {
185     select! {
186         recv(tick(ms(50))) -> _ => {}
187         recv(tick(ms(100))) -> _ => panic!(),
188     }
189 
190     select! {
191         recv(tick(ms(100))) -> _ => panic!(),
192         recv(tick(ms(50))) -> _ => {}
193     }
194 }
195 
196 #[test]
stress_default()197 fn stress_default() {
198     const COUNT: usize = 10;
199 
200     for _ in 0..COUNT {
201         select! {
202             recv(tick(ms(0))) -> _ => {}
203             default => panic!(),
204         }
205     }
206 
207     for _ in 0..COUNT {
208         select! {
209             recv(tick(ms(100))) -> _ => panic!(),
210             default => {}
211         }
212     }
213 }
214 
215 #[test]
select()216 fn select() {
217     const THREADS: usize = 4;
218 
219     let hits = AtomicUsize::new(0);
220     let r1 = tick(ms(200));
221     let r2 = tick(ms(300));
222 
223     scope(|scope| {
224         for _ in 0..THREADS {
225             scope.spawn(|_| {
226                 let timeout = after(ms(1100));
227                 loop {
228                     let mut sel = Select::new();
229                     let oper1 = sel.recv(&r1);
230                     let oper2 = sel.recv(&r2);
231                     let oper3 = sel.recv(&timeout);
232                     let oper = sel.select();
233                     match oper.index() {
234                         i if i == oper1 => {
235                             oper.recv(&r1).unwrap();
236                             hits.fetch_add(1, Ordering::SeqCst);
237                         }
238                         i if i == oper2 => {
239                             oper.recv(&r2).unwrap();
240                             hits.fetch_add(1, Ordering::SeqCst);
241                         }
242                         i if i == oper3 => {
243                             oper.recv(&timeout).unwrap();
244                             break;
245                         }
246                         _ => unreachable!(),
247                     }
248                 }
249             });
250         }
251     })
252     .unwrap();
253 
254     assert_eq!(hits.load(Ordering::SeqCst), 8);
255 }
256 
257 #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow
258 #[test]
ready()259 fn ready() {
260     const THREADS: usize = 4;
261 
262     let hits = AtomicUsize::new(0);
263     let r1 = tick(ms(200));
264     let r2 = tick(ms(300));
265 
266     scope(|scope| {
267         for _ in 0..THREADS {
268             scope.spawn(|_| {
269                 let timeout = after(ms(1100));
270                 'outer: loop {
271                     let mut sel = Select::new();
272                     sel.recv(&r1);
273                     sel.recv(&r2);
274                     sel.recv(&timeout);
275                     loop {
276                         match sel.ready() {
277                             0 => {
278                                 if r1.try_recv().is_ok() {
279                                     hits.fetch_add(1, Ordering::SeqCst);
280                                     break;
281                                 }
282                             }
283                             1 => {
284                                 if r2.try_recv().is_ok() {
285                                     hits.fetch_add(1, Ordering::SeqCst);
286                                     break;
287                                 }
288                             }
289                             2 => {
290                                 if timeout.try_recv().is_ok() {
291                                     break 'outer;
292                                 }
293                             }
294                             _ => unreachable!(),
295                         }
296                     }
297                 }
298             });
299         }
300     })
301     .unwrap();
302 
303     assert_eq!(hits.load(Ordering::SeqCst), 8);
304 }
305 
306 #[test]
fairness()307 fn fairness() {
308     const COUNT: usize = 30;
309 
310     for &dur in &[0, 1] {
311         let mut hits = [0usize; 2];
312 
313         for _ in 0..COUNT {
314             let r1 = tick(ms(dur));
315             let r2 = tick(ms(dur));
316 
317             for _ in 0..COUNT {
318                 select! {
319                     recv(r1) -> _ => hits[0] += 1,
320                     recv(r2) -> _ => hits[1] += 1,
321                 }
322             }
323         }
324 
325         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
326     }
327 }
328 
329 #[test]
fairness_duplicates()330 fn fairness_duplicates() {
331     const COUNT: usize = 30;
332 
333     for &dur in &[0, 1] {
334         let mut hits = [0usize; 5];
335 
336         for _ in 0..COUNT {
337             let r = tick(ms(dur));
338 
339             for _ in 0..COUNT {
340                 select! {
341                     recv(r) -> _ => hits[0] += 1,
342                     recv(r) -> _ => hits[1] += 1,
343                     recv(r) -> _ => hits[2] += 1,
344                     recv(r) -> _ => hits[3] += 1,
345                     recv(r) -> _ => hits[4] += 1,
346                 }
347             }
348         }
349 
350         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
351     }
352 }
353