1 //! Tests for the list channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{select, unbounded, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = unbounded();
22     s.try_send(7).unwrap();
23     assert_eq!(r.try_recv(), Ok(7));
24 
25     s.send(8).unwrap();
26     assert_eq!(r.recv(), Ok(8));
27 
28     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29     assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31 
32 #[test]
capacity()33 fn capacity() {
34     let (s, r) = unbounded::<()>();
35     assert_eq!(s.capacity(), None);
36     assert_eq!(r.capacity(), None);
37 }
38 
39 #[test]
len_empty_full()40 fn len_empty_full() {
41     let (s, r) = unbounded();
42 
43     assert_eq!(s.len(), 0);
44     assert!(s.is_empty());
45     assert!(!s.is_full());
46     assert_eq!(r.len(), 0);
47     assert!(r.is_empty());
48     assert!(!r.is_full());
49 
50     s.send(()).unwrap();
51 
52     assert_eq!(s.len(), 1);
53     assert!(!s.is_empty());
54     assert!(!s.is_full());
55     assert_eq!(r.len(), 1);
56     assert!(!r.is_empty());
57     assert!(!r.is_full());
58 
59     r.recv().unwrap();
60 
61     assert_eq!(s.len(), 0);
62     assert!(s.is_empty());
63     assert!(!s.is_full());
64     assert_eq!(r.len(), 0);
65     assert!(r.is_empty());
66     assert!(!r.is_full());
67 }
68 
69 #[test]
70 #[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them
try_recv()71 fn try_recv() {
72     let (s, r) = unbounded();
73 
74     scope(|scope| {
75         scope.spawn(move |_| {
76             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
77             thread::sleep(ms(1500));
78             assert_eq!(r.try_recv(), Ok(7));
79             thread::sleep(ms(500));
80             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
81         });
82         scope.spawn(move |_| {
83             thread::sleep(ms(1000));
84             s.send(7).unwrap();
85         });
86     })
87     .unwrap();
88 }
89 
90 #[test]
recv()91 fn recv() {
92     let (s, r) = unbounded();
93 
94     scope(|scope| {
95         scope.spawn(move |_| {
96             assert_eq!(r.recv(), Ok(7));
97             thread::sleep(ms(1000));
98             assert_eq!(r.recv(), Ok(8));
99             thread::sleep(ms(1000));
100             assert_eq!(r.recv(), Ok(9));
101             assert_eq!(r.recv(), Err(RecvError));
102         });
103         scope.spawn(move |_| {
104             thread::sleep(ms(1500));
105             s.send(7).unwrap();
106             s.send(8).unwrap();
107             s.send(9).unwrap();
108         });
109     })
110     .unwrap();
111 }
112 
113 #[test]
recv_timeout()114 fn recv_timeout() {
115     let (s, r) = unbounded::<i32>();
116 
117     scope(|scope| {
118         scope.spawn(move |_| {
119             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
120             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
121             assert_eq!(
122                 r.recv_timeout(ms(1000)),
123                 Err(RecvTimeoutError::Disconnected)
124             );
125         });
126         scope.spawn(move |_| {
127             thread::sleep(ms(1500));
128             s.send(7).unwrap();
129         });
130     })
131     .unwrap();
132 }
133 
134 #[test]
try_send()135 fn try_send() {
136     #[cfg(miri)]
137     const COUNT: usize = 50;
138     #[cfg(not(miri))]
139     const COUNT: usize = 1000;
140 
141     let (s, r) = unbounded();
142     for i in 0..COUNT {
143         assert_eq!(s.try_send(i), Ok(()));
144     }
145 
146     drop(r);
147     assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777)));
148 }
149 
150 #[test]
send()151 fn send() {
152     #[cfg(miri)]
153     const COUNT: usize = 50;
154     #[cfg(not(miri))]
155     const COUNT: usize = 1000;
156 
157     let (s, r) = unbounded();
158     for i in 0..COUNT {
159         assert_eq!(s.send(i), Ok(()));
160     }
161 
162     drop(r);
163     assert_eq!(s.send(777), Err(SendError(777)));
164 }
165 
166 #[test]
send_timeout()167 fn send_timeout() {
168     #[cfg(miri)]
169     const COUNT: usize = 50;
170     #[cfg(not(miri))]
171     const COUNT: usize = 1000;
172 
173     let (s, r) = unbounded();
174     for i in 0..COUNT {
175         assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
176     }
177 
178     drop(r);
179     assert_eq!(
180         s.send_timeout(777, ms(0)),
181         Err(SendTimeoutError::Disconnected(777))
182     );
183 }
184 
185 #[test]
send_after_disconnect()186 fn send_after_disconnect() {
187     let (s, r) = unbounded();
188 
189     s.send(1).unwrap();
190     s.send(2).unwrap();
191     s.send(3).unwrap();
192 
193     drop(r);
194 
195     assert_eq!(s.send(4), Err(SendError(4)));
196     assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
197     assert_eq!(
198         s.send_timeout(6, ms(0)),
199         Err(SendTimeoutError::Disconnected(6))
200     );
201 }
202 
203 #[test]
recv_after_disconnect()204 fn recv_after_disconnect() {
205     let (s, r) = unbounded();
206 
207     s.send(1).unwrap();
208     s.send(2).unwrap();
209     s.send(3).unwrap();
210 
211     drop(s);
212 
213     assert_eq!(r.recv(), Ok(1));
214     assert_eq!(r.recv(), Ok(2));
215     assert_eq!(r.recv(), Ok(3));
216     assert_eq!(r.recv(), Err(RecvError));
217 }
218 
219 #[test]
len()220 fn len() {
221     let (s, r) = unbounded();
222 
223     assert_eq!(s.len(), 0);
224     assert_eq!(r.len(), 0);
225 
226     for i in 0..50 {
227         s.send(i).unwrap();
228         assert_eq!(s.len(), i + 1);
229     }
230 
231     for i in 0..50 {
232         r.recv().unwrap();
233         assert_eq!(r.len(), 50 - i - 1);
234     }
235 
236     assert_eq!(s.len(), 0);
237     assert_eq!(r.len(), 0);
238 }
239 
240 #[test]
disconnect_wakes_receiver()241 fn disconnect_wakes_receiver() {
242     let (s, r) = unbounded::<()>();
243 
244     scope(|scope| {
245         scope.spawn(move |_| {
246             assert_eq!(r.recv(), Err(RecvError));
247         });
248         scope.spawn(move |_| {
249             thread::sleep(ms(1000));
250             drop(s);
251         });
252     })
253     .unwrap();
254 }
255 
256 #[test]
spsc()257 fn spsc() {
258     #[cfg(miri)]
259     const COUNT: usize = 100;
260     #[cfg(not(miri))]
261     const COUNT: usize = 100_000;
262 
263     let (s, r) = unbounded();
264 
265     scope(|scope| {
266         scope.spawn(move |_| {
267             for i in 0..COUNT {
268                 assert_eq!(r.recv(), Ok(i));
269             }
270             assert_eq!(r.recv(), Err(RecvError));
271         });
272         scope.spawn(move |_| {
273             for i in 0..COUNT {
274                 s.send(i).unwrap();
275             }
276         });
277     })
278     .unwrap();
279 }
280 
281 #[test]
mpmc()282 fn mpmc() {
283     #[cfg(miri)]
284     const COUNT: usize = 100;
285     #[cfg(not(miri))]
286     const COUNT: usize = 25_000;
287     const THREADS: usize = 4;
288 
289     let (s, r) = unbounded::<usize>();
290     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
291 
292     scope(|scope| {
293         for _ in 0..THREADS {
294             scope.spawn(|_| {
295                 for _ in 0..COUNT {
296                     let n = r.recv().unwrap();
297                     v[n].fetch_add(1, Ordering::SeqCst);
298                 }
299             });
300         }
301         for _ in 0..THREADS {
302             scope.spawn(|_| {
303                 for i in 0..COUNT {
304                     s.send(i).unwrap();
305                 }
306             });
307         }
308     })
309     .unwrap();
310 
311     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
312 
313     for c in v {
314         assert_eq!(c.load(Ordering::SeqCst), THREADS);
315     }
316 }
317 
318 #[test]
stress_oneshot()319 fn stress_oneshot() {
320     #[cfg(miri)]
321     const COUNT: usize = 100;
322     #[cfg(not(miri))]
323     const COUNT: usize = 10_000;
324 
325     for _ in 0..COUNT {
326         let (s, r) = unbounded();
327 
328         scope(|scope| {
329             scope.spawn(|_| r.recv().unwrap());
330             scope.spawn(|_| s.send(0).unwrap());
331         })
332         .unwrap();
333     }
334 }
335 
336 #[test]
stress_iter()337 fn stress_iter() {
338     #[cfg(miri)]
339     const COUNT: usize = 100;
340     #[cfg(not(miri))]
341     const COUNT: usize = 100_000;
342 
343     let (request_s, request_r) = unbounded();
344     let (response_s, response_r) = unbounded();
345 
346     scope(|scope| {
347         scope.spawn(move |_| {
348             let mut count = 0;
349             loop {
350                 for x in response_r.try_iter() {
351                     count += x;
352                     if count == COUNT {
353                         return;
354                     }
355                 }
356                 request_s.send(()).unwrap();
357             }
358         });
359 
360         for _ in request_r.iter() {
361             if response_s.send(1).is_err() {
362                 break;
363             }
364         }
365     })
366     .unwrap();
367 }
368 
369 #[test]
stress_timeout_two_threads()370 fn stress_timeout_two_threads() {
371     const COUNT: usize = 100;
372 
373     let (s, r) = unbounded();
374 
375     scope(|scope| {
376         scope.spawn(|_| {
377             for i in 0..COUNT {
378                 if i % 2 == 0 {
379                     thread::sleep(ms(50));
380                 }
381                 s.send(i).unwrap();
382             }
383         });
384 
385         scope.spawn(|_| {
386             for i in 0..COUNT {
387                 if i % 2 == 0 {
388                     thread::sleep(ms(50));
389                 }
390                 loop {
391                     if let Ok(x) = r.recv_timeout(ms(10)) {
392                         assert_eq!(x, i);
393                         break;
394                     }
395                 }
396             }
397         });
398     })
399     .unwrap();
400 }
401 
402 #[test]
drops()403 fn drops() {
404     #[cfg(miri)]
405     const RUNS: usize = 20;
406     #[cfg(not(miri))]
407     const RUNS: usize = 100;
408     #[cfg(miri)]
409     const STEPS: usize = 100;
410     #[cfg(not(miri))]
411     const STEPS: usize = 10_000;
412 
413     static DROPS: AtomicUsize = AtomicUsize::new(0);
414 
415     #[derive(Debug, PartialEq)]
416     struct DropCounter;
417 
418     impl Drop for DropCounter {
419         fn drop(&mut self) {
420             DROPS.fetch_add(1, Ordering::SeqCst);
421         }
422     }
423 
424     let mut rng = thread_rng();
425 
426     for _ in 0..RUNS {
427         let steps = rng.gen_range(0..STEPS);
428         let additional = rng.gen_range(0..STEPS / 10);
429 
430         DROPS.store(0, Ordering::SeqCst);
431         let (s, r) = unbounded::<DropCounter>();
432 
433         scope(|scope| {
434             scope.spawn(|_| {
435                 for _ in 0..steps {
436                     r.recv().unwrap();
437                 }
438             });
439 
440             scope.spawn(|_| {
441                 for _ in 0..steps {
442                     s.send(DropCounter).unwrap();
443                 }
444             });
445         })
446         .unwrap();
447 
448         for _ in 0..additional {
449             s.try_send(DropCounter).unwrap();
450         }
451 
452         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
453         drop(s);
454         drop(r);
455         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
456     }
457 }
458 
459 #[test]
linearizable()460 fn linearizable() {
461     #[cfg(miri)]
462     const COUNT: usize = 100;
463     #[cfg(not(miri))]
464     const COUNT: usize = 25_000;
465     const THREADS: usize = 4;
466 
467     let (s, r) = unbounded();
468 
469     scope(|scope| {
470         for _ in 0..THREADS {
471             scope.spawn(|_| {
472                 for _ in 0..COUNT {
473                     s.send(0).unwrap();
474                     r.try_recv().unwrap();
475                 }
476             });
477         }
478     })
479     .unwrap();
480 }
481 
482 #[test]
fairness()483 fn fairness() {
484     #[cfg(miri)]
485     const COUNT: usize = 100;
486     #[cfg(not(miri))]
487     const COUNT: usize = 10_000;
488 
489     let (s1, r1) = unbounded::<()>();
490     let (s2, r2) = unbounded::<()>();
491 
492     for _ in 0..COUNT {
493         s1.send(()).unwrap();
494         s2.send(()).unwrap();
495     }
496 
497     let mut hits = [0usize; 2];
498     for _ in 0..COUNT {
499         select! {
500             recv(r1) -> _ => hits[0] += 1,
501             recv(r2) -> _ => hits[1] += 1,
502         }
503     }
504     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
505 }
506 
507 #[test]
fairness_duplicates()508 fn fairness_duplicates() {
509     #[cfg(miri)]
510     const COUNT: usize = 100;
511     #[cfg(not(miri))]
512     const COUNT: usize = 10_000;
513 
514     let (s, r) = unbounded();
515 
516     for _ in 0..COUNT {
517         s.send(()).unwrap();
518     }
519 
520     let mut hits = [0usize; 5];
521     for _ in 0..COUNT {
522         select! {
523             recv(r) -> _ => hits[0] += 1,
524             recv(r) -> _ => hits[1] += 1,
525             recv(r) -> _ => hits[2] += 1,
526             recv(r) -> _ => hits[3] += 1,
527             recv(r) -> _ => hits[4] += 1,
528         }
529     }
530     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
531 }
532 
533 #[test]
recv_in_send()534 fn recv_in_send() {
535     let (s, r) = unbounded();
536     s.send(()).unwrap();
537 
538     select! {
539         send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
540     }
541 }
542 
543 #[test]
channel_through_channel()544 fn channel_through_channel() {
545     #[cfg(miri)]
546     const COUNT: usize = 100;
547     #[cfg(not(miri))]
548     const COUNT: usize = 1000;
549 
550     type T = Box<dyn Any + Send>;
551 
552     let (s, r) = unbounded::<T>();
553 
554     scope(|scope| {
555         scope.spawn(move |_| {
556             let mut s = s;
557 
558             for _ in 0..COUNT {
559                 let (new_s, new_r) = unbounded();
560                 let new_r: T = Box::new(Some(new_r));
561 
562                 s.send(new_r).unwrap();
563                 s = new_s;
564             }
565         });
566 
567         scope.spawn(move |_| {
568             let mut r = r;
569 
570             for _ in 0..COUNT {
571                 r = r
572                     .recv()
573                     .unwrap()
574                     .downcast_mut::<Option<Receiver<T>>>()
575                     .unwrap()
576                     .take()
577                     .unwrap()
578             }
579         });
580     })
581     .unwrap();
582 }
583