1 #![allow(unknown_lints, unexpected_cfgs)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 #![cfg(not(miri))] // Possible bug on Miri.
5
6 use tokio::runtime::Runtime;
7 use tokio::sync::oneshot;
8 use tokio::time::{timeout, Duration};
9 use tokio_test::{assert_err, assert_ok};
10
11 use std::future::Future;
12 use std::pin::Pin;
13 use std::sync::atomic::{AtomicBool, Ordering};
14 use std::task::{Context, Poll};
15 use std::thread;
16
// Shared test helpers. `mpsc_stream` is declared here and loaded from a
// separate file; `no_extra_poll` uses its `unbounded_channel_stream`.
mod support {
    pub(crate) mod mpsc_stream;
}
20
// Wraps the given statements in a block that is compiled only when
// `tokio_unstable` is set and the target supports 64-bit atomics. On any
// other configuration the statements are removed entirely.
macro_rules! cfg_metrics {
    ($($body:tt)*) => {
        #[cfg(all(tokio_unstable, target_has_atomic = "64"))]
        {
            $($body)*
        }
    };
}
29
// A task spawned onto the runtime returned by `rt()` must not run until the
// runtime is driven through `block_on`.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let runtime = rt();
    let (sender, mut receiver) = oneshot::channel();

    runtime.spawn(async move {
        assert_ok!(sender.send("hello"));
    });

    // Give the task plenty of time to run. Nothing is driving the runtime,
    // so the message must not have been sent yet.
    thread::sleep(Duration::from_millis(50));
    assert_err!(receiver.try_recv());

    // Once the runtime is driven, the spawned task runs and the message
    // arrives.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });
    assert_eq!(received, "hello");
}
48
// Verifies that a stream consumed by a spawned task is polled exactly as
// often as expected, with no additional polls in between.
#[test]
fn no_extra_poll() {
    use pin_project_lite::pin_project;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use tokio_stream::{Stream, StreamExt};

    pin_project! {
        // Stream wrapper that counts every call to `poll_next`.
        struct PollCounter<S> {
            count: Arc<AtomicUsize>,
            #[pin]
            inner: S,
        }
    }

    impl<S: Stream> Stream for PollCounter<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let projected = self.project();
            // Record the poll before forwarding it to the wrapped stream.
            projected.count.fetch_add(1, SeqCst);
            projected.inner.poll_next(cx)
        }
    }

    let (tx, stream) = support::mpsc_stream::unbounded_channel_stream::<()>();
    let polls = Arc::new(AtomicUsize::new(0));

    // TODO: heap-pinning could probably be avoided, but it is harmless.
    let mut counted = Box::pin(PollCounter {
        count: Arc::clone(&polls),
        inner: stream,
    });

    let runtime = rt();

    // Drives the runtime until a single `yield_now` has completed, giving the
    // spawned task a chance to run.
    let tick = || {
        runtime.block_on(async {
            tokio::task::yield_now().await;
        })
    };

    runtime.spawn(async move { while counted.next().await.is_some() {} });
    tick();

    // Exactly one poll so far: the initial one.
    assert_eq!(polls.load(SeqCst), 1);

    tx.send(()).unwrap();
    tick();

    // Two more polls: one that yields Some(), then one that yields Pending.
    assert_eq!(polls.load(SeqCst), 1 + 2);

    drop(tx);
    tick();

    // One final poll: the one that yields None.
    assert_eq!(polls.load(SeqCst), 1 + 2 + 1);
}
116
// Dropping the runtime while it still owns a chain of tasks that wake each
// other must complete without panicking or hanging.
#[test]
fn acquire_mutex_in_drop() {
    use futures::future::pending;
    use tokio::task;

    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    let runtime = rt();

    // Waits on the second channel; the code after the await must never run.
    runtime.spawn(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and owns the sender of the second one; the
    // code after the await must never run either.
    runtime.spawn(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // This task never notifies: `pending` never resolves, so the send below
    // is never reached while the task is alive.
    runtime.spawn(async move {
        pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Tick the loop so that every task is polled.
    runtime.block_on(async {
        task::yield_now().await;
    });

    // Drop the runtime together with all tasks it still holds.
    drop(runtime);
}
152
// When the runtime is dropped, the tasks it still owns must be dropped while
// a runtime handle is reachable through `Handle::try_current`.
#[test]
fn drop_tasks_in_context() {
    // Set by the destructor below when it observes a current runtime handle.
    static DROPPED_IN_CONTEXT: AtomicBool = AtomicBool::new(false);

    // A future that never completes, so it can only go away by being dropped.
    struct NeverReady;

    impl Future for NeverReady {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Pending
        }
    }

    impl Drop for NeverReady {
        fn drop(&mut self) {
            let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
            if inside_runtime {
                DROPPED_IN_CONTEXT.store(true, Ordering::SeqCst);
            }
        }
    }

    let runtime = rt();
    runtime.spawn(NeverReady);
    drop(runtime);

    assert!(DROPPED_IN_CONTEXT.load(Ordering::SeqCst));
}
181
// A task being woken from another task's destructor while the "boom" panic
// is unwinding must not obscure that panic.
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(expected = "boom")]
fn wake_in_drop_after_panic() {
    // Sends on the wrapped channel when dropped, waking the receiving task.
    struct SendOnDrop(Option<oneshot::Sender<()>>);

    impl Drop for SendOnDrop {
        fn drop(&mut self) {
            let sender = self.0.take().unwrap();
            let _ = sender.send(());
        }
    }

    let runtime = rt();

    let (first_tx, first_rx) = oneshot::channel::<()>();
    let (second_tx, second_rx) = oneshot::channel::<()>();

    // The drop order of the two tasks is unknown, so they are symmetric:
    // each one waits on its own receiver and holds the sender for the other.
    // Whichever is dropped first wakes the one that is still alive, which
    // guarantees a wakeup on a live task while the "boom" panic is being
    // handled.
    runtime.spawn(async move {
        let _guard = SendOnDrop(Some(second_tx));
        let _ = first_rx.await;
        unreachable!()
    });

    runtime.spawn(async move {
        let _guard = SendOnDrop(Some(first_tx));
        let _ = second_rx.await;
        unreachable!()
    });

    runtime.block_on(async {
        // Yield once so both tasks get polled before the panic.
        tokio::task::yield_now().await;
        panic!("boom");
    });
}
221
// A task spawned from inside another spawned task must run, and (when
// metrics are available) both spawns must be counted as local schedules.
#[test]
fn spawn_two() {
    let runtime = rt();

    let message = runtime.block_on(async {
        let (tx, rx) = oneshot::channel();

        // The innermost task is the one that delivers the message.
        let inner = async move {
            tx.send("ZOMG").unwrap();
        };

        tokio::spawn(async move {
            tokio::spawn(inner);
        });

        assert_ok!(rx.await)
    });

    assert_eq!(message, "ZOMG");

    cfg_metrics! {
        let metrics = runtime.metrics();
        drop(runtime);
        assert_eq!(0, metrics.remote_schedule_count());

        // Sum the local schedule counts over every worker.
        let local_schedules = (0..metrics.num_workers())
            .fold(0, |total, worker| total + metrics.worker_local_schedule_count(worker));

        assert_eq!(2, local_schedules);
    }
}
253
// A task woken from a thread outside the runtime must complete, and (when
// metrics are available) that wakeup must be counted as a remote schedule.
#[cfg_attr(target_os = "wasi", ignore = "WASI: std::thread::spawn not supported")]
#[test]
fn spawn_remote() {
    let runtime = rt();

    let message = runtime.block_on(async {
        let (tx, rx) = oneshot::channel();

        let task = tokio::spawn(async move {
            // Send from a separate OS thread, after a short delay.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                tx.send("ZOMG").unwrap();
            });

            rx.await.unwrap()
        });

        task.await.unwrap()
    });

    assert_eq!(message, "ZOMG");

    cfg_metrics! {
        let metrics = runtime.metrics();
        drop(runtime);
        assert_eq!(1, metrics.remote_schedule_count());

        // Sum the local schedule counts over every worker.
        let local_schedules = (0..metrics.num_workers())
            .fold(0, |total, worker| total + metrics.worker_local_schedule_count(worker));

        assert_eq!(1, local_schedules);
    }
}
289
// Using `timeout` on a runtime built without timers must panic with a
// message that tells the user to call `enable_time`.
#[test]
#[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
#[should_panic(
    expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
)]
fn timeout_panics_when_no_time_handle() {
    // Built without `enable_time` or `enable_all` on purpose.
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = builder.build().unwrap();

    runtime.block_on(async {
        // The sender is bound to a named variable so it stays alive for the
        // whole block.
        let (_keep_alive, rx) = oneshot::channel::<()>();
        let _ = timeout(Duration::from_millis(20), rx).await;
    });
}
305
// Tests for runtime features that are only available with `tokio_unstable`.
#[cfg(tokio_unstable)]
mod unstable {
    use tokio::runtime::{Builder, RngSeed, UnhandledPanic};

    // Builds a current-thread runtime configured with
    // `UnhandledPanic::ShutdownRuntime`.
    fn shutdown_on_panic_runtime() -> tokio::runtime::Runtime {
        Builder::new_current_thread()
            .unhandled_panic(UnhandledPanic::ShutdownRuntime)
            .build()
            .unwrap()
    }

    // Builds a current-thread runtime whose RNG seed is derived from `seed`.
    fn seeded_runtime(seed: &[u8]) -> tokio::runtime::Runtime {
        Builder::new_current_thread()
            .rng_seed(RngSeed::from_bytes(seed))
            .build()
            .unwrap()
    }

    // Draws two consecutive values from `thread_rng_n(100)`.
    fn rand_pair() -> (u32, u32) {
        let first = tokio::macros::support::thread_rng_n(100);
        let second = tokio::macros::support::thread_rng_n(100);

        (first, second)
    }

    // A panic in a spawned task must make `block_on` panic with the
    // shutdown message.
    #[test]
    #[should_panic(
        expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic"
    )]
    fn shutdown_on_panic() {
        let runtime = shutdown_on_panic_runtime();

        runtime.block_on(async {
            tokio::spawn(async {
                panic!("boom");
            });

            futures::future::pending::<()>().await;
        })
    }

    // After the runtime has shut down because of a panic, awaiting a newly
    // spawned task yields an error.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn spawns_do_nothing() {
        use std::sync::Arc;

        let shared = Arc::new(shutdown_on_panic_runtime());
        let for_thread = Arc::clone(&shared);

        // Trigger the shutdown on a helper thread. The result of the join is
        // deliberately ignored.
        let _ = std::thread::spawn(move || {
            for_thread.block_on(async {
                tokio::spawn(async {
                    panic!("boom");
                });

                futures::future::pending::<()>().await;
            })
        })
        .join();

        let handle = shared.spawn(async {});
        let outcome = futures::executor::block_on(handle);
        assert!(outcome.is_err());
    }

    // A panic in a spawned task must terminate every thread that is
    // currently inside `block_on` on the same runtime.
    #[test]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn shutdown_all_concurrent_block_on() {
        use std::sync::{mpsc, Arc};

        const N: usize = 2;

        let runtime = Arc::new(shutdown_on_panic_runtime());
        let (ready_tx, ready_rx) = mpsc::channel();

        // Start N threads that each enter `block_on`, report readiness, and
        // then wait forever.
        let workers: Vec<_> = (0..N)
            .map(|_| {
                let runtime = Arc::clone(&runtime);
                let ready_tx = ready_tx.clone();
                std::thread::spawn(move || {
                    runtime.block_on(async {
                        ready_tx.send(()).unwrap();
                        futures::future::pending::<()>().await;
                    });
                })
            })
            .collect();

        // Wait until every thread is inside its `block_on` call.
        for _ in 0..N {
            ready_rx.recv().unwrap();
        }

        runtime.spawn(async {
            panic!("boom");
        });

        // Every thread must have ended with a panic.
        for worker in workers {
            assert!(worker.join().is_err());
        }
    }

    // Two runtimes built from the same seed must produce the same random
    // values.
    #[test]
    fn rng_seed() {
        let seed = b"bytes used to generate seed";

        let first_rt = seeded_runtime(seed);
        let first_values = first_rt.block_on(async { rand_pair() });

        let second_rt = seeded_runtime(seed);
        let second_values = second_rt.block_on(async { rand_pair() });

        assert_eq!(first_values, second_values);
    }

    // Two runtimes built from the same seed must produce the same random
    // values on each successive `block_on` entry.
    #[test]
    fn rng_seed_multi_enter() {
        let seed = b"bytes used to generate seed";

        let first_rt = seeded_runtime(seed);
        let first_entry_a = first_rt.block_on(async { rand_pair() });
        let first_entry_b = first_rt.block_on(async { rand_pair() });

        let second_rt = seeded_runtime(seed);
        let second_entry_a = second_rt.block_on(async { rand_pair() });
        let second_entry_b = second_rt.block_on(async { rand_pair() });

        assert_eq!(first_entry_a, second_entry_a);
        assert_eq!(first_entry_b, second_entry_b);
    }
}
454
rt() -> Runtime455 fn rt() -> Runtime {
456 tokio::runtime::Builder::new_current_thread()
457 .enable_all()
458 .build()
459 .unwrap()
460 }
461