1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", not(miri)))]
3
4 // All io tests that deal with shutdown is currently ignored because there are known bugs in with
5 // shutting down the io driver while concurrently registering new resources. See
6 // https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details.
7 //
8 // When this has been fixed we want to re-enable these tests.
9
10 use std::time::Duration;
11 use tokio::runtime::{Handle, Runtime};
12 use tokio::sync::mpsc;
13 #[cfg(not(target_os = "wasi"))]
14 use tokio::{net, time};
15
16 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
17 macro_rules! multi_threaded_rt_test {
18 ($($t:tt)*) => {
19 mod threaded_scheduler_4_threads_only {
20 use super::*;
21
22 $($t)*
23
24 fn rt() -> Runtime {
25 tokio::runtime::Builder::new_multi_thread()
26 .worker_threads(4)
27 .enable_all()
28 .build()
29 .unwrap()
30 }
31 }
32
33 mod threaded_scheduler_1_thread_only {
34 use super::*;
35
36 $($t)*
37
38 fn rt() -> Runtime {
39 tokio::runtime::Builder::new_multi_thread()
40 .worker_threads(1)
41 .enable_all()
42 .build()
43 .unwrap()
44 }
45 }
46 }
47 }
48
49 #[cfg(not(target_os = "wasi"))]
50 macro_rules! rt_test {
51 ($($t:tt)*) => {
52 mod current_thread_scheduler {
53 use super::*;
54
55 $($t)*
56
57 fn rt() -> Runtime {
58 tokio::runtime::Builder::new_current_thread()
59 .enable_all()
60 .build()
61 .unwrap()
62 }
63 }
64
65 mod threaded_scheduler_4_threads {
66 use super::*;
67
68 $($t)*
69
70 fn rt() -> Runtime {
71 tokio::runtime::Builder::new_multi_thread()
72 .worker_threads(4)
73 .enable_all()
74 .build()
75 .unwrap()
76 }
77 }
78
79 mod threaded_scheduler_1_thread {
80 use super::*;
81
82 $($t)*
83
84 fn rt() -> Runtime {
85 tokio::runtime::Builder::new_multi_thread()
86 .worker_threads(1)
87 .enable_all()
88 .build()
89 .unwrap()
90 }
91 }
92 }
93 }
94
95 // ==== runtime independent futures ======
96
97 #[test]
basic()98 fn basic() {
99 test_with_runtimes(|| {
100 let one = Handle::current().block_on(async { 1 });
101 assert_eq!(1, one);
102 });
103 }
104
105 #[test]
bounded_mpsc_channel()106 fn bounded_mpsc_channel() {
107 test_with_runtimes(|| {
108 let (tx, mut rx) = mpsc::channel(1024);
109
110 Handle::current().block_on(tx.send(42)).unwrap();
111
112 let value = Handle::current().block_on(rx.recv()).unwrap();
113 assert_eq!(value, 42);
114 });
115 }
116
117 #[test]
unbounded_mpsc_channel()118 fn unbounded_mpsc_channel() {
119 test_with_runtimes(|| {
120 let (tx, mut rx) = mpsc::unbounded_channel();
121
122 let _ = tx.send(42);
123
124 let value = Handle::current().block_on(rx.recv()).unwrap();
125 assert_eq!(value, 42);
126 })
127 }
128
129 #[cfg(not(target_os = "wasi"))] // Wasi doesn't support file operations or bind
130 rt_test! {
131 use tokio::fs;
132 // ==== spawn blocking futures ======
133
134 #[test]
135 fn basic_fs() {
136 let rt = rt();
137 let _enter = rt.enter();
138
139 let contents = Handle::current()
140 .block_on(fs::read_to_string("Cargo.toml"))
141 .unwrap();
142 assert!(contents.contains("https://tokio.rs"));
143 }
144
145 #[test]
146 fn fs_shutdown_before_started() {
147 let rt = rt();
148 let _enter = rt.enter();
149 rt.shutdown_timeout(Duration::from_secs(1000));
150
151 let err: std::io::Error = Handle::current()
152 .block_on(fs::read_to_string("Cargo.toml"))
153 .unwrap_err();
154
155 assert_eq!(err.kind(), std::io::ErrorKind::Other);
156
157 let inner_err = err.get_ref().expect("no inner error");
158 assert_eq!(inner_err.to_string(), "background task failed");
159 }
160
161 #[test]
162 fn basic_spawn_blocking() {
163 use tokio::task::spawn_blocking;
164 let rt = rt();
165 let _enter = rt.enter();
166
167 let answer = Handle::current()
168 .block_on(spawn_blocking(|| {
169 std::thread::sleep(Duration::from_millis(100));
170 42
171 }))
172 .unwrap();
173
174 assert_eq!(answer, 42);
175 }
176
177 #[test]
178 fn spawn_blocking_after_shutdown_fails() {
179 use tokio::task::spawn_blocking;
180 let rt = rt();
181 let _enter = rt.enter();
182 rt.shutdown_timeout(Duration::from_secs(1000));
183
184 let join_err = Handle::current()
185 .block_on(spawn_blocking(|| {
186 std::thread::sleep(Duration::from_millis(100));
187 42
188 }))
189 .unwrap_err();
190
191 assert!(join_err.is_cancelled());
192 }
193
194 #[test]
195 fn spawn_blocking_started_before_shutdown_continues() {
196 use tokio::task::spawn_blocking;
197 let rt = rt();
198 let _enter = rt.enter();
199
200 let handle = spawn_blocking(|| {
201 std::thread::sleep(Duration::from_secs(1));
202 42
203 });
204
205 rt.shutdown_timeout(Duration::from_secs(1000));
206
207 let answer = Handle::current().block_on(handle).unwrap();
208
209 assert_eq!(answer, 42);
210 }
211
212 // ==== net ======
213
214 #[test]
215 #[cfg_attr(miri, ignore)] // No `socket` in miri.
216 fn tcp_listener_bind() {
217 let rt = rt();
218 let _enter = rt.enter();
219
220 Handle::current()
221 .block_on(net::TcpListener::bind("127.0.0.1:0"))
222 .unwrap();
223 }
224
225 // All io tests are ignored for now. See above why that is.
226 #[ignore]
227 #[test]
228 fn tcp_listener_connect_after_shutdown() {
229 let rt = rt();
230 let _enter = rt.enter();
231
232 rt.shutdown_timeout(Duration::from_secs(1000));
233
234 let err = Handle::current()
235 .block_on(net::TcpListener::bind("127.0.0.1:0"))
236 .unwrap_err();
237
238 assert_eq!(err.kind(), std::io::ErrorKind::Other);
239 assert_eq!(
240 err.get_ref().unwrap().to_string(),
241 "A Tokio 1.x context was found, but it is being shutdown.",
242 );
243 }
244
245 // All io tests are ignored for now. See above why that is.
246 #[ignore]
247 #[test]
248 fn tcp_listener_connect_before_shutdown() {
249 let rt = rt();
250 let _enter = rt.enter();
251
252 let bind_future = net::TcpListener::bind("127.0.0.1:0");
253
254 rt.shutdown_timeout(Duration::from_secs(1000));
255
256 let err = Handle::current().block_on(bind_future).unwrap_err();
257
258 assert_eq!(err.kind(), std::io::ErrorKind::Other);
259 assert_eq!(
260 err.get_ref().unwrap().to_string(),
261 "A Tokio 1.x context was found, but it is being shutdown.",
262 );
263 }
264
265 #[test]
266 #[cfg_attr(miri, ignore)] // No `socket` in miri.
267 fn udp_socket_bind() {
268 let rt = rt();
269 let _enter = rt.enter();
270
271 Handle::current()
272 .block_on(net::UdpSocket::bind("127.0.0.1:0"))
273 .unwrap();
274 }
275
276 // All io tests are ignored for now. See above why that is.
277 #[ignore]
278 #[test]
279 fn udp_stream_bind_after_shutdown() {
280 let rt = rt();
281 let _enter = rt.enter();
282
283 rt.shutdown_timeout(Duration::from_secs(1000));
284
285 let err = Handle::current()
286 .block_on(net::UdpSocket::bind("127.0.0.1:0"))
287 .unwrap_err();
288
289 assert_eq!(err.kind(), std::io::ErrorKind::Other);
290 assert_eq!(
291 err.get_ref().unwrap().to_string(),
292 "A Tokio 1.x context was found, but it is being shutdown.",
293 );
294 }
295
296 // All io tests are ignored for now. See above why that is.
297 #[ignore]
298 #[test]
299 fn udp_stream_bind_before_shutdown() {
300 let rt = rt();
301 let _enter = rt.enter();
302
303 let bind_future = net::UdpSocket::bind("127.0.0.1:0");
304
305 rt.shutdown_timeout(Duration::from_secs(1000));
306
307 let err = Handle::current().block_on(bind_future).unwrap_err();
308
309 assert_eq!(err.kind(), std::io::ErrorKind::Other);
310 assert_eq!(
311 err.get_ref().unwrap().to_string(),
312 "A Tokio 1.x context was found, but it is being shutdown.",
313 );
314 }
315
316 // All io tests are ignored for now. See above why that is.
317 #[ignore]
318 #[cfg(unix)]
319 #[test]
320 fn unix_listener_bind_after_shutdown() {
321 let rt = rt();
322 let _enter = rt.enter();
323
324 let dir = tempfile::tempdir().unwrap();
325 let path = dir.path().join("socket");
326
327 rt.shutdown_timeout(Duration::from_secs(1000));
328
329 let err = net::UnixListener::bind(path).unwrap_err();
330
331 assert_eq!(err.kind(), std::io::ErrorKind::Other);
332 assert_eq!(
333 err.get_ref().unwrap().to_string(),
334 "A Tokio 1.x context was found, but it is being shutdown.",
335 );
336 }
337
338 // All io tests are ignored for now. See above why that is.
339 #[ignore]
340 #[cfg(unix)]
341 #[test]
342 fn unix_listener_shutdown_after_bind() {
343 let rt = rt();
344 let _enter = rt.enter();
345
346 let dir = tempfile::tempdir().unwrap();
347 let path = dir.path().join("socket");
348
349 let listener = net::UnixListener::bind(path).unwrap();
350
351 rt.shutdown_timeout(Duration::from_secs(1000));
352
353 // this should not timeout but fail immediately since the runtime has been shutdown
354 let err = Handle::current().block_on(listener.accept()).unwrap_err();
355
356 assert_eq!(err.kind(), std::io::ErrorKind::Other);
357 assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
358 }
359
360 // All io tests are ignored for now. See above why that is.
361 #[ignore]
362 #[cfg(unix)]
363 #[test]
364 fn unix_listener_shutdown_after_accept() {
365 let rt = rt();
366 let _enter = rt.enter();
367
368 let dir = tempfile::tempdir().unwrap();
369 let path = dir.path().join("socket");
370
371 let listener = net::UnixListener::bind(path).unwrap();
372
373 let accept_future = listener.accept();
374
375 rt.shutdown_timeout(Duration::from_secs(1000));
376
377 // this should not timeout but fail immediately since the runtime has been shutdown
378 let err = Handle::current().block_on(accept_future).unwrap_err();
379
380 assert_eq!(err.kind(), std::io::ErrorKind::Other);
381 assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
382 }
383
384 // ==== nesting ======
385
386 #[test]
387 #[should_panic(
388 expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
389 )]
390 fn nesting() {
391 fn some_non_async_function() -> i32 {
392 Handle::current().block_on(time::sleep(Duration::from_millis(10)));
393 1
394 }
395
396 let rt = rt();
397
398 rt.block_on(async { some_non_async_function() });
399 }
400
401 #[test]
402 fn spawn_after_runtime_dropped() {
403 use futures::future::FutureExt;
404
405 let rt = rt();
406
407 let handle = rt.block_on(async move {
408 Handle::current()
409 });
410
411 let jh1 = handle.spawn(futures::future::pending::<()>());
412
413 drop(rt);
414
415 let jh2 = handle.spawn(futures::future::pending::<()>());
416
417 let err1 = jh1.now_or_never().unwrap().unwrap_err();
418 let err2 = jh2.now_or_never().unwrap().unwrap_err();
419 assert!(err1.is_cancelled());
420 assert!(err2.is_cancelled());
421 }
422 }
423
424 #[cfg(not(target_os = "wasi"))]
425 multi_threaded_rt_test! {
426 #[cfg(unix)]
427 #[cfg_attr(miri, ignore)] // No `socket` in miri.
428 #[test]
429 fn unix_listener_bind() {
430 let rt = rt();
431 let _enter = rt.enter();
432
433 let dir = tempfile::tempdir().unwrap();
434 let path = dir.path().join("socket");
435
436 let listener = net::UnixListener::bind(path).unwrap();
437
438 // this should timeout and not fail immediately since the runtime has not been shutdown
439 let _: tokio::time::error::Elapsed = Handle::current()
440 .block_on(tokio::time::timeout(
441 Duration::from_millis(10),
442 listener.accept(),
443 ))
444 .unwrap_err();
445 }
446
447 // ==== timers ======
448
449 // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
450 // one to drive the timers so they will just hang forever. Therefore they are not tested.
451
452 #[test]
453 fn sleep() {
454 let rt = rt();
455 let _enter = rt.enter();
456
457 Handle::current().block_on(time::sleep(Duration::from_millis(100)));
458 }
459
460 #[test]
461 #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
462 fn sleep_before_shutdown_panics() {
463 let rt = rt();
464 let _enter = rt.enter();
465
466 let f = time::sleep(Duration::from_millis(100));
467
468 rt.shutdown_timeout(Duration::from_secs(1000));
469
470 Handle::current().block_on(f);
471 }
472
473 #[test]
474 #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
475 fn sleep_after_shutdown_panics() {
476 let rt = rt();
477 let _enter = rt.enter();
478
479 rt.shutdown_timeout(Duration::from_secs(1000));
480
481 Handle::current().block_on(time::sleep(Duration::from_millis(100)));
482 }
483 }
484
485 // ==== utils ======
486
487 /// Create a new multi threaded runtime
488 #[cfg(not(target_os = "wasi"))]
new_multi_thread(n: usize) -> Runtime489 fn new_multi_thread(n: usize) -> Runtime {
490 tokio::runtime::Builder::new_multi_thread()
491 .worker_threads(n)
492 .enable_all()
493 .build()
494 .unwrap()
495 }
496
497 /// Create a new single threaded runtime
new_current_thread() -> Runtime498 fn new_current_thread() -> Runtime {
499 tokio::runtime::Builder::new_current_thread()
500 .enable_all()
501 .build()
502 .unwrap()
503 }
504
505 /// Utility to test things on both kinds of runtimes both before and after shutting it down.
test_with_runtimes<F>(f: F) where F: Fn(),506 fn test_with_runtimes<F>(f: F)
507 where
508 F: Fn(),
509 {
510 {
511 let rt = new_current_thread();
512 let _enter = rt.enter();
513 f();
514
515 rt.shutdown_timeout(Duration::from_secs(1000));
516 f();
517 }
518
519 #[cfg(not(target_os = "wasi"))]
520 {
521 let rt = new_multi_thread(1);
522 let _enter = rt.enter();
523 f();
524
525 rt.shutdown_timeout(Duration::from_secs(1000));
526 f();
527 }
528
529 #[cfg(not(target_os = "wasi"))]
530 {
531 let rt = new_multi_thread(4);
532 let _enter = rt.enter();
533 f();
534
535 rt.shutdown_timeout(Duration::from_secs(1000));
536 f();
537 }
538 }
539