1 #![allow(unknown_lints, unexpected_cfgs)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 #![cfg(not(miri))] // Possible bug on Miri.
5 
6 use tokio::runtime::Runtime;
7 use tokio::sync::oneshot;
8 use tokio::time::{timeout, Duration};
9 use tokio_test::{assert_err, assert_ok};
10 
11 use std::future::Future;
12 use std::pin::Pin;
13 use std::sync::atomic::{AtomicBool, Ordering};
14 use std::task::{Context, Poll};
15 use std::thread;
16 
17 mod support {
18     pub(crate) mod mpsc_stream;
19 }
20 
21 macro_rules! cfg_metrics {
22     ($($t:tt)*) => {
23         #[cfg(all(tokio_unstable, target_has_atomic = "64"))]
24         {
25             $( $t )*
26         }
27     }
28 }
29 
30 #[test]
spawned_task_does_not_progress_without_block_on()31 fn spawned_task_does_not_progress_without_block_on() {
32     let (tx, mut rx) = oneshot::channel();
33 
34     let rt = rt();
35 
36     rt.spawn(async move {
37         assert_ok!(tx.send("hello"));
38     });
39 
40     thread::sleep(Duration::from_millis(50));
41 
42     assert_err!(rx.try_recv());
43 
44     let out = rt.block_on(async { assert_ok!(rx.await) });
45 
46     assert_eq!(out, "hello");
47 }
48 
49 #[test]
no_extra_poll()50 fn no_extra_poll() {
51     use pin_project_lite::pin_project;
52     use std::pin::Pin;
53     use std::sync::{
54         atomic::{AtomicUsize, Ordering::SeqCst},
55         Arc,
56     };
57     use std::task::{Context, Poll};
58     use tokio_stream::{Stream, StreamExt};
59 
60     pin_project! {
61         struct TrackPolls<S> {
62             npolls: Arc<AtomicUsize>,
63             #[pin]
64             s: S,
65         }
66     }
67 
68     impl<S> Stream for TrackPolls<S>
69     where
70         S: Stream,
71     {
72         type Item = S::Item;
73         fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74             let this = self.project();
75             this.npolls.fetch_add(1, SeqCst);
76             this.s.poll_next(cx)
77         }
78     }
79 
80     let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();
81     let rx = TrackPolls {
82         npolls: Arc::new(AtomicUsize::new(0)),
83         s: rx,
84     };
85     let npolls = Arc::clone(&rx.npolls);
86 
87     let rt = rt();
88 
89     // TODO: could probably avoid this, but why not.
90     let mut rx = Box::pin(rx);
91 
92     rt.spawn(async move { while rx.next().await.is_some() {} });
93     rt.block_on(async {
94         tokio::task::yield_now().await;
95     });
96 
97     // should have been polled exactly once: the initial poll
98     assert_eq!(npolls.load(SeqCst), 1);
99 
100     tx.send(()).unwrap();
101     rt.block_on(async {
102         tokio::task::yield_now().await;
103     });
104 
105     // should have been polled twice more: once to yield Some(), then once to yield Pending
106     assert_eq!(npolls.load(SeqCst), 1 + 2);
107 
108     drop(tx);
109     rt.block_on(async {
110         tokio::task::yield_now().await;
111     });
112 
113     // should have been polled once more: to yield None
114     assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
115 }
116 
117 #[test]
acquire_mutex_in_drop()118 fn acquire_mutex_in_drop() {
119     use futures::future::pending;
120     use tokio::task;
121 
122     let (tx1, rx1) = oneshot::channel();
123     let (tx2, rx2) = oneshot::channel();
124 
125     let rt = rt();
126 
127     rt.spawn(async move {
128         let _ = rx2.await;
129         unreachable!();
130     });
131 
132     rt.spawn(async move {
133         let _ = rx1.await;
134         tx2.send(()).unwrap();
135         unreachable!();
136     });
137 
138     // Spawn a task that will never notify
139     rt.spawn(async move {
140         pending::<()>().await;
141         tx1.send(()).unwrap();
142     });
143 
144     // Tick the loop
145     rt.block_on(async {
146         task::yield_now().await;
147     });
148 
149     // Drop the rt
150     drop(rt);
151 }
152 
153 #[test]
drop_tasks_in_context()154 fn drop_tasks_in_context() {
155     static SUCCESS: AtomicBool = AtomicBool::new(false);
156 
157     struct ContextOnDrop;
158 
159     impl Future for ContextOnDrop {
160         type Output = ();
161 
162         fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
163             Poll::Pending
164         }
165     }
166 
167     impl Drop for ContextOnDrop {
168         fn drop(&mut self) {
169             if tokio::runtime::Handle::try_current().is_ok() {
170                 SUCCESS.store(true, Ordering::SeqCst);
171             }
172         }
173     }
174 
175     let rt = rt();
176     rt.spawn(ContextOnDrop);
177     drop(rt);
178 
179     assert!(SUCCESS.load(Ordering::SeqCst));
180 }
181 
182 #[test]
183 #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
184 #[should_panic(expected = "boom")]
wake_in_drop_after_panic()185 fn wake_in_drop_after_panic() {
186     struct WakeOnDrop(Option<oneshot::Sender<()>>);
187 
188     impl Drop for WakeOnDrop {
189         fn drop(&mut self) {
190             let _ = self.0.take().unwrap().send(());
191         }
192     }
193 
194     let rt = rt();
195 
196     let (tx1, rx1) = oneshot::channel::<()>();
197     let (tx2, rx2) = oneshot::channel::<()>();
198 
199     // Spawn two tasks. We don't know the order in which they are dropped, so we
200     // make both tasks identical. When the first task is dropped, we wake up the
201     // second task. This ensures that we trigger a wakeup on a live task while
202     // handling the "boom" panic, no matter the order in which the tasks are
203     // dropped.
204     rt.spawn(async move {
205         let _wake_on_drop = WakeOnDrop(Some(tx2));
206         let _ = rx1.await;
207         unreachable!()
208     });
209 
210     rt.spawn(async move {
211         let _wake_on_drop = WakeOnDrop(Some(tx1));
212         let _ = rx2.await;
213         unreachable!()
214     });
215 
216     rt.block_on(async {
217         tokio::task::yield_now().await;
218         panic!("boom");
219     });
220 }
221 
222 #[test]
spawn_two()223 fn spawn_two() {
224     let rt = rt();
225 
226     let out = rt.block_on(async {
227         let (tx, rx) = oneshot::channel();
228 
229         tokio::spawn(async move {
230             tokio::spawn(async move {
231                 tx.send("ZOMG").unwrap();
232             });
233         });
234 
235         assert_ok!(rx.await)
236     });
237 
238     assert_eq!(out, "ZOMG");
239 
240     cfg_metrics! {
241         let metrics = rt.metrics();
242         drop(rt);
243         assert_eq!(0, metrics.remote_schedule_count());
244 
245         let mut local = 0;
246         for i in 0..metrics.num_workers() {
247             local += metrics.worker_local_schedule_count(i);
248         }
249 
250         assert_eq!(2, local);
251     }
252 }
253 
254 #[cfg_attr(target_os = "wasi", ignore = "WASI: std::thread::spawn not supported")]
255 #[test]
spawn_remote()256 fn spawn_remote() {
257     let rt = rt();
258 
259     let out = rt.block_on(async {
260         let (tx, rx) = oneshot::channel();
261 
262         let handle = tokio::spawn(async move {
263             std::thread::spawn(move || {
264                 std::thread::sleep(Duration::from_millis(10));
265                 tx.send("ZOMG").unwrap();
266             });
267 
268             rx.await.unwrap()
269         });
270 
271         handle.await.unwrap()
272     });
273 
274     assert_eq!(out, "ZOMG");
275 
276     cfg_metrics! {
277         let metrics = rt.metrics();
278         drop(rt);
279         assert_eq!(1, metrics.remote_schedule_count());
280 
281         let mut local = 0;
282         for i in 0..metrics.num_workers() {
283             local += metrics.worker_local_schedule_count(i);
284         }
285 
286         assert_eq!(1, local);
287     }
288 }
289 
290 #[test]
291 #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
292 #[should_panic(
293     expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
294 )]
timeout_panics_when_no_time_handle()295 fn timeout_panics_when_no_time_handle() {
296     let rt = tokio::runtime::Builder::new_current_thread()
297         .build()
298         .unwrap();
299     rt.block_on(async {
300         let (_tx, rx) = oneshot::channel::<()>();
301         let dur = Duration::from_millis(20);
302         let _ = timeout(dur, rx).await;
303     });
304 }
305 
306 #[cfg(tokio_unstable)]
307 mod unstable {
308     use tokio::runtime::{Builder, RngSeed, UnhandledPanic};
309 
310     #[test]
311     #[should_panic(
312         expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic"
313     )]
shutdown_on_panic()314     fn shutdown_on_panic() {
315         let rt = Builder::new_current_thread()
316             .unhandled_panic(UnhandledPanic::ShutdownRuntime)
317             .build()
318             .unwrap();
319 
320         rt.block_on(async {
321             tokio::spawn(async {
322                 panic!("boom");
323             });
324 
325             futures::future::pending::<()>().await;
326         })
327     }
328 
329     #[test]
330     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
spawns_do_nothing()331     fn spawns_do_nothing() {
332         use std::sync::Arc;
333 
334         let rt = Builder::new_current_thread()
335             .unhandled_panic(UnhandledPanic::ShutdownRuntime)
336             .build()
337             .unwrap();
338 
339         let rt1 = Arc::new(rt);
340         let rt2 = rt1.clone();
341 
342         let _ = std::thread::spawn(move || {
343             rt2.block_on(async {
344                 tokio::spawn(async {
345                     panic!("boom");
346                 });
347 
348                 futures::future::pending::<()>().await;
349             })
350         })
351         .join();
352 
353         let task = rt1.spawn(async {});
354         let res = futures::executor::block_on(task);
355         assert!(res.is_err());
356     }
357 
358     #[test]
359     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
shutdown_all_concurrent_block_on()360     fn shutdown_all_concurrent_block_on() {
361         const N: usize = 2;
362         use std::sync::{mpsc, Arc};
363 
364         let rt = Builder::new_current_thread()
365             .unhandled_panic(UnhandledPanic::ShutdownRuntime)
366             .build()
367             .unwrap();
368 
369         let rt = Arc::new(rt);
370         let mut ths = vec![];
371         let (tx, rx) = mpsc::channel();
372 
373         for _ in 0..N {
374             let rt = rt.clone();
375             let tx = tx.clone();
376             ths.push(std::thread::spawn(move || {
377                 rt.block_on(async {
378                     tx.send(()).unwrap();
379                     futures::future::pending::<()>().await;
380                 });
381             }));
382         }
383 
384         for _ in 0..N {
385             rx.recv().unwrap();
386         }
387 
388         rt.spawn(async {
389             panic!("boom");
390         });
391 
392         for th in ths {
393             assert!(th.join().is_err());
394         }
395     }
396 
397     #[test]
rng_seed()398     fn rng_seed() {
399         let seed = b"bytes used to generate seed";
400         let rt1 = tokio::runtime::Builder::new_current_thread()
401             .rng_seed(RngSeed::from_bytes(seed))
402             .build()
403             .unwrap();
404         let rt1_values = rt1.block_on(async {
405             let rand_1 = tokio::macros::support::thread_rng_n(100);
406             let rand_2 = tokio::macros::support::thread_rng_n(100);
407 
408             (rand_1, rand_2)
409         });
410 
411         let rt2 = tokio::runtime::Builder::new_current_thread()
412             .rng_seed(RngSeed::from_bytes(seed))
413             .build()
414             .unwrap();
415         let rt2_values = rt2.block_on(async {
416             let rand_1 = tokio::macros::support::thread_rng_n(100);
417             let rand_2 = tokio::macros::support::thread_rng_n(100);
418 
419             (rand_1, rand_2)
420         });
421 
422         assert_eq!(rt1_values, rt2_values);
423     }
424 
425     #[test]
rng_seed_multi_enter()426     fn rng_seed_multi_enter() {
427         let seed = b"bytes used to generate seed";
428 
429         fn two_rand_values() -> (u32, u32) {
430             let rand_1 = tokio::macros::support::thread_rng_n(100);
431             let rand_2 = tokio::macros::support::thread_rng_n(100);
432 
433             (rand_1, rand_2)
434         }
435 
436         let rt1 = tokio::runtime::Builder::new_current_thread()
437             .rng_seed(RngSeed::from_bytes(seed))
438             .build()
439             .unwrap();
440         let rt1_values_1 = rt1.block_on(async { two_rand_values() });
441         let rt1_values_2 = rt1.block_on(async { two_rand_values() });
442 
443         let rt2 = tokio::runtime::Builder::new_current_thread()
444             .rng_seed(RngSeed::from_bytes(seed))
445             .build()
446             .unwrap();
447         let rt2_values_1 = rt2.block_on(async { two_rand_values() });
448         let rt2_values_2 = rt2.block_on(async { two_rand_values() });
449 
450         assert_eq!(rt1_values_1, rt2_values_1);
451         assert_eq!(rt1_values_2, rt2_values_2);
452     }
453 }
454 
rt() -> Runtime455 fn rt() -> Runtime {
456     tokio::runtime::Builder::new_current_thread()
457         .enable_all()
458         .build()
459         .unwrap()
460 }
461