1 //! Tests for the `select!` macro.
2
3 #![forbid(unsafe_code)] // select! is safe.
4 #![allow(clippy::match_single_binding)]
5
6 use std::any::Any;
7 use std::cell::Cell;
8 use std::ops::Deref;
9 use std::thread;
10 use std::time::{Duration, Instant};
11
12 use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded};
13 use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
14 use crossbeam_utils::thread::scope;
15
/// Shorthand used throughout these tests: builds a [`Duration`] from a
/// number of milliseconds.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
19
/// Smoke test with two unbounded channels: `select!` must take the arm of
/// whichever channel currently holds a message.
#[test]
fn smoke1() {
    let (tx_a, rx_a) = unbounded::<usize>();
    let (tx_b, rx_b) = unbounded::<usize>();

    // Only the first channel holds a message.
    tx_a.send(1).unwrap();

    select! {
        recv(rx_a) -> msg => assert_eq!(msg, Ok(1)),
        recv(rx_b) -> _ => panic!(),
    }

    // Now only the second channel holds a message.
    tx_b.send(2).unwrap();

    select! {
        recv(rx_a) -> _ => panic!(),
        recv(rx_b) -> msg => assert_eq!(msg, Ok(2)),
    }
}
39
/// Smoke test with five receivers where only the last channel has a message;
/// the first four senders are kept alive so their channels stay connected.
#[test]
fn smoke2() {
    let (_tx1, rx1) = unbounded::<i32>();
    let (_tx2, rx2) = unbounded::<i32>();
    let (_tx3, rx3) = unbounded::<i32>();
    let (_tx4, rx4) = unbounded::<i32>();
    let (tx5, rx5) = unbounded::<i32>();

    tx5.send(5).unwrap();

    select! {
        recv(rx1) -> _ => panic!(),
        recv(rx2) -> _ => panic!(),
        recv(rx3) -> _ => panic!(),
        recv(rx4) -> _ => panic!(),
        recv(rx5) -> msg => assert_eq!(msg, Ok(5)),
    }
}
58
/// A `recv` arm on a channel whose sender has been dropped must complete
/// with an error instead of falling through to the timeout arm.
#[test]
fn disconnected() {
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            // Disconnect the first channel right away; the second channel
            // only gets its message half a second later.
            drop(tx1);
            thread::sleep(ms(500));
            tx2.send(5).unwrap();
        });

        select! {
            recv(rx1) -> res => assert!(res.is_err()),
            recv(rx2) -> _ => panic!(),
            default(ms(1000)) => panic!(),
        }

        // Consume the message sent by the spawned thread.
        rx2.recv().unwrap();
    })
    .unwrap();

    // The first channel is still disconnected and the second one is empty.
    select! {
        recv(rx1) -> res => assert!(res.is_err()),
        recv(rx2) -> _ => panic!(),
        default(ms(1000)) => panic!(),
    }

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            drop(tx2);
        });

        // Dropping the sender must end the select before the 1000 ms timeout.
        select! {
            recv(rx2) -> res => assert!(res.is_err()),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();
}
100
/// Exercises the `default` arm: it runs only when no other operation can
/// complete, and a `select!` consisting of `default` alone is accepted.
#[test]
fn default() {
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    // Both channels are empty and connected.
    select! {
        recv(rx1) -> _ => panic!(),
        recv(rx2) -> _ => panic!(),
        default => {}
    }

    drop(tx1);

    // The disconnected channel is selected in preference to `default`.
    select! {
        recv(rx1) -> res => assert!(res.is_err()),
        recv(rx2) -> _ => panic!(),
        default => panic!(),
    }

    tx2.send(2).unwrap();

    select! {
        recv(rx2) -> res => assert_eq!(res, Ok(2)),
        default => panic!(),
    }

    // The only message was consumed above.
    select! {
        recv(rx2) -> _ => panic!(),
        default => {},
    }

    select! {
        default => {},
    }
}
136
/// Exercises `default(timeout)`: it fires when nothing arrives in time and is
/// skipped when a message arrives before the timeout elapses.
#[test]
fn timeout() {
    let (_tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(1500));
            tx2.send(2).unwrap();
        });

        // The message is sent after 1500 ms, so this 1000 ms wait times out.
        select! {
            recv(rx1) -> _ => panic!(),
            recv(rx2) -> _ => panic!(),
            default(ms(1000)) => {},
        }

        // The second wait has to receive the message before timing out.
        select! {
            recv(rx1) -> _ => panic!(),
            recv(rx2) -> res => assert_eq!(res, Ok(2)),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();

    scope(|sc| {
        let (tx, rx) = unbounded::<i32>();

        sc.spawn(move |_| {
            thread::sleep(ms(500));
            drop(tx);
        });

        // A select with only a timeout arm waits out the timeout; by then
        // the sender has been dropped.
        select! {
            default(ms(1000)) => {
                select! {
                    recv(rx) -> res => assert!(res.is_err()),
                    default => panic!(),
                }
            }
        }
    })
    .unwrap();
}
181
/// Operations on an already-disconnected channel must complete with an error
/// rather than taking the `default` / `default(timeout)` arm.
#[test]
fn default_when_disconnected() {
    // Receiver side, sender dropped immediately by the `_` pattern.
    let (_, rx) = unbounded::<i32>();

    select! {
        recv(rx) -> res => assert!(res.is_err()),
        default => panic!(),
    }

    let (_, rx) = unbounded::<i32>();

    select! {
        recv(rx) -> res => assert!(res.is_err()),
        default(ms(1000)) => panic!(),
    }

    // Sender side, receiver dropped immediately by the `_` pattern.
    let (tx, _) = bounded::<i32>(0);

    select! {
        send(tx, 0) -> res => assert!(res.is_err()),
        default => panic!(),
    }

    let (tx, _) = bounded::<i32>(0);

    select! {
        send(tx, 0) -> res => assert!(res.is_err()),
        default(ms(1000)) => panic!(),
    }
}
212
/// Times a `select!` that has only a default arm: the plain form must finish
/// within 50 ms, the 500 ms timeout form within 450..=550 ms.
#[test]
fn default_only() {
    let begin = Instant::now();
    select! {
        default => {}
    }
    let elapsed = Instant::now() - begin;
    assert!(elapsed <= ms(50));

    let begin = Instant::now();
    select! {
        default(ms(500)) => {}
    }
    let elapsed = Instant::now() - begin;
    assert!(elapsed >= ms(450));
    assert!(elapsed <= ms(550));
}
230
/// A blocked `select!` on zero-capacity channels must wake up once another
/// thread performs the matching operation, for both `recv` and `send` arms.
#[test]
fn unblocks() {
    let (tx1, rx1) = bounded::<i32>(0);
    let (tx2, rx2) = bounded::<i32>(0);

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            tx2.send(2).unwrap();
        });

        // Woken by the send on the second channel.
        select! {
            recv(rx1) -> _ => panic!(),
            recv(rx2) -> res => assert_eq!(res, Ok(2)),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(rx1.recv().unwrap(), 1);
        });

        // Woken by the receive on the first channel.
        select! {
            send(tx1, 1) -> _ => {},
            send(tx2, 2) -> _ => panic!(),
            default(ms(1000)) => panic!(),
        }
    })
    .unwrap();
}
264
/// Runs a two-arm `select!` (one `recv`, one `send`) twice against a peer
/// thread that sends on the first channel and then receives on the second.
#[test]
fn both_ready() {
    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });

        for _ in 0..2 {
            select! {
                recv(rx1) -> res => assert_eq!(res, Ok(1)),
                send(tx2, 2) -> _ => {},
            }
        }
    })
    .unwrap();
}
286
/// Two threads poll in a loop with non-blocking operations while a third
/// thread performs a single `select!` with a 500 ms timeout that must
/// complete against one of them. Repeated `RUNS` times.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (tx1, rx1) = bounded::<i32>(0);
        let (tx2, rx2) = bounded::<i32>(0);
        let (tx_end, rx_end) = bounded::<()>(0);

        scope(|sc| {
            // Keeps trying to send on the first channel until it succeeds or
            // the end channel reports completion.
            sc.spawn(|_| loop {
                select! {
                    send(tx1, 1) -> _ => break,
                    default => {}
                }

                select! {
                    recv(rx_end) -> _ => break,
                    default => {}
                }
            });

            // Keeps trying to receive on the second channel until it succeeds
            // or the end channel reports completion.
            sc.spawn(|_| loop {
                if let Ok(got) = rx2.try_recv() {
                    assert_eq!(got, 2);
                    break;
                }

                select! {
                    recv(rx_end) -> _ => break,
                    default => {}
                }
            });

            sc.spawn(|_| {
                thread::sleep(ms(500));

                select! {
                    recv(rx1) -> res => assert_eq!(res, Ok(1)),
                    send(tx2, 2) -> _ => {},
                    default(ms(500)) => panic!(),
                }

                // Disconnecting the end channel lets the polling loops exit.
                drop(tx_end);
            });
        })
        .unwrap();
    }
}
336
/// Creating and dropping a clone of a sender in another thread must not
/// disturb a `select!` that is waiting on that channel's receiver.
#[test]
fn cloning1() {
    scope(|sc| {
        let (tx1, rx1) = unbounded::<i32>();
        let (_tx2, rx2) = unbounded::<i32>();
        let (tx3, rx3) = unbounded::<()>();

        sc.spawn(move |_| {
            // Wait for the first go-ahead signal.
            rx3.recv().unwrap();
            drop(tx1.clone());
            // The second signal is sent only after the select below is done.
            assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
            tx1.send(1).unwrap();
            rx3.recv().unwrap();
        });

        tx3.send(()).unwrap();

        select! {
            recv(rx1) -> _ => {},
            recv(rx2) -> _ => {},
        }

        tx3.send(()).unwrap();
    })
    .unwrap();
}
363
/// Dropping a temporary clone of a sender must not make a blocked `select!`
/// take the arm for that channel; only the real message on the other channel
/// may complete it.
#[test]
fn cloning2() {
    let (tx1, rx1) = unbounded::<()>();
    let (tx2, rx2) = unbounded::<()>();
    let (_tx3, _rx3) = unbounded::<()>();

    scope(|sc| {
        sc.spawn(move |_| {
            select! {
                recv(rx1) -> _ => panic!(),
                recv(rx2) -> _ => {},
            }
        });

        thread::sleep(ms(500));
        drop(tx1.clone());
        tx2.send(()).unwrap();
    })
    .unwrap();
}
384
/// A single-arm `select!` on a channel that already holds a message.
#[test]
fn preflight1() {
    let (tx, rx) = unbounded();
    tx.send(()).unwrap();

    select! {
        recv(rx) -> _ => {}
    }
}
394
/// A message sent before all senders were dropped must still be delivered;
/// only afterwards does the channel report disconnection.
#[test]
fn preflight2() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);

    select! {
        recv(rx) -> res => assert!(res.is_ok()),
    }
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
407
/// After the only message has been drained from a disconnected channel, a
/// `recv` arm must yield an error.
#[test]
fn preflight3() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);
    rx.recv().unwrap();

    select! {
        recv(rx) -> res => assert!(res.is_err())
    }
}
420
/// Lists the same receiver and the same sender twice in one `select!` and
/// loops until each of the four arms has been taken at least once.
#[test]
fn duplicate_operations() {
    let (tx, rx) = unbounded::<i32>();
    let mut seen = [false; 4];

    while !seen.iter().all(|&taken| taken) {
        select! {
            recv(rx) -> _ => seen[0] = true,
            recv(rx) -> _ => seen[1] = true,
            send(tx, 0) -> _ => seen[2] = true,
            send(tx, 0) -> _ => seen[3] = true,
        }
    }
}
435
/// Nests `select!` invocations four levels deep inside each other's arms,
/// alternating a send with the receive of the value just sent.
#[test]
fn nesting() {
    let (tx, rx) = unbounded::<i32>();

    select! {
        send(tx, 0) -> _ => {
            select! {
                recv(rx) -> first => {
                    assert_eq!(first, Ok(0));
                    select! {
                        send(tx, 1) -> _ => {
                            select! {
                                recv(rx) -> second => {
                                    assert_eq!(second, Ok(1));
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
459
/// The sender expression in a `send` arm panics; the test expects the panic
/// message of that expression ("send panicked") to be the one observed, not
/// the bare `panic!()` used as the message argument.
#[test]
#[should_panic(expected = "send panicked")]
fn panic_sender() {
    fn exploding_sender() -> Sender<i32> {
        panic!("send panicked")
    }

    #[allow(unreachable_code)]
    {
        select! {
            send(exploding_sender(), panic!()) -> _ => {}
        }
    }
}
474
/// The receiver expression in a `recv` arm panics; the test expects that
/// panic ("recv panicked") to propagate out of `select!`.
#[test]
#[should_panic(expected = "recv panicked")]
fn panic_receiver() {
    fn exploding_receiver() -> Receiver<i32> {
        panic!("recv panicked")
    }

    select! {
        recv(exploding_receiver()) -> _ => {}
    }
}
486
/// Stress test for `recv` arms: a producer alternates between an unbounded
/// and a bounded channel, waiting for an acknowledgement after every message,
/// while the consumer selects over both receivers.
#[test]
fn stress_recv() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = bounded(5);
    let (ack_tx, ack_rx) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for n in 0..COUNT {
                tx1.send(n).unwrap();
                ack_rx.recv().unwrap();

                tx2.send(n).unwrap();
                ack_rx.recv().unwrap();
            }
        });

        for n in 0..COUNT {
            // Each round delivers the same value once per channel.
            for _ in 0..2 {
                select! {
                    recv(rx1) -> res => assert_eq!(res, Ok(n)),
                    recv(rx2) -> res => assert_eq!(res, Ok(n)),
                }

                ack_tx.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
522
/// Stress test for `send` arms: the main thread selects over two
/// zero-capacity senders while a consumer receives from the first channel,
/// then the second, then waits for an end-of-round signal.
#[test]
fn stress_send() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);
    let (done_tx, done_rx) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for n in 0..COUNT {
                assert_eq!(rx1.recv().unwrap(), n);
                assert_eq!(rx2.recv().unwrap(), n);
                done_rx.recv().unwrap();
            }
        });

        for n in 0..COUNT {
            for _ in 0..2 {
                select! {
                    send(tx1, n) -> _ => {},
                    send(tx2, n) -> _ => {},
                }
            }
            done_tx.send(()).unwrap();
        }
    })
    .unwrap();
}
555
/// Stress test mixing a `recv` arm and a `send` arm in one `select!`, with a
/// peer thread that sends on the first channel and receives on the second
/// each round.
#[test]
fn stress_mixed() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);
    let (done_tx, done_rx) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for n in 0..COUNT {
                tx1.send(n).unwrap();
                assert_eq!(rx2.recv().unwrap(), n);
                done_rx.recv().unwrap();
            }
        });

        for n in 0..COUNT {
            for _ in 0..2 {
                select! {
                    recv(rx1) -> res => assert_eq!(res, Ok(n)),
                    send(tx2, n) -> _ => {},
                }
            }
            done_tx.send(()).unwrap();
        }
    })
    .unwrap();
}
588
/// A sender thread and a receiver thread each retry their operation with a
/// 100 ms timeout until it succeeds, pausing 500 ms before every
/// even-numbered item; the values must arrive in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (tx, rx) = bounded(2);

    scope(|sc| {
        sc.spawn(|_| {
            for n in 0..COUNT {
                if n % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the send goes through.
                loop {
                    select! {
                        send(tx, n) -> _ => break,
                        default(ms(100)) => {}
                    }
                }
            }
        });

        sc.spawn(|_| {
            for n in 0..COUNT {
                if n % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until a message is received.
                loop {
                    select! {
                        recv(rx) -> res => {
                            assert_eq!(res, Ok(n));
                            break;
                        }
                        default(ms(100)) => {}
                    }
                }
            }
        });
    })
    .unwrap();
}
631
/// Puts both ends of the same channel into one `select!`. With zero capacity
/// the test expects the timeout arm; with an unbounded channel it expects
/// the send arm.
#[test]
fn send_recv_same_channel() {
    let (tx, rx) = bounded::<i32>(0);
    select! {
        send(tx, 0) -> _ => panic!(),
        recv(rx) -> _ => panic!(),
        default(ms(500)) => {}
    }

    let (tx, rx) = unbounded::<i32>();
    select! {
        send(tx, 0) -> _ => {},
        recv(rx) -> _ => panic!(),
        default(ms(500)) => panic!(),
    }
}
648
/// Spawns an even number of threads that each either send their own index or
/// receive some index over one zero-capacity channel; a thread must never
/// receive its own value, and nothing may be left over.
#[test]
fn matching() {
    const THREADS: usize = 44;

    let (tx, rx) = &bounded::<usize>(0);

    scope(|sc| {
        for id in 0..THREADS {
            sc.spawn(move |_| {
                select! {
                    recv(rx) -> res => assert_ne!(res.unwrap(), id),
                    send(tx, id) -> _ => {},
                }
            });
        }
    })
    .unwrap();

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
669
/// Same pairing scheme as `matching`, but with an odd number of threads; the
/// main thread contributes one extra send (`!0`) so every thread finds a
/// partner.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let (tx, rx) = &bounded::<usize>(0);

    scope(|sc| {
        for id in 0..THREADS {
            sc.spawn(move |_| {
                select! {
                    recv(rx) -> res => assert_ne!(res.unwrap(), id),
                    send(tx, id) -> _ => {},
                }
            });
        }
        tx.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
691
/// Repeatedly sends the receiving end of a fresh channel through the current
/// channel (boxed as `dyn Any + Send`), with both sides switching to the new
/// channel each step. Runs for capacities 0, 1 and 2.
#[test]
fn channel_through_channel() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (tx, rx) = bounded::<T>(cap);

        scope(|sc| {
            sc.spawn(move |_| {
                let mut current_tx = tx;

                for _ in 0..COUNT {
                    let (next_tx, next_rx) = bounded(cap);
                    // Wrapped in `Option` so the receiver can `take()` it
                    // out of the box after downcasting.
                    let payload: T = Box::new(Some(next_rx));

                    select! {
                        send(current_tx, payload) -> _ => {}
                    }

                    current_tx = next_tx;
                }
            });

            sc.spawn(move |_| {
                let mut current_rx = rx;

                for _ in 0..COUNT {
                    current_rx = select! {
                        recv(current_rx) -> msg => {
                            msg.unwrap()
                                .downcast_mut::<Option<Receiver<T>>>()
                                .unwrap()
                                .take()
                                .unwrap()
                        }
                    }
                }
            });
        })
        .unwrap();
    }
}
739
/// After a thread has sent on a channel, a `select!` with a `default` arm on
/// that same thread must never take `default`, even while another thread is
/// concurrently draining that channel and filling a second one. Runs once
/// with `bounded(1)` channels and once with unbounded ones.
#[test]
fn linearizable_default() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_tx, start_rx) = bounded::<()>(0);
        let (end_tx, end_rx) = bounded::<()>(0);

        let ((tx1, rx1), (tx2, rx2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|sc| {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    start_tx.send(()).unwrap();

                    tx1.send(1).unwrap();
                    select! {
                        recv(rx1) -> _ => {}
                        recv(rx2) -> _ => {}
                        default => unreachable!()
                    }

                    end_tx.send(()).unwrap();
                    // Clear any leftover message before the next round.
                    let _ = rx2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_rx.recv().unwrap();

                tx2.send(1).unwrap();
                let _ = rx1.try_recv();

                end_rx.recv().unwrap();
            }
        })
        .unwrap();
    }
}
786
/// Same scenario as `linearizable_default`, but the fallback arm is a
/// zero-duration `default(ms(0))` timeout instead of a plain `default`.
#[test]
fn linearizable_timeout() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_tx, start_rx) = bounded::<()>(0);
        let (end_tx, end_rx) = bounded::<()>(0);

        let ((tx1, rx1), (tx2, rx2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|sc| {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    start_tx.send(()).unwrap();

                    tx1.send(1).unwrap();
                    select! {
                        recv(rx1) -> _ => {}
                        recv(rx2) -> _ => {}
                        default(ms(0)) => unreachable!()
                    }

                    end_tx.send(()).unwrap();
                    // Clear any leftover message before the next round.
                    let _ = rx2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_rx.recv().unwrap();

                tx2.send(1).unwrap();
                let _ = rx1.try_recv();

                end_rx.recv().unwrap();
            }
        })
        .unwrap();
    }
}
833
/// Fairness across four `recv` arms: two pre-filled channels plus
/// zero-duration `after` and `tick` channels. Every arm must be taken at
/// least `COUNT / 4 / 2` times over `COUNT` selections.
#[test]
fn fairness1() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = bounded::<()>(COUNT);
    let (tx2, rx2) = unbounded::<()>();

    for _ in 0..COUNT {
        tx1.send(()).unwrap();
        tx2.send(()).unwrap();
    }

    let mut counts = [0usize; 4];
    for _ in 0..COUNT {
        select! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
            recv(after(ms(0))) -> _ => counts[2] += 1,
            recv(tick(ms(0))) -> _ => counts[3] += 1,
        }
    }
    assert!(counts.iter().all(|&n| n >= COUNT / counts.len() / 2));
}
860
/// Fairness across three receivers of different flavours (unbounded,
/// capacity 1, capacity 0) fed by a concurrently selecting sender. Every
/// receiver must be taken at least `COUNT / 3 / 50` times.
#[test]
fn fairness2() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = unbounded::<()>();
    let (tx2, rx2) = bounded::<()>(1);
    let (tx3, rx3) = bounded::<()>(0);

    scope(|sc| {
        sc.spawn(|_| {
            // Zero-capacity channel whose receiver is never read from; used
            // as a stand-in target when a buffered channel is not empty.
            let (hole, _hole_rx) = bounded(0);

            for _ in 0..COUNT {
                let target1 = if tx1.is_empty() { &tx1 } else { &hole };
                let target2 = if tx2.is_empty() { &tx2 } else { &hole };

                select! {
                    send(target1, ()) -> res => assert!(res.is_ok()),
                    send(target2, ()) -> res => assert!(res.is_ok()),
                    send(tx3, ()) -> res => assert!(res.is_ok()),
                }
            }
        });

        let counts = vec![Cell::new(0usize); 3];
        for _ in 0..COUNT {
            select! {
                recv(rx1) -> _ => counts[0].set(counts[0].get() + 1),
                recv(rx2) -> _ => counts[1].set(counts[1].get() + 1),
                recv(rx3) -> _ => counts[2].set(counts[2].get() + 1),
            }
        }
        assert!(counts.iter().all(|c| c.get() >= COUNT / counts.len() / 50));
    })
    .unwrap();
}
900
/// Fairness between two pre-filled receivers: after `COUNT` selections each
/// arm must have been taken at least `COUNT / 4` times.
#[test]
fn fairness_recv() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = bounded::<()>(COUNT);
    let (tx2, rx2) = unbounded::<()>();

    for _ in 0..COUNT {
        tx1.send(()).unwrap();
        tx2.send(()).unwrap();
    }

    let mut counts = [0usize; 2];
    while counts[0] + counts[1] < COUNT {
        select! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
        }
    }
    assert!(counts.iter().all(|&n| n >= COUNT / 4));
}
925
/// Fairness between two senders (one bounded with capacity `COUNT`, one
/// unbounded): after `COUNT` selections each arm must have been taken at
/// least `COUNT / 4` times.
#[test]
fn fairness_send() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, _rx1) = bounded::<()>(COUNT);
    let (tx2, _rx2) = unbounded::<()>();

    let mut counts = [0usize; 2];
    for _ in 0..COUNT {
        select! {
            send(tx1, ()) -> _ => counts[0] += 1,
            send(tx2, ()) -> _ => counts[1] += 1,
        }
    }
    assert!(counts.iter().all(|&n| n >= COUNT / 4));
}
945
/// `select_biased!` must take arms in declaration order: the first channel
/// is drained completely before the second is touched, and the third is
/// never reached within the `2 * COUNT` selections.
#[test]
fn unfairness() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = unbounded::<()>();
    let (tx2, rx2) = unbounded::<()>();
    let (tx3, rx3) = unbounded::<()>();

    for _ in 0..COUNT {
        tx1.send(()).unwrap();
        tx2.send(()).unwrap();
    }
    tx3.send(()).unwrap();

    let mut counts = [0usize; 3];

    // First pass: only the first channel may be selected.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
            recv(rx3) -> _ => counts[2] += 1,
        }
    }
    assert_eq!(counts, [COUNT, 0, 0]);

    // Second pass: the first channel is empty, so the second one is drained.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
            recv(rx3) -> _ => counts[2] += 1,
        }
    }
    assert_eq!(counts, [COUNT, COUNT, 0]);
}
982
/// Same ordering check as `unfairness`, with an additional
/// `default(ms(1000))` arm that must never be taken.
#[test]
fn unfairness_timeout() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = unbounded::<()>();
    let (tx2, rx2) = unbounded::<()>();
    let (tx3, rx3) = unbounded::<()>();

    for _ in 0..COUNT {
        tx1.send(()).unwrap();
        tx2.send(()).unwrap();
    }
    tx3.send(()).unwrap();

    let mut counts = [0usize; 3];

    // First pass: only the first channel may be selected.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
            recv(rx3) -> _ => counts[2] += 1,
            default(ms(1000)) => unreachable!(),
        }
    }
    assert_eq!(counts, [COUNT, 0, 0]);

    // Second pass: the first channel is empty, so the second one is drained.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
            recv(rx3) -> _ => counts[2] += 1,
            default(ms(1000)) => unreachable!(),
        }
    }
    assert_eq!(counts, [COUNT, COUNT, 0]);
}
1021
/// Same ordering check as `unfairness`, with an additional non-blocking
/// `default()` arm that must never be taken.
#[test]
fn unfairness_try() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (tx1, rx1) = unbounded::<()>();
    let (tx2, rx2) = unbounded::<()>();
    let (tx3, rx3) = unbounded::<()>();

    for _ in 0..COUNT {
        tx1.send(()).unwrap();
        tx2.send(()).unwrap();
    }
    tx3.send(()).unwrap();

    let mut counts = [0usize; 3];

    // First pass: only the first channel may be selected.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
            recv(rx3) -> _ => counts[2] += 1,
            default() => unreachable!(),
        }
    }
    assert_eq!(counts, [COUNT, 0, 0]);

    // Second pass: the first channel is empty, so the second one is drained.
    for _ in 0..COUNT {
        select_biased! {
            recv(rx1) -> _ => counts[0] += 1,
            recv(rx2) -> _ => counts[1] += 1,
            recv(rx3) -> _ => counts[2] += 1,
            default() => unreachable!(),
        }
    }
    assert_eq!(counts, [COUNT, COUNT, 0]);
}
1060
/// Checks that `select!` accepts handles by value, through several layers of
/// references, and as the result of an arbitrary expression (both borrowed
/// and owned).
#[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional.
#[test]
fn references() {
    let (tx, rx) = unbounded::<i32>();
    // Plain handles.
    select! {
        send(tx, 0) -> _ => {}
        recv(rx) -> _ => {}
    }
    // Multiple levels of borrowing.
    select! {
        send(&&&&tx, 0) -> _ => {}
        recv(&&&&rx) -> _ => {}
    }
    // Expression yielding a reference, with a temporary as the fallback.
    select! {
        recv(Some(&rx).unwrap_or(&never())) -> _ => {},
        default => {}
    }
    // Expression yielding an owned receiver.
    select! {
        recv(Some(rx).unwrap_or(never())) -> _ => {},
        default => {}
    }
}
1082
/// Checks that arm bodies may be bare expressions of various kinds (literal,
/// `loop`, `match`, `if`) and that the final arm needs no trailing comma.
#[test]
fn case_blocks() {
    let (tx, rx) = unbounded::<i32>();

    select! {
        recv(rx) -> _ => 3.0,
        recv(rx) -> _ => loop {
            unreachable!()
        },
        recv(rx) -> _ => match 7 + 3 {
            _ => unreachable!()
        },
        default => 7.
    };

    select! {
        recv(rx) -> res => if res.is_ok() {
            unreachable!()
        },
        default => ()
    }

    // The sender is held until here so the channel stays connected above.
    drop(tx);
}
1107
/// Checks that the handle expressions may move the sender and receiver out of
/// immediately-invoked `move` closures.
#[allow(clippy::redundant_closure_call)] // This is intentional.
#[test]
fn move_handles() {
    let (tx, rx) = unbounded::<i32>();
    select! {
        recv((move || rx)()) -> _ => {}
        send((move || tx)(), 0) -> _ => {}
    }
}
1117
/// Checks that the channel's message type can be inferred when the channel is
/// created without a type annotation and used inside `select!`.
#[test]
fn infer_types() {
    // Message type fixed by the `send(())` after the select.
    let (tx, rx) = unbounded();
    select! {
        recv(rx) -> _ => {}
        default => {}
    }
    tx.send(()).unwrap();

    // Message type fixed by the `send` arm itself.
    let (tx, rx) = unbounded();
    select! {
        send(tx, ()) -> _ => {}
    }
    rx.recv().unwrap();
}
1133
/// Checks that both spellings of the non-blocking fallback arm, `default` and
/// `default()`, are accepted, with and without other arms.
#[test]
fn default_syntax() {
    let (tx, rx) = bounded::<i32>(0);

    select! {
        recv(rx) -> _ => panic!(),
        default => {}
    }
    select! {
        send(tx, 0) -> _ => panic!(),
        default() => {}
    }
    select! {
        default => {}
    }
    select! {
        default() => {}
    }
}
1153
/// Checks that the result binding of a `recv` arm may reuse the name of the
/// receiver variable it reads from.
#[test]
fn same_variable_name() {
    let (_, rx) = unbounded::<i32>();
    select! {
        recv(rx) -> rx => assert!(rx.is_err()),
    }
}
1161
/// Checks that boxed handles can be used in `select!` through an explicit
/// dereference, and that the boxes are still usable (droppable) afterwards.
#[test]
fn handles_on_heap() {
    let (tx, rx) = unbounded::<i32>();
    let (boxed_tx, boxed_rx) = (Box::new(tx), Box::new(rx));

    select! {
        send(*boxed_tx, 0) -> _ => {}
        recv(*boxed_rx) -> _ => {}
        default => {}
    }

    drop(boxed_tx);
    drop(boxed_rx);
}
1176
/// Checks that arm bodies may consume (move out of) captured values, i.e.
/// each body is treated as running at most once. Each arm drops its own
/// `Box<()>` token.
#[test]
fn once_blocks() {
    let (tx, rx) = unbounded::<i32>();

    let token = Box::new(());
    select! {
        send(tx, 0) -> _ => drop(token),
    }

    let token = Box::new(());
    select! {
        recv(rx) -> _ => drop(token),
    }

    let token_a = Box::new(());
    let token_b = Box::new(());
    select! {
        send(tx, 0) -> _ => drop(token_a),
        default => drop(token_b),
    }

    let token_a = Box::new(());
    let token_b = Box::new(());
    select! {
        recv(rx) -> _ => drop(token_a),
        default => drop(token_b),
    }

    let token_a = Box::new(());
    let token_b = Box::new(());
    select! {
        recv(rx) -> _ => drop(token_a),
        send(tx, 0) -> _ => drop(token_b),
    }
}
1212
/// Checks that the receiver expression can be a call to a closure that
/// consumes captured state (here it drops a box and returns the receiver),
/// so it can only be called once.
#[test]
fn once_receiver() {
    let (_, rx) = unbounded::<i32>();

    let token = Box::new(());
    let take_rx = move || {
        drop(token);
        rx
    };

    select! {
        recv(take_rx()) -> _ => {}
    }
}
1227
/// Checks that the sender expression can be a call to a closure that
/// consumes captured state (here it drops a box and returns the sender),
/// so it can only be called once.
#[test]
fn once_sender() {
    let (tx, _) = unbounded::<i32>();

    let token = Box::new(());
    let take_tx = move || {
        drop(token);
        tx
    };

    select! {
        send(take_tx(), 5) -> _ => {}
    }
}
1242
/// Checks that deeply nested `select!` invocations with comma-less block
/// arms parse correctly. The channel is disconnected from the start.
#[test]
fn parse_nesting() {
    let (_, rx) = unbounded::<i32>();

    select! {
        recv(rx) -> _ => {}
        recv(rx) -> _ => {
            select! {
                recv(rx) -> _ => {}
                recv(rx) -> _ => {
                    select! {
                        recv(rx) -> _ => {}
                        recv(rx) -> _ => {
                            select! {
                                default => {}
                            }
                        }
                    }
                }
            }
        }
    }
}
1266
/// Checks that `select!` evaluates to the value of the arm that ran, with
/// the arms' differing expressions unifying to one type.
#[test]
fn evaluate() {
    let (tx, rx) = unbounded::<i32>();

    // Channel empty: the send arm is the one that runs.
    let picked = select! {
        recv(rx) -> _ => "foo".into(),
        send(tx, 0) -> _ => "bar".to_owned(),
        default => "baz".to_string(),
    };
    assert_eq!(picked, "bar");

    // The message sent above is now available.
    let picked = select! {
        recv(rx) -> _ => "foo".into(),
        default => "baz".to_string(),
    };
    assert_eq!(picked, "foo");

    // Channel empty again: falls through to `default`.
    let picked = select! {
        recv(rx) -> _ => "foo".into(),
        default => "baz".to_string(),
    };
    assert_eq!(picked, "baz");
}
1290
/// Checks that `select!` accepts wrapper types that merely `Deref` to the
/// real channel handles.
#[test]
fn deref() {
    use crossbeam_channel as cc;

    // Local newtypes shadowing the imported handle names.
    struct Sender<T>(cc::Sender<T>);
    struct Receiver<T>(cc::Receiver<T>);

    impl<T> Deref for Receiver<T> {
        type Target = cc::Receiver<T>;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl<T> Deref for Sender<T> {
        type Target = cc::Sender<T>;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    let (raw_tx, raw_rx) = bounded::<i32>(0);
    let (tx, rx) = (Sender(raw_tx), Receiver(raw_rx));

    select! {
        send(tx, 0) -> _ => panic!(),
        recv(rx) -> _ => panic!(),
        default => {}
    }
}
1323
/// Pins the types bound by the arms: `recv` yields `Result<T, RecvError>`
/// and `send` yields `Result<(), SendError<T>>`, for blocking, non-blocking
/// and timeout forms. Both channels are disconnected from the start.
#[test]
fn result_types() {
    let (tx, _) = bounded::<i32>(0);
    let (_, rx) = bounded::<i32>(0);

    // Receive: blocking, `default`, `default(timeout)`.
    select! {
        recv(rx) -> out => { let _: Result<i32, RecvError> = out; },
    }
    select! {
        recv(rx) -> out => { let _: Result<i32, RecvError> = out; },
        default => {}
    }
    select! {
        recv(rx) -> out => { let _: Result<i32, RecvError> = out; },
        default(ms(0)) => {}
    }

    // Send: blocking, `default`, `default(timeout)`.
    select! {
        send(tx, 0) -> out => { let _: Result<(), SendError<i32>> = out; },
    }
    select! {
        send(tx, 0) -> out => { let _: Result<(), SendError<i32>> = out; },
        default => {}
    }
    select! {
        send(tx, 0) -> out => { let _: Result<(), SendError<i32>> = out; },
        default(ms(0)) => {}
    }

    // Both kinds in one select.
    select! {
        send(tx, 0) -> out => { let _: Result<(), SendError<i32>> = out; },
        recv(rx) -> out => { let _: Result<i32, RecvError> = out; },
    }
}
1358
/// Non-blocking receive via `select!` with `default`, on a zero-capacity
/// channel: nothing is available at first, then a waiting sender's message,
/// then a disconnect error.
#[test]
fn try_recv() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            // The sender has not started its send yet.
            select! {
                recv(rx) -> _ => panic!(),
                default => {}
            }
            thread::sleep(ms(1500));
            // The sender began its blocking send after 1000 ms.
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(7)),
                default => panic!(),
            }
            thread::sleep(ms(500));
            // The sender thread has finished and dropped its handle.
            select! {
                recv(rx) -> res => assert_eq!(res, Err(RecvError)),
                default => panic!(),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            select! {
                send(tx, 7) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1389
/// Blocking receive via single-arm `select!` on a zero-capacity channel:
/// three values arrive in order, then the channel reports disconnection.
#[test]
fn recv() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(7)),
            }
            thread::sleep(ms(1000));
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(8)),
            }
            thread::sleep(ms(1000));
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(9)),
            }
            // The sender thread exits after its third send.
            select! {
                recv(rx) -> res => assert_eq!(res, Err(RecvError)),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                send(tx, 7) -> res => res.unwrap(),
            }
            select! {
                send(tx, 8) -> res => res.unwrap(),
            }
            select! {
                send(tx, 9) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1426
/// Receive with a 1000 ms timeout via `select!`: the first wait times out,
/// the second gets the value sent after 1500 ms, the third sees the
/// disconnect.
#[test]
fn recv_timeout() {
    let (tx, rx) = bounded::<i32>(0);

    scope(|sc| {
        sc.spawn(move |_| {
            select! {
                recv(rx) -> _ => panic!(),
                default(ms(1000)) => {}
            }
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(7)),
                default(ms(1000)) => panic!(),
            }
            select! {
                recv(rx) -> res => assert_eq!(res, Err(RecvError)),
                default(ms(1000)) => panic!(),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                send(tx, 7) -> res => res.unwrap(),
            }
        });
    })
    .unwrap();
}
1455
/// Non-blocking send via `select!` with `default`, on a zero-capacity
/// channel: no receiver is waiting at first, then one is, then the receiver
/// is gone and the value comes back inside `SendError`.
#[test]
fn try_send() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            // The receiver has not started waiting yet.
            select! {
                send(tx, 7) -> _ => panic!(),
                default => {}
            }
            thread::sleep(ms(1500));
            // The receiver began its blocking receive after 1000 ms.
            select! {
                send(tx, 8) -> res => res.unwrap(),
                default => panic!(),
            }
            thread::sleep(ms(500));
            // The receiver thread has finished and dropped its handle.
            select! {
                send(tx, 8) -> res => assert_eq!(res, Err(SendError(8))),
                default => panic!(),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(8)),
            }
        });
    })
    .unwrap();
}
1486
/// Blocking send via single-arm `select!` on a zero-capacity channel: three
/// values are sent with pauses in between and must be received in order.
#[test]
fn send() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            select! {
                send(tx, 7) -> res => res.unwrap(),
            }
            thread::sleep(ms(1000));
            select! {
                send(tx, 8) -> res => res.unwrap(),
            }
            thread::sleep(ms(1000));
            select! {
                send(tx, 9) -> res => res.unwrap(),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(7)),
            }
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(8)),
            }
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(9)),
            }
        });
    })
    .unwrap();
}
1520
/// Send with a 1000 ms timeout via `select!`: the first attempt times out,
/// the second is received after 1500 ms, the third fails because the
/// receiver is gone and returns the value inside `SendError`.
#[test]
fn send_timeout() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            select! {
                send(tx, 7) -> _ => panic!(),
                default(ms(1000)) => {}
            }
            select! {
                send(tx, 8) -> res => res.unwrap(),
                default(ms(1000)) => panic!(),
            }
            select! {
                send(tx, 9) -> res => assert_eq!(res, Err(SendError(9))),
                default(ms(1000)) => panic!(),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1500));
            select! {
                recv(rx) -> res => assert_eq!(res, Ok(8)),
            }
        });
    })
    .unwrap();
}
1549
/// A `select!` blocked on a send to a zero-capacity channel must wake up with
/// a `SendError` when the receiver is dropped by another thread.
#[test]
fn disconnect_wakes_sender() {
    let (tx, rx) = bounded(0);

    scope(|sc| {
        sc.spawn(move |_| {
            select! {
                send(tx, ()) -> res => assert_eq!(res, Err(SendError(()))),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(rx);
        });
    })
    .unwrap();
}
1567
/// A `select!` blocked on a receive from a zero-capacity channel must wake up
/// with a `RecvError` when the sender is dropped by another thread.
#[test]
fn disconnect_wakes_receiver() {
    let (tx, rx) = bounded::<()>(0);

    scope(|sc| {
        sc.spawn(move |_| {
            select! {
                recv(rx) -> res => assert_eq!(res, Err(RecvError)),
            }
        });
        sc.spawn(move |_| {
            thread::sleep(ms(1000));
            drop(tx);
        });
    })
    .unwrap();
}
1585
/// Checks that a trailing comma is accepted inside the argument list of
/// `send(..)`, `recv(..)` and `default(..)`.
#[test]
fn trailing_comma() {
    let (tx, rx) = unbounded::<usize>();

    select! {
        send(tx, 1,) -> _ => {},
        recv(rx,) -> _ => {},
        default(ms(1000),) => {},
    }
}
1596