1 use crate::sync::oneshot;
2
3 use loom::future::block_on;
4 use loom::thread;
5 use std::future::poll_fn;
6 use std::task::Poll::{Pending, Ready};
7
8 #[test]
smoke()9 fn smoke() {
10 loom::model(|| {
11 let (tx, rx) = oneshot::channel();
12
13 thread::spawn(move || {
14 tx.send(1).unwrap();
15 });
16
17 let value = block_on(rx).unwrap();
18 assert_eq!(1, value);
19 });
20 }
21
22 #[test]
changing_rx_task()23 fn changing_rx_task() {
24 loom::model(|| {
25 let (tx, mut rx) = oneshot::channel();
26
27 thread::spawn(move || {
28 tx.send(1).unwrap();
29 });
30
31 let rx = thread::spawn(move || {
32 let ready = block_on(poll_fn(|cx| match Pin::new(&mut rx).poll(cx) {
33 Ready(Ok(value)) => {
34 assert_eq!(1, value);
35 Ready(true)
36 }
37 Ready(Err(_)) => unimplemented!(),
38 Pending => Ready(false),
39 }));
40
41 if ready {
42 None
43 } else {
44 Some(rx)
45 }
46 })
47 .join()
48 .unwrap();
49
50 if let Some(rx) = rx {
51 // Previous task parked, use a new task...
52 let value = block_on(rx).unwrap();
53 assert_eq!(1, value);
54 }
55 });
56 }
57
58 #[test]
try_recv_close()59 fn try_recv_close() {
60 // reproduces https://github.com/tokio-rs/tokio/issues/4225
61 loom::model(|| {
62 let (tx, mut rx) = oneshot::channel();
63 thread::spawn(move || {
64 let _ = tx.send(());
65 });
66
67 rx.close();
68 let _ = rx.try_recv();
69 })
70 }
71
72 #[test]
recv_closed()73 fn recv_closed() {
74 // reproduces https://github.com/tokio-rs/tokio/issues/4225
75 loom::model(|| {
76 let (tx, mut rx) = oneshot::channel();
77
78 thread::spawn(move || {
79 let _ = tx.send(1);
80 });
81
82 rx.close();
83 let _ = block_on(rx);
84 });
85 }
86
87 // TODO: Move this into `oneshot` proper.
88
89 use std::future::Future;
90 use std::pin::Pin;
91 use std::task::{Context, Poll};
92
93 struct OnClose<'a> {
94 tx: &'a mut oneshot::Sender<i32>,
95 }
96
97 impl<'a> OnClose<'a> {
new(tx: &'a mut oneshot::Sender<i32>) -> Self98 fn new(tx: &'a mut oneshot::Sender<i32>) -> Self {
99 OnClose { tx }
100 }
101 }
102
103 impl Future for OnClose<'_> {
104 type Output = bool;
105
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool>106 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
107 let fut = self.get_mut().tx.closed();
108 crate::pin!(fut);
109
110 Ready(fut.poll(cx).is_ready())
111 }
112 }
113
114 #[test]
changing_tx_task()115 fn changing_tx_task() {
116 loom::model(|| {
117 let (mut tx, rx) = oneshot::channel::<i32>();
118
119 thread::spawn(move || {
120 drop(rx);
121 });
122
123 let tx = thread::spawn(move || {
124 let t1 = block_on(OnClose::new(&mut tx));
125
126 if t1 {
127 None
128 } else {
129 Some(tx)
130 }
131 })
132 .join()
133 .unwrap();
134
135 if let Some(mut tx) = tx {
136 // Previous task parked, use a new task...
137 block_on(OnClose::new(&mut tx));
138 }
139 });
140 }
141
142 #[test]
checking_tx_send_ok_not_drop()143 fn checking_tx_send_ok_not_drop() {
144 use std::borrow::Borrow;
145 use std::cell::Cell;
146
147 loom::thread_local! {
148 static IS_RX: Cell<bool> = Cell::new(true);
149 }
150
151 struct Msg;
152
153 impl Drop for Msg {
154 fn drop(&mut self) {
155 IS_RX.with(|is_rx: &Cell<_>| {
156 // On `tx.send(msg)` returning `Err(msg)`,
157 // we call `std::mem::forget(msg)`, so that
158 // `drop` is not expected to be called in the
159 // tx thread.
160 assert!(is_rx.get());
161 });
162 }
163 }
164
165 let mut builder = loom::model::Builder::new();
166 builder.preemption_bound = Some(2);
167
168 builder.check(|| {
169 let (tx, rx) = oneshot::channel();
170
171 // tx thread
172 let tx_thread_join_handle = thread::spawn(move || {
173 // Ensure that `Msg::drop` in this thread will see is_rx == false
174 IS_RX.with(|is_rx: &Cell<_>| {
175 is_rx.set(false);
176 });
177 if let Err(msg) = tx.send(Msg) {
178 std::mem::forget(msg);
179 }
180 });
181
182 // main thread is the rx thread
183 drop(rx);
184
185 tx_thread_join_handle.join().unwrap();
186 });
187 }
188