1 use crate::sync::Notify;
2 use std::future::Future;
3 use std::sync::Arc;
4 use std::task::{Context, RawWaker, RawWakerVTable, Waker};
5 
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as test;
8 
9 #[test]
notify_clones_waker_before_lock()10 fn notify_clones_waker_before_lock() {
11     const VTABLE: &RawWakerVTable = &RawWakerVTable::new(clone_w, wake, wake_by_ref, drop_w);
12 
13     unsafe fn clone_w(data: *const ()) -> RawWaker {
14         let ptr = data as *const Notify;
15         Arc::<Notify>::increment_strong_count(ptr);
16         // Or some other arbitrary code that shouldn't be executed while the
17         // Notify wait list is locked.
18         (*ptr).notify_one();
19         RawWaker::new(data, VTABLE)
20     }
21 
22     unsafe fn drop_w(data: *const ()) {
23         drop(Arc::<Notify>::from_raw(data as *const Notify));
24     }
25 
26     unsafe fn wake(_data: *const ()) {
27         unreachable!()
28     }
29 
30     unsafe fn wake_by_ref(_data: *const ()) {
31         unreachable!()
32     }
33 
34     let notify = Arc::new(Notify::new());
35     let notify2 = notify.clone();
36 
37     let waker =
38         unsafe { Waker::from_raw(RawWaker::new(Arc::into_raw(notify2) as *const _, VTABLE)) };
39     let mut cx = Context::from_waker(&waker);
40 
41     let future = notify.notified();
42     pin!(future);
43 
44     // The result doesn't matter, we're just testing that we don't deadlock.
45     let _ = future.poll(&mut cx);
46 }
47 
48 #[cfg(panic = "unwind")]
49 #[test]
notify_waiters_handles_panicking_waker()50 fn notify_waiters_handles_panicking_waker() {
51     use futures::task::ArcWake;
52 
53     let notify = Arc::new(Notify::new());
54 
55     struct PanickingWaker(#[allow(dead_code)] Arc<Notify>);
56 
57     impl ArcWake for PanickingWaker {
58         fn wake_by_ref(_arc_self: &Arc<Self>) {
59             panic!("waker panicked");
60         }
61     }
62 
63     let bad_fut = notify.notified();
64     pin!(bad_fut);
65 
66     let waker = futures::task::waker(Arc::new(PanickingWaker(notify.clone())));
67     let mut cx = Context::from_waker(&waker);
68     let _ = bad_fut.poll(&mut cx);
69 
70     let mut futs = Vec::new();
71     for _ in 0..32 {
72         let mut fut = tokio_test::task::spawn(notify.notified());
73         assert!(fut.poll().is_pending());
74         futs.push(fut);
75     }
76 
77     assert!(std::panic::catch_unwind(|| {
78         notify.notify_waiters();
79     })
80     .is_err());
81 
82     for mut fut in futs {
83         assert!(fut.poll().is_ready());
84     }
85 }
86 
87 #[test]
notify_simple()88 fn notify_simple() {
89     let notify = Notify::new();
90 
91     let mut fut1 = tokio_test::task::spawn(notify.notified());
92     assert!(fut1.poll().is_pending());
93 
94     let mut fut2 = tokio_test::task::spawn(notify.notified());
95     assert!(fut2.poll().is_pending());
96 
97     notify.notify_waiters();
98 
99     assert!(fut1.poll().is_ready());
100     assert!(fut2.poll().is_ready());
101 }
102 
103 #[test]
104 #[cfg(not(target_family = "wasm"))]
watch_test()105 fn watch_test() {
106     let rt = crate::runtime::Builder::new_current_thread()
107         .build()
108         .unwrap();
109 
110     rt.block_on(async {
111         let (tx, mut rx) = crate::sync::watch::channel(());
112 
113         crate::spawn(async move {
114             let _ = tx.send(());
115         });
116 
117         let _ = rx.changed().await;
118     });
119 }
120