1 use crate::sync::watch;
2 
3 use loom::future::block_on;
4 use loom::thread;
5 use std::sync::Arc;
6 
7 #[test]
smoke()8 fn smoke() {
9     loom::model(|| {
10         let (tx, mut rx1) = watch::channel(1);
11         let mut rx2 = rx1.clone();
12         let mut rx3 = rx1.clone();
13         let mut rx4 = rx1.clone();
14         let mut rx5 = rx1.clone();
15 
16         let th = thread::spawn(move || {
17             tx.send(2).unwrap();
18         });
19 
20         block_on(rx1.changed()).unwrap();
21         assert_eq!(*rx1.borrow(), 2);
22 
23         block_on(rx2.changed()).unwrap();
24         assert_eq!(*rx2.borrow(), 2);
25 
26         block_on(rx3.changed()).unwrap();
27         assert_eq!(*rx3.borrow(), 2);
28 
29         block_on(rx4.changed()).unwrap();
30         assert_eq!(*rx4.borrow(), 2);
31 
32         block_on(rx5.changed()).unwrap();
33         assert_eq!(*rx5.borrow(), 2);
34 
35         th.join().unwrap();
36     })
37 }
38 
39 #[test]
wait_for_test()40 fn wait_for_test() {
41     loom::model(move || {
42         let (tx, mut rx) = watch::channel(false);
43 
44         let tx_arc = Arc::new(tx);
45         let tx1 = tx_arc.clone();
46         let tx2 = tx_arc.clone();
47 
48         let th1 = thread::spawn(move || {
49             for _ in 0..2 {
50                 tx1.send_modify(|_x| {});
51             }
52         });
53 
54         let th2 = thread::spawn(move || {
55             tx2.send(true).unwrap();
56         });
57 
58         assert_eq!(*block_on(rx.wait_for(|x| *x)).unwrap(), true);
59 
60         th1.join().unwrap();
61         th2.join().unwrap();
62     });
63 }
64 
65 #[test]
wait_for_returns_correct_value()66 fn wait_for_returns_correct_value() {
67     loom::model(move || {
68         let (tx, mut rx) = watch::channel(0);
69 
70         let jh = thread::spawn(move || {
71             tx.send(1).unwrap();
72             tx.send(2).unwrap();
73             tx.send(3).unwrap();
74         });
75 
76         // Stop at the first value we are called at.
77         let mut stopped_at = usize::MAX;
78         let returned = *block_on(rx.wait_for(|x| {
79             stopped_at = *x;
80             true
81         }))
82         .unwrap();
83 
84         // Check that it returned the same value as the one we returned
85         // `true` for.
86         assert_eq!(stopped_at, returned);
87 
88         jh.join().unwrap();
89     });
90 }
91 
92 #[test]
multiple_sender_drop_concurrently()93 fn multiple_sender_drop_concurrently() {
94     loom::model(move || {
95         let (tx1, rx) = watch::channel(0);
96         let tx2 = tx1.clone();
97 
98         let jh = thread::spawn(move || {
99             drop(tx2);
100         });
101         assert!(rx.has_changed().is_ok());
102 
103         drop(tx1);
104 
105         jh.join().unwrap();
106 
107         // Check if all sender are dropped and closed flag is set.
108         assert!(rx.has_changed().is_err());
109     });
110 }
111