1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", not(miri)))]
3 
4 // All io tests that deal with shutdown is currently ignored because there are known bugs in with
5 // shutting down the io driver while concurrently registering new resources. See
6 // https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details.
7 //
8 // When this has been fixed we want to re-enable these tests.
9 
10 use std::time::Duration;
11 use tokio::runtime::{Handle, Runtime};
12 use tokio::sync::mpsc;
13 #[cfg(not(target_os = "wasi"))]
14 use tokio::{net, time};
15 
16 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
17 macro_rules! multi_threaded_rt_test {
18     ($($t:tt)*) => {
19         mod threaded_scheduler_4_threads_only {
20             use super::*;
21 
22             $($t)*
23 
24             fn rt() -> Runtime {
25                 tokio::runtime::Builder::new_multi_thread()
26                     .worker_threads(4)
27                     .enable_all()
28                     .build()
29                     .unwrap()
30             }
31         }
32 
33         mod threaded_scheduler_1_thread_only {
34             use super::*;
35 
36             $($t)*
37 
38             fn rt() -> Runtime {
39                 tokio::runtime::Builder::new_multi_thread()
40                     .worker_threads(1)
41                     .enable_all()
42                     .build()
43                     .unwrap()
44             }
45         }
46     }
47 }
48 
49 #[cfg(not(target_os = "wasi"))]
50 macro_rules! rt_test {
51     ($($t:tt)*) => {
52         mod current_thread_scheduler {
53             use super::*;
54 
55             $($t)*
56 
57             fn rt() -> Runtime {
58                 tokio::runtime::Builder::new_current_thread()
59                     .enable_all()
60                     .build()
61                     .unwrap()
62             }
63         }
64 
65         mod threaded_scheduler_4_threads {
66             use super::*;
67 
68             $($t)*
69 
70             fn rt() -> Runtime {
71                 tokio::runtime::Builder::new_multi_thread()
72                     .worker_threads(4)
73                     .enable_all()
74                     .build()
75                     .unwrap()
76             }
77         }
78 
79         mod threaded_scheduler_1_thread {
80             use super::*;
81 
82             $($t)*
83 
84             fn rt() -> Runtime {
85                 tokio::runtime::Builder::new_multi_thread()
86                     .worker_threads(1)
87                     .enable_all()
88                     .build()
89                     .unwrap()
90             }
91         }
92     }
93 }
94 
95 // ==== runtime independent futures ======
96 
97 #[test]
basic()98 fn basic() {
99     test_with_runtimes(|| {
100         let one = Handle::current().block_on(async { 1 });
101         assert_eq!(1, one);
102     });
103 }
104 
105 #[test]
bounded_mpsc_channel()106 fn bounded_mpsc_channel() {
107     test_with_runtimes(|| {
108         let (tx, mut rx) = mpsc::channel(1024);
109 
110         Handle::current().block_on(tx.send(42)).unwrap();
111 
112         let value = Handle::current().block_on(rx.recv()).unwrap();
113         assert_eq!(value, 42);
114     });
115 }
116 
117 #[test]
unbounded_mpsc_channel()118 fn unbounded_mpsc_channel() {
119     test_with_runtimes(|| {
120         let (tx, mut rx) = mpsc::unbounded_channel();
121 
122         let _ = tx.send(42);
123 
124         let value = Handle::current().block_on(rx.recv()).unwrap();
125         assert_eq!(value, 42);
126     })
127 }
128 
129 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support file operations or bind
130 rt_test! {
131     use tokio::fs;
132     // ==== spawn blocking futures ======
133 
134     #[test]
135     fn basic_fs() {
136         let rt = rt();
137         let _enter = rt.enter();
138 
139         let contents = Handle::current()
140             .block_on(fs::read_to_string("Cargo.toml"))
141             .unwrap();
142         assert!(contents.contains("https://tokio.rs"));
143     }
144 
145     #[test]
146     fn fs_shutdown_before_started() {
147         let rt = rt();
148         let _enter = rt.enter();
149         rt.shutdown_timeout(Duration::from_secs(1000));
150 
151         let err: std::io::Error = Handle::current()
152             .block_on(fs::read_to_string("Cargo.toml"))
153             .unwrap_err();
154 
155         assert_eq!(err.kind(), std::io::ErrorKind::Other);
156 
157         let inner_err = err.get_ref().expect("no inner error");
158         assert_eq!(inner_err.to_string(), "background task failed");
159     }
160 
161     #[test]
162     fn basic_spawn_blocking() {
163         use tokio::task::spawn_blocking;
164         let rt = rt();
165         let _enter = rt.enter();
166 
167         let answer = Handle::current()
168             .block_on(spawn_blocking(|| {
169                 std::thread::sleep(Duration::from_millis(100));
170                 42
171             }))
172             .unwrap();
173 
174         assert_eq!(answer, 42);
175     }
176 
177     #[test]
178     fn spawn_blocking_after_shutdown_fails() {
179         use tokio::task::spawn_blocking;
180         let rt = rt();
181         let _enter = rt.enter();
182         rt.shutdown_timeout(Duration::from_secs(1000));
183 
184         let join_err = Handle::current()
185             .block_on(spawn_blocking(|| {
186                 std::thread::sleep(Duration::from_millis(100));
187                 42
188             }))
189             .unwrap_err();
190 
191         assert!(join_err.is_cancelled());
192     }
193 
194     #[test]
195     fn spawn_blocking_started_before_shutdown_continues() {
196         use tokio::task::spawn_blocking;
197         let rt = rt();
198         let _enter = rt.enter();
199 
200         let handle = spawn_blocking(|| {
201             std::thread::sleep(Duration::from_secs(1));
202             42
203         });
204 
205         rt.shutdown_timeout(Duration::from_secs(1000));
206 
207         let answer = Handle::current().block_on(handle).unwrap();
208 
209         assert_eq!(answer, 42);
210     }
211 
212     // ==== net ======
213 
214     #[test]
215     #[cfg_attr(miri, ignore)] // No `socket` in miri.
216     fn tcp_listener_bind() {
217         let rt = rt();
218         let _enter = rt.enter();
219 
220         Handle::current()
221             .block_on(net::TcpListener::bind("127.0.0.1:0"))
222             .unwrap();
223     }
224 
225     // All io tests are ignored for now. See above why that is.
226     #[ignore]
227     #[test]
228     fn tcp_listener_connect_after_shutdown() {
229         let rt = rt();
230         let _enter = rt.enter();
231 
232         rt.shutdown_timeout(Duration::from_secs(1000));
233 
234         let err = Handle::current()
235             .block_on(net::TcpListener::bind("127.0.0.1:0"))
236             .unwrap_err();
237 
238         assert_eq!(err.kind(), std::io::ErrorKind::Other);
239         assert_eq!(
240             err.get_ref().unwrap().to_string(),
241             "A Tokio 1.x context was found, but it is being shutdown.",
242         );
243     }
244 
245     // All io tests are ignored for now. See above why that is.
246     #[ignore]
247     #[test]
248     fn tcp_listener_connect_before_shutdown() {
249         let rt = rt();
250         let _enter = rt.enter();
251 
252         let bind_future = net::TcpListener::bind("127.0.0.1:0");
253 
254         rt.shutdown_timeout(Duration::from_secs(1000));
255 
256         let err = Handle::current().block_on(bind_future).unwrap_err();
257 
258         assert_eq!(err.kind(), std::io::ErrorKind::Other);
259         assert_eq!(
260             err.get_ref().unwrap().to_string(),
261             "A Tokio 1.x context was found, but it is being shutdown.",
262         );
263     }
264 
265     #[test]
266     #[cfg_attr(miri, ignore)] // No `socket` in miri.
267     fn udp_socket_bind() {
268         let rt = rt();
269         let _enter = rt.enter();
270 
271         Handle::current()
272             .block_on(net::UdpSocket::bind("127.0.0.1:0"))
273             .unwrap();
274     }
275 
276     // All io tests are ignored for now. See above why that is.
277     #[ignore]
278     #[test]
279     fn udp_stream_bind_after_shutdown() {
280         let rt = rt();
281         let _enter = rt.enter();
282 
283         rt.shutdown_timeout(Duration::from_secs(1000));
284 
285         let err = Handle::current()
286             .block_on(net::UdpSocket::bind("127.0.0.1:0"))
287             .unwrap_err();
288 
289         assert_eq!(err.kind(), std::io::ErrorKind::Other);
290         assert_eq!(
291             err.get_ref().unwrap().to_string(),
292             "A Tokio 1.x context was found, but it is being shutdown.",
293         );
294     }
295 
296     // All io tests are ignored for now. See above why that is.
297     #[ignore]
298     #[test]
299     fn udp_stream_bind_before_shutdown() {
300         let rt = rt();
301         let _enter = rt.enter();
302 
303         let bind_future = net::UdpSocket::bind("127.0.0.1:0");
304 
305         rt.shutdown_timeout(Duration::from_secs(1000));
306 
307         let err = Handle::current().block_on(bind_future).unwrap_err();
308 
309         assert_eq!(err.kind(), std::io::ErrorKind::Other);
310         assert_eq!(
311             err.get_ref().unwrap().to_string(),
312             "A Tokio 1.x context was found, but it is being shutdown.",
313         );
314     }
315 
316     // All io tests are ignored for now. See above why that is.
317     #[ignore]
318     #[cfg(unix)]
319     #[test]
320     fn unix_listener_bind_after_shutdown() {
321         let rt = rt();
322         let _enter = rt.enter();
323 
324         let dir = tempfile::tempdir().unwrap();
325         let path = dir.path().join("socket");
326 
327         rt.shutdown_timeout(Duration::from_secs(1000));
328 
329         let err = net::UnixListener::bind(path).unwrap_err();
330 
331         assert_eq!(err.kind(), std::io::ErrorKind::Other);
332         assert_eq!(
333             err.get_ref().unwrap().to_string(),
334             "A Tokio 1.x context was found, but it is being shutdown.",
335         );
336     }
337 
338     // All io tests are ignored for now. See above why that is.
339     #[ignore]
340     #[cfg(unix)]
341     #[test]
342     fn unix_listener_shutdown_after_bind() {
343         let rt = rt();
344         let _enter = rt.enter();
345 
346         let dir = tempfile::tempdir().unwrap();
347         let path = dir.path().join("socket");
348 
349         let listener = net::UnixListener::bind(path).unwrap();
350 
351         rt.shutdown_timeout(Duration::from_secs(1000));
352 
353         // this should not timeout but fail immediately since the runtime has been shutdown
354         let err = Handle::current().block_on(listener.accept()).unwrap_err();
355 
356         assert_eq!(err.kind(), std::io::ErrorKind::Other);
357         assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
358     }
359 
360     // All io tests are ignored for now. See above why that is.
361     #[ignore]
362     #[cfg(unix)]
363     #[test]
364     fn unix_listener_shutdown_after_accept() {
365         let rt = rt();
366         let _enter = rt.enter();
367 
368         let dir = tempfile::tempdir().unwrap();
369         let path = dir.path().join("socket");
370 
371         let listener = net::UnixListener::bind(path).unwrap();
372 
373         let accept_future = listener.accept();
374 
375         rt.shutdown_timeout(Duration::from_secs(1000));
376 
377         // this should not timeout but fail immediately since the runtime has been shutdown
378         let err = Handle::current().block_on(accept_future).unwrap_err();
379 
380         assert_eq!(err.kind(), std::io::ErrorKind::Other);
381         assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
382     }
383 
384     // ==== nesting ======
385 
386     #[test]
387     #[should_panic(
388         expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
389     )]
390     fn nesting() {
391         fn some_non_async_function() -> i32 {
392             Handle::current().block_on(time::sleep(Duration::from_millis(10)));
393             1
394         }
395 
396         let rt = rt();
397 
398         rt.block_on(async { some_non_async_function() });
399     }
400 
401     #[test]
402     fn spawn_after_runtime_dropped() {
403         use futures::future::FutureExt;
404 
405         let rt = rt();
406 
407         let handle = rt.block_on(async move {
408             Handle::current()
409         });
410 
411         let jh1 = handle.spawn(futures::future::pending::<()>());
412 
413         drop(rt);
414 
415         let jh2 = handle.spawn(futures::future::pending::<()>());
416 
417         let err1 = jh1.now_or_never().unwrap().unwrap_err();
418         let err2 = jh2.now_or_never().unwrap().unwrap_err();
419         assert!(err1.is_cancelled());
420         assert!(err2.is_cancelled());
421     }
422 }
423 
424 #[cfg(not(target_os = "wasi"))]
425 multi_threaded_rt_test! {
426     #[cfg(unix)]
427     #[cfg_attr(miri, ignore)] // No `socket` in miri.
428     #[test]
429     fn unix_listener_bind() {
430         let rt = rt();
431         let _enter = rt.enter();
432 
433         let dir = tempfile::tempdir().unwrap();
434         let path = dir.path().join("socket");
435 
436         let listener = net::UnixListener::bind(path).unwrap();
437 
438         // this should timeout and not fail immediately since the runtime has not been shutdown
439         let _: tokio::time::error::Elapsed = Handle::current()
440             .block_on(tokio::time::timeout(
441                 Duration::from_millis(10),
442                 listener.accept(),
443             ))
444             .unwrap_err();
445     }
446 
447     // ==== timers ======
448 
449     // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
450     // one to drive the timers so they will just hang forever. Therefore they are not tested.
451 
452     #[test]
453     fn sleep() {
454         let rt = rt();
455         let _enter = rt.enter();
456 
457         Handle::current().block_on(time::sleep(Duration::from_millis(100)));
458     }
459 
460     #[test]
461     #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
462     fn sleep_before_shutdown_panics() {
463         let rt = rt();
464         let _enter = rt.enter();
465 
466         let f = time::sleep(Duration::from_millis(100));
467 
468         rt.shutdown_timeout(Duration::from_secs(1000));
469 
470         Handle::current().block_on(f);
471     }
472 
473     #[test]
474     #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
475     fn sleep_after_shutdown_panics() {
476         let rt = rt();
477         let _enter = rt.enter();
478 
479         rt.shutdown_timeout(Duration::from_secs(1000));
480 
481         Handle::current().block_on(time::sleep(Duration::from_millis(100)));
482     }
483 }
484 
485 // ==== utils ======
486 
487 /// Create a new multi threaded runtime
488 #[cfg(not(target_os = "wasi"))]
new_multi_thread(n: usize) -> Runtime489 fn new_multi_thread(n: usize) -> Runtime {
490     tokio::runtime::Builder::new_multi_thread()
491         .worker_threads(n)
492         .enable_all()
493         .build()
494         .unwrap()
495 }
496 
497 /// Create a new single threaded runtime
new_current_thread() -> Runtime498 fn new_current_thread() -> Runtime {
499     tokio::runtime::Builder::new_current_thread()
500         .enable_all()
501         .build()
502         .unwrap()
503 }
504 
505 /// Utility to test things on both kinds of runtimes both before and after shutting it down.
test_with_runtimes<F>(f: F) where F: Fn(),506 fn test_with_runtimes<F>(f: F)
507 where
508     F: Fn(),
509 {
510     {
511         let rt = new_current_thread();
512         let _enter = rt.enter();
513         f();
514 
515         rt.shutdown_timeout(Duration::from_secs(1000));
516         f();
517     }
518 
519     #[cfg(not(target_os = "wasi"))]
520     {
521         let rt = new_multi_thread(1);
522         let _enter = rt.enter();
523         f();
524 
525         rt.shutdown_timeout(Duration::from_secs(1000));
526         f();
527     }
528 
529     #[cfg(not(target_os = "wasi"))]
530     {
531         let rt = new_multi_thread(4);
532         let _enter = rt.enter();
533         f();
534 
535         rt.shutdown_timeout(Duration::from_secs(1000));
536         f();
537     }
538 }
539