1 use crate::sync::watch;
2
3 use loom::future::block_on;
4 use loom::thread;
5 use std::sync::Arc;
6
/// Sends one value from a spawned thread and checks that each of five
/// receivers on the same channel is notified and then reads that value.
#[test]
fn smoke() {
    loom::model(|| {
        let (sender, first) = watch::channel(1);

        // The original receiver stays at index 0; the other four are cloned
        // from it before the sending thread is started.
        let mut receivers = vec![first];
        for _ in 1..5 {
            let extra = receivers[0].clone();
            receivers.push(extra);
        }

        let producer = thread::spawn(move || {
            sender.send(2).unwrap();
        });

        // Wait on the receivers one after another, in creation order.
        for receiver in receivers.iter_mut() {
            block_on(receiver.changed()).unwrap();
            assert_eq!(*receiver.borrow(), 2);
        }

        producer.join().unwrap();
    })
}
38
/// Waits for `true` on a `bool` watch channel while two other threads use
/// the sender concurrently.
///
/// One thread applies a no-op `send_modify` twice and the other sends
/// `true`; the model thread must get a value satisfying the predicate back
/// from `wait_for`.
#[test]
fn wait_for_test() {
    loom::model(move || {
        let (tx, mut rx) = watch::channel(false);

        // Both spawned threads share the single sender through an `Arc`.
        let tx_arc = Arc::new(tx);
        let tx1 = Arc::clone(&tx_arc);
        let tx2 = Arc::clone(&tx_arc);

        let th1 = thread::spawn(move || {
            for _ in 0..2 {
                tx1.send_modify(|_x| {});
            }
        });

        let th2 = thread::spawn(move || {
            tx2.send(true).unwrap();
        });

        // Copy the flag out so the value handed back by `wait_for` is dropped
        // at the end of this statement, before the threads are joined.
        let observed = *block_on(rx.wait_for(|x| *x)).unwrap();
        assert!(observed);

        th1.join().unwrap();
        th2.join().unwrap();
    });
}
64
/// Checks that the value handed back by `wait_for` is the same value the
/// predicate last saw when it returned `true`, while another thread sends
/// 1, 2 and 3 in sequence.
#[test]
fn wait_for_returns_correct_value() {
    loom::model(move || {
        let (sender, mut receiver) = watch::channel(0);

        let producer = thread::spawn(move || {
            for value in 1..=3 {
                sender.send(value).unwrap();
            }
        });

        // The predicate accepts whatever value it is shown and records it, so
        // the wait ends on the first value the predicate is invoked with.
        let mut seen_by_predicate = usize::MAX;
        let handed_back = *block_on(receiver.wait_for(|current| {
            seen_by_predicate = *current;
            true
        }))
        .unwrap();

        // The recorded value and the returned value must agree.
        assert_eq!(seen_by_predicate, handed_back);

        producer.join().unwrap();
    });
}
91
/// Drops two senders of one channel from different threads and checks that
/// the receiver reports an error from `has_changed` once both are gone.
#[test]
fn multiple_sender_drop_concurrently() {
    loom::model(move || {
        let (local_sender, receiver) = watch::channel(0);
        let remote_sender = local_sender.clone();

        let dropper = thread::spawn(move || drop(remote_sender));

        // `local_sender` is still held here, so no error is expected yet.
        assert!(receiver.has_changed().is_ok());

        drop(local_sender);

        dropper.join().unwrap();

        // With every sender dropped, the receiver must now observe an error.
        assert!(receiver.has_changed().is_err());
    });
}
111