1 //! Tests for channel readiness using the `Select` struct.
2
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_channel::{after, bounded, tick, unbounded};
9 use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
10 use crossbeam_utils::thread::scope;
11
ms(ms: u64) -> Duration12 fn ms(ms: u64) -> Duration {
13 Duration::from_millis(ms)
14 }
15
16 #[test]
smoke1()17 fn smoke1() {
18 let (s1, r1) = unbounded::<usize>();
19 let (s2, r2) = unbounded::<usize>();
20
21 s1.send(1).unwrap();
22
23 let mut sel = Select::new();
24 sel.recv(&r1);
25 sel.recv(&r2);
26 assert_eq!(sel.ready(), 0);
27 assert_eq!(r1.try_recv(), Ok(1));
28
29 s2.send(2).unwrap();
30
31 let mut sel = Select::new();
32 sel.recv(&r1);
33 sel.recv(&r2);
34 assert_eq!(sel.ready(), 1);
35 assert_eq!(r2.try_recv(), Ok(2));
36 }
37
38 #[test]
smoke2()39 fn smoke2() {
40 let (_s1, r1) = unbounded::<i32>();
41 let (_s2, r2) = unbounded::<i32>();
42 let (_s3, r3) = unbounded::<i32>();
43 let (_s4, r4) = unbounded::<i32>();
44 let (s5, r5) = unbounded::<i32>();
45
46 s5.send(5).unwrap();
47
48 let mut sel = Select::new();
49 sel.recv(&r1);
50 sel.recv(&r2);
51 sel.recv(&r3);
52 sel.recv(&r4);
53 sel.recv(&r5);
54 assert_eq!(sel.ready(), 4);
55 assert_eq!(r5.try_recv(), Ok(5));
56 }
57
58 #[test]
disconnected()59 fn disconnected() {
60 let (s1, r1) = unbounded::<i32>();
61 let (s2, r2) = unbounded::<i32>();
62
63 scope(|scope| {
64 scope.spawn(|_| {
65 drop(s1);
66 thread::sleep(ms(500));
67 s2.send(5).unwrap();
68 });
69
70 let mut sel = Select::new();
71 sel.recv(&r1);
72 sel.recv(&r2);
73 match sel.ready_timeout(ms(1000)) {
74 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
75 _ => panic!(),
76 }
77
78 r2.recv().unwrap();
79 })
80 .unwrap();
81
82 let mut sel = Select::new();
83 sel.recv(&r1);
84 sel.recv(&r2);
85 match sel.ready_timeout(ms(1000)) {
86 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
87 _ => panic!(),
88 }
89
90 scope(|scope| {
91 scope.spawn(|_| {
92 thread::sleep(ms(500));
93 drop(s2);
94 });
95
96 let mut sel = Select::new();
97 sel.recv(&r2);
98 match sel.ready_timeout(ms(1000)) {
99 Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
100 _ => panic!(),
101 }
102 })
103 .unwrap();
104 }
105
106 #[test]
default()107 fn default() {
108 let (s1, r1) = unbounded::<i32>();
109 let (s2, r2) = unbounded::<i32>();
110
111 let mut sel = Select::new();
112 sel.recv(&r1);
113 sel.recv(&r2);
114 assert!(sel.try_ready().is_err());
115
116 drop(s1);
117
118 let mut sel = Select::new();
119 sel.recv(&r1);
120 sel.recv(&r2);
121 match sel.try_ready() {
122 Ok(0) => assert!(r1.try_recv().is_err()),
123 _ => panic!(),
124 }
125
126 s2.send(2).unwrap();
127
128 let mut sel = Select::new();
129 sel.recv(&r2);
130 match sel.try_ready() {
131 Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
132 _ => panic!(),
133 }
134
135 let mut sel = Select::new();
136 sel.recv(&r2);
137 assert!(sel.try_ready().is_err());
138
139 let mut sel = Select::new();
140 assert!(sel.try_ready().is_err());
141 }
142
143 #[test]
timeout()144 fn timeout() {
145 let (_s1, r1) = unbounded::<i32>();
146 let (s2, r2) = unbounded::<i32>();
147
148 scope(|scope| {
149 scope.spawn(|_| {
150 thread::sleep(ms(1500));
151 s2.send(2).unwrap();
152 });
153
154 let mut sel = Select::new();
155 sel.recv(&r1);
156 sel.recv(&r2);
157 assert!(sel.ready_timeout(ms(1000)).is_err());
158
159 let mut sel = Select::new();
160 sel.recv(&r1);
161 sel.recv(&r2);
162 match sel.ready_timeout(ms(1000)) {
163 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
164 _ => panic!(),
165 }
166 })
167 .unwrap();
168
169 scope(|scope| {
170 let (s, r) = unbounded::<i32>();
171
172 scope.spawn(move |_| {
173 thread::sleep(ms(500));
174 drop(s);
175 });
176
177 let mut sel = Select::new();
178 assert!(sel.ready_timeout(ms(1000)).is_err());
179
180 let mut sel = Select::new();
181 sel.recv(&r);
182 match sel.try_ready() {
183 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
184 _ => panic!(),
185 }
186 })
187 .unwrap();
188 }
189
190 #[test]
default_when_disconnected()191 fn default_when_disconnected() {
192 let (_, r) = unbounded::<i32>();
193
194 let mut sel = Select::new();
195 sel.recv(&r);
196 match sel.try_ready() {
197 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
198 _ => panic!(),
199 }
200
201 let (_, r) = unbounded::<i32>();
202
203 let mut sel = Select::new();
204 sel.recv(&r);
205 match sel.ready_timeout(ms(1000)) {
206 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
207 _ => panic!(),
208 }
209
210 let (s, _) = bounded::<i32>(0);
211
212 let mut sel = Select::new();
213 sel.send(&s);
214 match sel.try_ready() {
215 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
216 _ => panic!(),
217 }
218
219 let (s, _) = bounded::<i32>(0);
220
221 let mut sel = Select::new();
222 sel.send(&s);
223 match sel.ready_timeout(ms(1000)) {
224 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
225 _ => panic!(),
226 }
227 }
228
229 #[test]
230 #[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them
default_only()231 fn default_only() {
232 let start = Instant::now();
233
234 let mut sel = Select::new();
235 assert!(sel.try_ready().is_err());
236 let now = Instant::now();
237 assert!(now - start <= ms(50));
238
239 let start = Instant::now();
240 let mut sel = Select::new();
241 assert!(sel.ready_timeout(ms(500)).is_err());
242 let now = Instant::now();
243 assert!(now - start >= ms(450));
244 assert!(now - start <= ms(550));
245 }
246
247 #[test]
unblocks()248 fn unblocks() {
249 let (s1, r1) = bounded::<i32>(0);
250 let (s2, r2) = bounded::<i32>(0);
251
252 scope(|scope| {
253 scope.spawn(|_| {
254 thread::sleep(ms(500));
255 s2.send(2).unwrap();
256 });
257
258 let mut sel = Select::new();
259 sel.recv(&r1);
260 sel.recv(&r2);
261 match sel.ready_timeout(ms(1000)) {
262 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
263 _ => panic!(),
264 }
265 })
266 .unwrap();
267
268 scope(|scope| {
269 scope.spawn(|_| {
270 thread::sleep(ms(500));
271 assert_eq!(r1.recv().unwrap(), 1);
272 });
273
274 let mut sel = Select::new();
275 let oper1 = sel.send(&s1);
276 let oper2 = sel.send(&s2);
277 let oper = sel.select_timeout(ms(1000));
278 match oper {
279 Err(_) => panic!(),
280 Ok(oper) => match oper.index() {
281 i if i == oper1 => oper.send(&s1, 1).unwrap(),
282 i if i == oper2 => panic!(),
283 _ => unreachable!(),
284 },
285 }
286 })
287 .unwrap();
288 }
289
290 #[test]
both_ready()291 fn both_ready() {
292 let (s1, r1) = bounded(0);
293 let (s2, r2) = bounded(0);
294
295 scope(|scope| {
296 scope.spawn(|_| {
297 thread::sleep(ms(500));
298 s1.send(1).unwrap();
299 assert_eq!(r2.recv().unwrap(), 2);
300 });
301
302 for _ in 0..2 {
303 let mut sel = Select::new();
304 sel.recv(&r1);
305 sel.send(&s2);
306 match sel.ready() {
307 0 => assert_eq!(r1.try_recv(), Ok(1)),
308 1 => s2.try_send(2).unwrap(),
309 _ => panic!(),
310 }
311 }
312 })
313 .unwrap();
314 }
315
316 #[test]
cloning1()317 fn cloning1() {
318 scope(|scope| {
319 let (s1, r1) = unbounded::<i32>();
320 let (_s2, r2) = unbounded::<i32>();
321 let (s3, r3) = unbounded::<()>();
322
323 scope.spawn(move |_| {
324 r3.recv().unwrap();
325 drop(s1.clone());
326 assert!(r3.try_recv().is_err());
327 s1.send(1).unwrap();
328 r3.recv().unwrap();
329 });
330
331 s3.send(()).unwrap();
332
333 let mut sel = Select::new();
334 sel.recv(&r1);
335 sel.recv(&r2);
336 match sel.ready() {
337 0 => drop(r1.try_recv()),
338 1 => drop(r2.try_recv()),
339 _ => panic!(),
340 }
341
342 s3.send(()).unwrap();
343 })
344 .unwrap();
345 }
346
347 #[test]
cloning2()348 fn cloning2() {
349 let (s1, r1) = unbounded::<()>();
350 let (s2, r2) = unbounded::<()>();
351 let (_s3, _r3) = unbounded::<()>();
352
353 scope(|scope| {
354 scope.spawn(move |_| {
355 let mut sel = Select::new();
356 sel.recv(&r1);
357 sel.recv(&r2);
358 match sel.ready() {
359 0 => panic!(),
360 1 => drop(r2.try_recv()),
361 _ => panic!(),
362 }
363 });
364
365 thread::sleep(ms(500));
366 drop(s1.clone());
367 s2.send(()).unwrap();
368 })
369 .unwrap();
370 }
371
372 #[test]
preflight1()373 fn preflight1() {
374 let (s, r) = unbounded();
375 s.send(()).unwrap();
376
377 let mut sel = Select::new();
378 sel.recv(&r);
379 match sel.ready() {
380 0 => drop(r.try_recv()),
381 _ => panic!(),
382 }
383 }
384
385 #[test]
preflight2()386 fn preflight2() {
387 let (s, r) = unbounded();
388 drop(s.clone());
389 s.send(()).unwrap();
390 drop(s);
391
392 let mut sel = Select::new();
393 sel.recv(&r);
394 match sel.ready() {
395 0 => assert_eq!(r.try_recv(), Ok(())),
396 _ => panic!(),
397 }
398
399 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
400 }
401
402 #[test]
preflight3()403 fn preflight3() {
404 let (s, r) = unbounded();
405 drop(s.clone());
406 s.send(()).unwrap();
407 drop(s);
408 r.recv().unwrap();
409
410 let mut sel = Select::new();
411 sel.recv(&r);
412 match sel.ready() {
413 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
414 _ => panic!(),
415 }
416 }
417
418 #[test]
duplicate_operations()419 fn duplicate_operations() {
420 let (s, r) = unbounded::<i32>();
421 let hit = vec![Cell::new(false); 4];
422
423 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
424 let mut sel = Select::new();
425 sel.recv(&r);
426 sel.recv(&r);
427 sel.send(&s);
428 sel.send(&s);
429 match sel.ready() {
430 0 => {
431 assert!(r.try_recv().is_ok());
432 hit[0].set(true);
433 }
434 1 => {
435 assert!(r.try_recv().is_ok());
436 hit[1].set(true);
437 }
438 2 => {
439 assert!(s.try_send(0).is_ok());
440 hit[2].set(true);
441 }
442 3 => {
443 assert!(s.try_send(0).is_ok());
444 hit[3].set(true);
445 }
446 _ => panic!(),
447 }
448 }
449 }
450
451 #[test]
nesting()452 fn nesting() {
453 let (s, r) = unbounded::<i32>();
454
455 let mut sel = Select::new();
456 sel.send(&s);
457 match sel.ready() {
458 0 => {
459 assert!(s.try_send(0).is_ok());
460
461 let mut sel = Select::new();
462 sel.recv(&r);
463 match sel.ready() {
464 0 => {
465 assert_eq!(r.try_recv(), Ok(0));
466
467 let mut sel = Select::new();
468 sel.send(&s);
469 match sel.ready() {
470 0 => {
471 assert!(s.try_send(1).is_ok());
472
473 let mut sel = Select::new();
474 sel.recv(&r);
475 match sel.ready() {
476 0 => {
477 assert_eq!(r.try_recv(), Ok(1));
478 }
479 _ => panic!(),
480 }
481 }
482 _ => panic!(),
483 }
484 }
485 _ => panic!(),
486 }
487 }
488 _ => panic!(),
489 }
490 }
491
492 #[test]
stress_recv()493 fn stress_recv() {
494 #[cfg(miri)]
495 const COUNT: usize = 100;
496 #[cfg(not(miri))]
497 const COUNT: usize = 10_000;
498
499 let (s1, r1) = unbounded();
500 let (s2, r2) = bounded(5);
501 let (s3, r3) = bounded(0);
502
503 scope(|scope| {
504 scope.spawn(|_| {
505 for i in 0..COUNT {
506 s1.send(i).unwrap();
507 r3.recv().unwrap();
508
509 s2.send(i).unwrap();
510 r3.recv().unwrap();
511 }
512 });
513
514 for i in 0..COUNT {
515 for _ in 0..2 {
516 let mut sel = Select::new();
517 sel.recv(&r1);
518 sel.recv(&r2);
519 match sel.ready() {
520 0 => assert_eq!(r1.try_recv(), Ok(i)),
521 1 => assert_eq!(r2.try_recv(), Ok(i)),
522 _ => panic!(),
523 }
524
525 s3.send(()).unwrap();
526 }
527 }
528 })
529 .unwrap();
530 }
531
532 #[test]
stress_send()533 fn stress_send() {
534 #[cfg(miri)]
535 const COUNT: usize = 100;
536 #[cfg(not(miri))]
537 const COUNT: usize = 10_000;
538
539 let (s1, r1) = bounded(0);
540 let (s2, r2) = bounded(0);
541 let (s3, r3) = bounded(100);
542
543 scope(|scope| {
544 scope.spawn(|_| {
545 for i in 0..COUNT {
546 assert_eq!(r1.recv().unwrap(), i);
547 assert_eq!(r2.recv().unwrap(), i);
548 r3.recv().unwrap();
549 }
550 });
551
552 for i in 0..COUNT {
553 for _ in 0..2 {
554 let mut sel = Select::new();
555 sel.send(&s1);
556 sel.send(&s2);
557 match sel.ready() {
558 0 => assert!(s1.try_send(i).is_ok()),
559 1 => assert!(s2.try_send(i).is_ok()),
560 _ => panic!(),
561 }
562 }
563 s3.send(()).unwrap();
564 }
565 })
566 .unwrap();
567 }
568
569 #[test]
stress_mixed()570 fn stress_mixed() {
571 #[cfg(miri)]
572 const COUNT: usize = 100;
573 #[cfg(not(miri))]
574 const COUNT: usize = 10_000;
575
576 let (s1, r1) = bounded(0);
577 let (s2, r2) = bounded(0);
578 let (s3, r3) = bounded(100);
579
580 scope(|scope| {
581 scope.spawn(|_| {
582 for i in 0..COUNT {
583 s1.send(i).unwrap();
584 assert_eq!(r2.recv().unwrap(), i);
585 r3.recv().unwrap();
586 }
587 });
588
589 for i in 0..COUNT {
590 for _ in 0..2 {
591 let mut sel = Select::new();
592 sel.recv(&r1);
593 sel.send(&s2);
594 match sel.ready() {
595 0 => assert_eq!(r1.try_recv(), Ok(i)),
596 1 => assert!(s2.try_send(i).is_ok()),
597 _ => panic!(),
598 }
599 }
600 s3.send(()).unwrap();
601 }
602 })
603 .unwrap();
604 }
605
606 #[test]
stress_timeout_two_threads()607 fn stress_timeout_two_threads() {
608 const COUNT: usize = 20;
609
610 let (s, r) = bounded(2);
611
612 scope(|scope| {
613 scope.spawn(|_| {
614 for i in 0..COUNT {
615 if i % 2 == 0 {
616 thread::sleep(ms(500));
617 }
618
619 loop {
620 let mut sel = Select::new();
621 sel.send(&s);
622 match sel.ready_timeout(ms(100)) {
623 Err(_) => {}
624 Ok(0) => {
625 assert!(s.try_send(i).is_ok());
626 break;
627 }
628 Ok(_) => panic!(),
629 }
630 }
631 }
632 });
633
634 scope.spawn(|_| {
635 for i in 0..COUNT {
636 if i % 2 == 0 {
637 thread::sleep(ms(500));
638 }
639
640 loop {
641 let mut sel = Select::new();
642 sel.recv(&r);
643 match sel.ready_timeout(ms(100)) {
644 Err(_) => {}
645 Ok(0) => {
646 assert_eq!(r.try_recv(), Ok(i));
647 break;
648 }
649 Ok(_) => panic!(),
650 }
651 }
652 }
653 });
654 })
655 .unwrap();
656 }
657
658 #[test]
send_recv_same_channel()659 fn send_recv_same_channel() {
660 let (s, r) = bounded::<i32>(0);
661 let mut sel = Select::new();
662 sel.send(&s);
663 sel.recv(&r);
664 assert!(sel.ready_timeout(ms(100)).is_err());
665
666 let (s, r) = unbounded::<i32>();
667 let mut sel = Select::new();
668 sel.send(&s);
669 sel.recv(&r);
670 match sel.ready_timeout(ms(100)) {
671 Err(_) => panic!(),
672 Ok(0) => assert!(s.try_send(0).is_ok()),
673 Ok(_) => panic!(),
674 }
675 }
676
677 #[test]
channel_through_channel()678 fn channel_through_channel() {
679 #[cfg(miri)]
680 const COUNT: usize = 100;
681 #[cfg(not(miri))]
682 const COUNT: usize = 1000;
683
684 type T = Box<dyn Any + Send>;
685
686 for cap in 1..4 {
687 let (s, r) = bounded::<T>(cap);
688
689 scope(|scope| {
690 scope.spawn(move |_| {
691 let mut s = s;
692
693 for _ in 0..COUNT {
694 let (new_s, new_r) = bounded(cap);
695 let new_r: T = Box::new(Some(new_r));
696
697 {
698 let mut sel = Select::new();
699 sel.send(&s);
700 match sel.ready() {
701 0 => assert!(s.try_send(new_r).is_ok()),
702 _ => panic!(),
703 }
704 }
705
706 s = new_s;
707 }
708 });
709
710 scope.spawn(move |_| {
711 let mut r = r;
712
713 for _ in 0..COUNT {
714 let new = {
715 let mut sel = Select::new();
716 sel.recv(&r);
717 match sel.ready() {
718 0 => r
719 .try_recv()
720 .unwrap()
721 .downcast_mut::<Option<Receiver<T>>>()
722 .unwrap()
723 .take()
724 .unwrap(),
725 _ => panic!(),
726 }
727 };
728 r = new;
729 }
730 });
731 })
732 .unwrap();
733 }
734 }
735
736 #[test]
fairness1()737 fn fairness1() {
738 #[cfg(miri)]
739 const COUNT: usize = 100;
740 #[cfg(not(miri))]
741 const COUNT: usize = 10_000;
742
743 let (s1, r1) = bounded::<()>(COUNT);
744 let (s2, r2) = unbounded::<()>();
745
746 for _ in 0..COUNT {
747 s1.send(()).unwrap();
748 s2.send(()).unwrap();
749 }
750
751 let hits = vec![Cell::new(0usize); 4];
752 for _ in 0..COUNT {
753 let after = after(ms(0));
754 let tick = tick(ms(0));
755
756 let mut sel = Select::new();
757 sel.recv(&r1);
758 sel.recv(&r2);
759 sel.recv(&after);
760 sel.recv(&tick);
761 match sel.ready() {
762 0 => {
763 r1.try_recv().unwrap();
764 hits[0].set(hits[0].get() + 1);
765 }
766 1 => {
767 r2.try_recv().unwrap();
768 hits[1].set(hits[1].get() + 1);
769 }
770 2 => {
771 after.try_recv().unwrap();
772 hits[2].set(hits[2].get() + 1);
773 }
774 3 => {
775 tick.try_recv().unwrap();
776 hits[3].set(hits[3].get() + 1);
777 }
778 _ => panic!(),
779 }
780 }
781 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
782 }
783
784 #[test]
fairness2()785 fn fairness2() {
786 #[cfg(miri)]
787 const COUNT: usize = 100;
788 #[cfg(not(miri))]
789 const COUNT: usize = 100_000;
790
791 let (s1, r1) = unbounded::<()>();
792 let (s2, r2) = bounded::<()>(1);
793 let (s3, r3) = bounded::<()>(0);
794
795 scope(|scope| {
796 scope.spawn(|_| {
797 for _ in 0..COUNT {
798 let mut sel = Select::new();
799 let mut oper1 = None;
800 let mut oper2 = None;
801 if s1.is_empty() {
802 oper1 = Some(sel.send(&s1));
803 }
804 if s2.is_empty() {
805 oper2 = Some(sel.send(&s2));
806 }
807 let oper3 = sel.send(&s3);
808 let oper = sel.select();
809 match oper.index() {
810 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
811 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
812 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
813 _ => unreachable!(),
814 }
815 }
816 });
817
818 let hits = vec![Cell::new(0usize); 3];
819 for _ in 0..COUNT {
820 let mut sel = Select::new();
821 sel.recv(&r1);
822 sel.recv(&r2);
823 sel.recv(&r3);
824 loop {
825 match sel.ready() {
826 0 => {
827 if r1.try_recv().is_ok() {
828 hits[0].set(hits[0].get() + 1);
829 break;
830 }
831 }
832 1 => {
833 if r2.try_recv().is_ok() {
834 hits[1].set(hits[1].get() + 1);
835 break;
836 }
837 }
838 2 => {
839 if r3.try_recv().is_ok() {
840 hits[2].set(hits[2].get() + 1);
841 break;
842 }
843 }
844 _ => unreachable!(),
845 }
846 }
847 }
848 assert!(hits.iter().all(|x| x.get() > 0));
849 })
850 .unwrap();
851 }
852