1 use crate::sync::oneshot;
2 
3 use loom::future::block_on;
4 use loom::thread;
5 use std::future::poll_fn;
6 use std::task::Poll::{Pending, Ready};
7 
/// Basic round trip: a value sent from a spawned thread is the value the
/// receiver resolves to when blocked on.
#[test]
fn smoke() {
    loom::model(|| {
        let (sender, receiver) = oneshot::channel();

        // Producer side runs on its own thread; the send must succeed
        // because the receiver is still alive while we block on it.
        thread::spawn(move || sender.send(1).unwrap());

        let received = block_on(receiver).unwrap();
        assert_eq!(1, received);
    });
}
21 
/// Polls the receiver once from one task and, if the value had not arrived
/// yet, finishes receiving from a different task.
#[test]
fn changing_rx_task() {
    loom::model(|| {
        let (sender, mut receiver) = oneshot::channel();

        thread::spawn(move || {
            sender.send(1).unwrap();
        });

        // First task: poll the receiver exactly once on a separate thread.
        // `Pending` is turned into `Ready(false)` so `block_on` returns
        // immediately instead of waiting for the value.
        let first_task = thread::spawn(move || {
            let got_value = block_on(poll_fn(|cx| {
                let outcome = Pin::new(&mut receiver).poll(cx);
                match outcome {
                    Pending => Ready(false),
                    Ready(Ok(value)) => {
                        assert_eq!(1, value);
                        Ready(true)
                    }
                    Ready(Err(_)) => unimplemented!(),
                }
            }));

            // Hand the receiver back only when it still has to be awaited.
            (!got_value).then_some(receiver)
        });
        let leftover = first_task.join().unwrap();

        if let Some(receiver) = leftover {
            // The first task returned without a value; receive from a new task.
            let value = block_on(receiver).unwrap();
            assert_eq!(1, value);
        }
    });
}
57 
/// Races a `send` on another thread against `close` followed by `try_recv`
/// on the receiver.
#[test]
fn try_recv_close() {
    // Regression test for https://github.com/tokio-rs/tokio/issues/4225
    loom::model(|| {
        let (sender, mut receiver) = oneshot::channel();

        // The outcome of the send is deliberately ignored.
        thread::spawn(move || {
            let _ = sender.send(());
        });

        receiver.close();
        // Only the absence of a panic / loom failure matters here, not the
        // result of the call.
        let _ = receiver.try_recv();
    });
}
71 
/// Races a `send` on another thread against `close` followed by awaiting the
/// receiver to completion.
#[test]
fn recv_closed() {
    // Regression test for https://github.com/tokio-rs/tokio/issues/4225
    loom::model(|| {
        let (sender, mut receiver) = oneshot::channel();

        // The outcome of the send is deliberately ignored.
        thread::spawn(move || {
            let _ = sender.send(1);
        });

        receiver.close();
        // Drive the already-closed receiver; its result is not inspected.
        let _ = block_on(receiver);
    });
}
86 
// TODO: Move the `OnClose` helper below into `oneshot` proper.
88 
89 use std::future::Future;
90 use std::pin::Pin;
91 use std::task::{Context, Poll};
92 
93 struct OnClose<'a> {
94     tx: &'a mut oneshot::Sender<i32>,
95 }
96 
97 impl<'a> OnClose<'a> {
new(tx: &'a mut oneshot::Sender<i32>) -> Self98     fn new(tx: &'a mut oneshot::Sender<i32>) -> Self {
99         OnClose { tx }
100     }
101 }
102 
103 impl Future for OnClose<'_> {
104     type Output = bool;
105 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool>106     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
107         let fut = self.get_mut().tx.closed();
108         crate::pin!(fut);
109 
110         Ready(fut.poll(cx).is_ready())
111     }
112 }
113 
/// Checks the sender for closure from one task and, if the receiver had not
/// been dropped yet, checks again from a different task.
#[test]
fn changing_tx_task() {
    loom::model(|| {
        let (mut sender, receiver) = oneshot::channel::<i32>();

        // Closing side: drop the receiver on its own thread.
        thread::spawn(move || drop(receiver));

        // First task: check for closure once on a separate thread and hand
        // the sender back only if closure was not yet observed.
        let first_task = thread::spawn(move || {
            let closed = block_on(OnClose::new(&mut sender));
            (!closed).then_some(sender)
        });
        let leftover = first_task.join().unwrap();

        if let Some(mut sender) = leftover {
            // Closure was not observed by the first task; poll from a new task.
            block_on(OnClose::new(&mut sender));
        }
    });
}
141 
/// Checks that a message passed to `tx.send` is never dropped on the sending
/// thread.
///
/// `Msg::drop` asserts that the thread-local `IS_RX` flag is still `true`.
/// The tx thread clears the flag before sending and `forget`s the message if
/// `send` hands it back as `Err`, so any `Msg::drop` running on the tx thread
/// trips the assertion.
#[test]
fn checking_tx_send_ok_not_drop() {
    use std::cell::Cell;

    // Per-thread marker: starts as `true`; only the tx thread sets it to `false`.
    loom::thread_local! {
        static IS_RX: Cell<bool> = Cell::new(true);
    }

    struct Msg;

    impl Drop for Msg {
        fn drop(&mut self) {
            IS_RX.with(|is_rx: &Cell<_>| {
                // On `tx.send(msg)` returning `Err(msg)`,
                // we call `std::mem::forget(msg)`, so that
                // `drop` is not expected to be called in the
                // tx thread.
                assert!(is_rx.get());
            });
        }
    }

    // Run the model with an explicit preemption bound of 2.
    let mut builder = loom::model::Builder::new();
    builder.preemption_bound = Some(2);

    builder.check(|| {
        let (tx, rx) = oneshot::channel();

        // tx thread
        let tx_thread_join_handle = thread::spawn(move || {
            // Ensure that `Msg::drop` in this thread will see is_rx == false
            IS_RX.with(|is_rx: &Cell<_>| {
                is_rx.set(false);
            });
            // A rejected message is returned to us; forget it so its
            // destructor does not run on this thread.
            if let Err(msg) = tx.send(Msg) {
                std::mem::forget(msg);
            }
        });

        // main thread is the rx thread
        drop(rx);

        tx_thread_join_handle.join().unwrap();
    });
}
188