1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "sync")]
3 
4 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
5 use wasm_bindgen_test::wasm_bindgen_test as test;
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
8 
9 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
10 use tokio::test as maybe_tokio_test;
11 
12 use tokio::sync::oneshot;
13 use tokio::sync::oneshot::error::TryRecvError;
14 use tokio_test::*;
15 
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 
20 #[allow(unused)]
21 trait AssertSend: Send {}
22 impl AssertSend for oneshot::Sender<i32> {}
23 impl AssertSend for oneshot::Receiver<i32> {}
24 
25 #[allow(unused)]
26 trait SenderExt {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>27     fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
28 }
29 
30 impl<T> SenderExt for oneshot::Sender<T> {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>31     fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
32         tokio::pin! {
33             let fut = self.closed();
34         }
35         fut.poll(cx)
36     }
37 }
38 
/// A value sent after the receiver has started waiting wakes the receiver
/// and is returned by the next poll.
#[test]
fn send_recv() {
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    // Nothing has been sent yet, so the first poll must not complete.
    assert_pending!(receiver.poll());

    assert_ok!(sender.send(1));

    // The send is expected to notify the waiting receiver task.
    assert!(receiver.is_woken());

    assert_eq!(assert_ready_ok!(receiver.poll()), 1);
}
53 
54 #[maybe_tokio_test]
async_send_recv()55 async fn async_send_recv() {
56     let (tx, rx) = oneshot::channel();
57 
58     assert_ok!(tx.send(1));
59     assert_eq!(1, assert_ok!(rx.await));
60 }
61 
/// Dropping the sender without sending wakes a waiting receiver, whose next
/// poll resolves to an error.
#[test]
fn close_tx() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    // Register interest before the sender goes away.
    assert_pending!(receiver.poll());

    drop(sender);

    assert!(receiver.is_woken());
    assert_ready_err!(receiver.poll());
}
74 
/// Dropping the receiver makes `send` fail, both with and without the sender
/// having observed the closure through `poll_closed()`.
#[test]
fn close_rx() {
    // Case 1: the receiver is gone before the sender does anything.
    let (sender, receiver) = oneshot::channel();
    drop(receiver);

    assert_err!(sender.send(1));

    // Case 2: the sender is waiting in `poll_closed()` when the receiver is dropped.
    let (sender, receiver) = oneshot::channel();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    drop(receiver);

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    // Sending on the closed channel must still be rejected.
    assert_err!(sender.into_inner().send(1));
}
98 
/// `closed()` completes once a spawned task drops the receiver.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_rx_closed() {
    let (mut sender, receiver) = oneshot::channel::<()>();

    // Move the receiver into a separate task whose only job is to drop it.
    tokio::spawn(async move {
        drop(receiver);
    });

    sender.closed().await;
}
110 
/// Exercises `Receiver::close()` followed by polling the receiver.
#[test]
fn explicit_close_poll() {
    // Scenario 1: the value is sent before the receiver closes, so polling
    // still yields it.
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    assert_ok!(sender.send(1));

    receiver.close();

    assert_eq!(assert_ready_ok!(receiver.poll()), 1);

    // Scenario 2: the receiver closes first; a send attempted afterwards is
    // rejected and the receiver resolves to an error.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
    assert_ready_err!(receiver.poll());

    // Scenario 3: same as scenario 2, except that no send is attempted at all.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    assert_ready_err!(receiver.poll());
}
155 
/// Exercises `Receiver::close()` followed by `try_recv()`.
#[test]
fn explicit_close_try_recv() {
    // Scenario 1: a value sent before the close is still retrievable.
    let (sender, mut receiver) = oneshot::channel();

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ok!(receiver.try_recv());
    assert_eq!(1, received);

    // Scenario 2: nothing was sent, so after the close the sender observes
    // the closure and `try_recv()` reports an error.
    let (sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    assert_err!(receiver.try_recv());
}
182 
/// Polling a receiver after `close()` and a failed `try_recv()` must panic.
///
/// The expected panic message is pinned so that the test cannot pass
/// vacuously: with a bare `#[should_panic]`, a panic raised by the
/// `assert_err!` below (i.e. `try_recv()` unexpectedly succeeding) would also
/// have been accepted.
#[test]
#[should_panic(expected = "called after complete")]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn close_try_recv_poll() {
    // `_tx` is kept alive so that the channel is closed only from the
    // receiving side.
    let (_tx, rx) = oneshot::channel::<i32>();
    let mut rx = task::spawn(rx);

    rx.close();

    assert_err!(rx.try_recv());

    // This poll is the operation that is expected to panic.
    let _ = rx.poll();
}
196 
/// Calling `close()` after the value was already taken with `try_recv()`
/// must not panic.
#[test]
fn close_after_recv() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let received = receiver.try_recv().unwrap();
    assert_eq!(17, received);

    receiver.close();
}
206 
/// Once `try_recv()` has returned the value, a second `try_recv()` reports
/// `TryRecvError::Closed`, and a later `close()` does not panic.
#[test]
fn try_recv_after_completion() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let first = receiver.try_recv().unwrap();
    assert_eq!(17, first);

    let second = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), second);

    receiver.close();
}
217 
/// Once polling has returned the value, `try_recv()` reports
/// `TryRecvError::Closed`, and a later `close()` does not panic.
#[test]
fn try_recv_after_completion_await() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    sender.send(17).unwrap();

    let polled = assert_ready!(receiver.poll());
    assert_eq!(Ok(17), polled);

    let after = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), after);

    receiver.close();
}
229 
/// Dropping both channel halves leaves each mock task with a waker reference
/// count of 1.
#[test]
fn drops_tasks() {
    let (mut sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender_task = task::spawn(());
    let mut receiver_task = task::spawn(());

    // Poll each half from its own task; both polls are expected to be pending.
    assert_pending!(sender_task.enter(|cx, _| sender.poll_closed(cx)));
    assert_pending!(receiver_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    drop(sender);
    drop(receiver);

    assert_eq!(1, sender_task.waker_ref_count());
    assert_eq!(1, receiver_task.waker_ref_count());
}
245 
/// When the receiver is polled from a second task, only that most recent
/// task is woken by a send.
#[test]
fn receiver_changes_task() {
    let (sender, mut receiver) = oneshot::channel();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count goes up, the other stays put.
    assert_pending!(first_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    assert_ok!(sender.send(1));

    // Only the task that polled last gets notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready_ok!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));
}
270 
/// When the sender's `poll_closed()` is called from a second task, only that
/// most recent task is woken once the receiver is dropped.
#[test]
fn sender_changes_task() {
    let (mut sender, receiver) = oneshot::channel::<i32>();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count goes up, the other stays put.
    assert_pending!(first_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    drop(receiver);

    // Only the task that polled last gets notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready!(second_task.enter(|cx, _| sender.poll_closed(cx)));
}
295