1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "sync")]
3
4 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
5 use wasm_bindgen_test::wasm_bindgen_test as test;
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
8
9 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
10 use tokio::test as maybe_tokio_test;
11
12 use tokio::sync::oneshot;
13 use tokio::sync::oneshot::error::TryRecvError;
14 use tokio_test::*;
15
16 use std::future::Future;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19
20 #[allow(unused)]
21 trait AssertSend: Send {}
22 impl AssertSend for oneshot::Sender<i32> {}
23 impl AssertSend for oneshot::Receiver<i32> {}
24
25 #[allow(unused)]
26 trait SenderExt {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>27 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
28 }
29
30 impl<T> SenderExt for oneshot::Sender<T> {
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>31 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
32 tokio::pin! {
33 let fut = self.closed();
34 }
35 fut.poll(cx)
36 }
37 }
38
// A value sent after the receiver was polled wakes the receiving task and is
// returned by the following poll.
#[test]
fn send_recv() {
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    // Nothing has been sent yet, so the receiver cannot be ready.
    assert_pending!(receiver.poll());

    assert_ok!(sender.send(1));

    // The send must have notified the task that polled the receiver.
    assert!(receiver.is_woken());

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);
}
53
54 #[maybe_tokio_test]
async_send_recv()55 async fn async_send_recv() {
56 let (tx, rx) = oneshot::channel();
57
58 assert_ok!(tx.send(1));
59 assert_eq!(1, assert_ok!(rx.await));
60 }
61
// Dropping the sender without sending wakes the receiver, which then resolves
// to an error.
#[test]
fn close_tx() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    assert_pending!(receiver.poll());

    drop(sender);

    assert!(receiver.is_woken());
    assert_ready_err!(receiver.poll());
}
74
// Dropping the receiver makes `send` fail, both when the sender never polled
// for closure and when it was already waiting on it.
#[test]
fn close_rx() {
    // Case 1: the receiver half is discarded right away (the `_` pattern does
    // not bind it), and the sender never calls `poll_closed()`.
    let (sender, _) = oneshot::channel();

    assert_err!(sender.send(1));

    // Case 2: the sender is first parked on `poll_closed()`, then the
    // receiver goes away.
    let (sender, receiver) = oneshot::channel();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    drop(receiver);

    // The drop must wake the sender task and be visible through both
    // `is_closed()` and a second `poll_closed()`.
    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
}
98
// `closed().await` completes once a spawned task has dropped the receiver.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_rx_closed() {
    let (mut sender, receiver) = oneshot::channel::<()>();

    // Move the receiver into its own task whose only job is to drop it.
    let drop_receiver = async move {
        drop(receiver);
    };
    tokio::spawn(drop_receiver);

    sender.closed().await;
}
110
// Exercises `Receiver::close()` followed by polling the receiver, in three
// scenarios.
#[test]
fn explicit_close_poll() {
    // Scenario 1: the value is sent before the receiver closes, so the poll
    // after `close()` still yields it.
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);

    // Scenario 2: the receiver closes while the sender is waiting on
    // `poll_closed()`. A send attempted afterwards is rejected and the
    // receiver resolves to an error.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
    assert_ready_err!(receiver.poll());

    // Scenario 3: same as scenario 2, except that the sender never attempts
    // to send and is still alive when the receiver is polled.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    assert_ready_err!(receiver.poll());
}
155
// Exercises `Receiver::close()` followed by `try_recv()`, with and without a
// value already in the channel.
#[test]
fn explicit_close_try_recv() {
    // Scenario 1: the value is sent before the receiver closes, so
    // `try_recv()` still returns it.
    let (sender, mut receiver) = oneshot::channel();

    assert_ok!(sender.send(1));

    receiver.close();

    let received = assert_ok!(receiver.try_recv());
    assert_eq!(1, received);

    // Scenario 2: nothing is ever sent. Closing wakes the waiting sender and
    // `try_recv()` reports an error.
    let (sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut pinned| pinned.poll_closed(cx)));

    assert_err!(receiver.try_recv());
}
182
// Passes only if the body panics. The final `poll()` on a receiver whose
// `try_recv()` has already reported an error is presumably the intended panic
// site.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn close_try_recv_poll() {
    // `_sender` is bound (not `_`) so the sending half stays alive for the
    // whole test.
    let (_sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    receiver.close();

    assert_err!(receiver.try_recv());

    let _ = receiver.poll();
}
196
// Calling `close()` after the value has already been taken must not panic.
#[test]
fn close_after_recv() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let received = receiver.try_recv().unwrap();
    assert_eq!(17, received);

    receiver.close();
}
206
// Once `try_recv()` has returned the value, a second `try_recv()` reports
// `TryRecvError::Closed`, and `close()` is still safe to call.
#[test]
fn try_recv_after_completion() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let first = receiver.try_recv().unwrap();
    assert_eq!(17, first);

    let second = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), second);

    receiver.close();
}
217
// Once polling the receiver has returned the value, `try_recv()` reports
// `TryRecvError::Closed`, and `close()` is still safe to call.
#[test]
fn try_recv_after_completion_await() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    sender.send(17).unwrap();

    let polled = assert_ready!(receiver.poll());
    assert_eq!(Ok(17), polled);

    let after_completion = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), after_completion);

    receiver.close();
}
229
// After both channel halves are dropped, each mock task's waker ref count is
// back to 1, i.e. presumably no waker clone registered by the polls below is
// still held.
#[test]
fn drops_tasks() {
    let (mut sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender_task = task::spawn(());
    let mut receiver_task = task::spawn(());

    // Poll each half once from its own mock task.
    assert_pending!(sender_task.enter(|cx, _| sender.poll_closed(cx)));
    assert_pending!(receiver_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    drop(sender);
    drop(receiver);

    assert_eq!(1, sender_task.waker_ref_count());
    assert_eq!(1, receiver_task.waker_ref_count());
}
245
// When the receiver is polled from a second task, only that most recent task
// is woken by a later send.
#[test]
fn receiver_changes_task() {
    let (sender, mut receiver) = oneshot::channel();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker ref count rises to 2 (presumably
    // one clone is held by the channel), the other task is untouched.
    assert_pending!(first_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    assert_ok!(sender.send(1));

    // Only the task that polled last gets the wake-up.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready_ok!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));
}
270
// When `poll_closed()` is called from a second task, only that most recent
// task is woken once the receiver is dropped.
#[test]
fn sender_changes_task() {
    let (mut sender, receiver) = oneshot::channel::<i32>();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker ref count rises to 2 (presumably
    // one clone is held by the channel), the other task is untouched.
    assert_pending!(first_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    drop(receiver);

    // Only the task that polled last gets the wake-up.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready!(second_task.enter(|cx, _| sender.poll_closed(cx)));
}
295