1 #![allow(unknown_lints, unexpected_cfgs)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(all(
4     feature = "full",
5     tokio_unstable,
6     not(target_os = "wasi"),
7     target_has_atomic = "64"
8 ))]
9 
10 use std::future::Future;
11 use std::sync::{mpsc, Arc, Mutex};
12 use std::task::Poll;
13 use std::thread;
14 use tokio::macros::support::poll_fn;
15 
16 use tokio::runtime::{HistogramConfiguration, HistogramScale, LogHistogram, Runtime};
17 use tokio::task::consume_budget;
18 use tokio::time::{self, Duration};
19 
// The reported worker count follows the runtime flavor: one worker for the
// current-thread runtime and two for the runtime built by `threaded()`.
#[test]
fn num_workers() {
    let single = current_thread();
    let single_workers = single.metrics().num_workers();
    assert_eq!(1, single_workers);

    let multi = threaded();
    let multi_workers = multi.metrics().num_workers();
    assert_eq!(2, multi_workers);
}
28 
// Running a single `spawn_blocking` closure to completion takes the number of
// blocking threads from zero to one on both runtime flavors.
#[test]
fn num_blocking_threads() {
    // Shared assertion sequence applied to each runtime flavor.
    fn assert_one_thread_after_blocking_call(rt: &Runtime) {
        let metrics = rt.metrics();
        assert_eq!(0, metrics.num_blocking_threads());

        let blocking = rt.spawn_blocking(move || {});
        let _ = rt.block_on(blocking);

        assert_eq!(1, metrics.num_blocking_threads());
    }

    let single = current_thread();
    assert_one_thread_after_blocking_call(&single);

    let multi = threaded();
    assert_one_thread_after_blocking_call(&multi);
}
41 
// After a blocking closure has finished, its thread should eventually be
// reported as idle.
#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Sleeps on the runtime for the given amount of time.
    let pause = |wait: Duration| {
        rt.block_on(async move {
            time::sleep(wait).await;
        })
    };

    assert_eq!(0, metrics.num_idle_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    pause(Duration::from_millis(5));

    // The blocking thread needs a moment to become idle. 5ms is usually
    // enough; when it is not, give it one more second. The long wait is only
    // taken on demand so that the test suite stays fast.
    //
    // Note that the timeout for idle threads to be killed is 10 seconds.
    if metrics.num_idle_blocking_threads() == 0 {
        pause(Duration::from_secs(1));
    }

    assert_eq!(1, metrics.num_idle_blocking_threads());
}
65 
// With a single blocking thread allowed, two closures that both wait on a
// held mutex must leave something in the blocking queue; once the mutex is
// released and both closures finish, the queue is empty again.
#[test]
fn blocking_queue_depth() {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all().max_blocking_threads(1);
    let rt = builder.build().unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    let gate = Arc::new(Mutex::new(()));
    // Hold the lock so that every blocking closure stalls until we release it.
    let held = gate.lock().unwrap();

    let wait_for_gate = {
        let gate = Arc::clone(&gate);
        move || {
            let _unused = gate.lock().unwrap();
        }
    };

    let first = rt.spawn_blocking(wait_for_gate.clone());
    let second = rt.spawn_blocking(wait_for_gate);
    assert!(rt.metrics().blocking_queue_depth() > 0);

    drop(held);

    let _ = rt.block_on(first);
    let _ = rt.block_on(second);

    assert_eq!(0, rt.metrics().blocking_queue_depth());
}
95 
// The spawned-task counter starts at zero and reads one both from inside the
// single spawned task and after that task has completed.
#[test]
fn spawned_tasks_count() {
    // Shared assertion sequence applied to each runtime flavor.
    fn check_single_spawn(rt: &Runtime) {
        let metrics = rt.metrics();
        assert_eq!(0, metrics.spawned_tasks_count());

        let task = rt.spawn(async move {
            assert_eq!(1, metrics.spawned_tasks_count());
        });
        rt.block_on(task).unwrap();

        assert_eq!(1, rt.metrics().spawned_tasks_count());
    }

    let single = current_thread();
    check_single_spawn(&single);

    let multi = threaded();
    check_single_spawn(&multi);
}
120 
// Spawning a task through a runtime handle from a thread that is not part of
// the runtime must be counted as exactly one remote schedule, on both runtime
// flavors.
#[test]
fn remote_schedule_count() {
    // Spawns an empty task from a freshly created OS thread, waits for the
    // task on the runtime, then checks the remote schedule counter.
    //
    // `thread` is the file-level `std::thread` import; no local re-import is
    // needed.
    fn assert_single_remote_schedule(rt: &Runtime) {
        let handle = rt.handle().clone();
        let task = thread::spawn(move || {
            handle.spawn(async {
                // Do nothing
            })
        })
        .join()
        .unwrap();

        rt.block_on(task).unwrap();

        assert_eq!(1, rt.metrics().remote_schedule_count());
    }

    let single = current_thread();
    assert_single_remote_schedule(&single);

    let multi = threaded();
    assert_single_remote_schedule(&multi);
}
153 
// For the current-thread runtime, worker 0 is reported as living on whichever
// thread most recently called `block_on`.
#[test]
fn worker_thread_id_current_thread() {
    let rt = current_thread();
    let metrics = rt.metrics();
    let this_thread = || Some(thread::current().id());

    // Drive the runtime from this thread.
    rt.block_on(async {});
    assert_eq!(this_thread(), metrics.worker_thread_id(0));

    // Drive the runtime from a scoped helper thread and remember its id.
    let other_thread = std::thread::scope(|scope| {
        let driver = scope.spawn(|| {
            rt.block_on(async {});
        });
        driver.thread().id()
    });
    assert_eq!(Some(other_thread), metrics.worker_thread_id(0));

    // Drive the runtime from this thread once more.
    rt.block_on(async {});
    assert_eq!(this_thread(), metrics.worker_thread_id(0));
}
176 
// On the multi-threaded runtime, `block_in_place` should hand the worker over
// to a different thread, and the reported worker thread id should follow.
#[test]
fn worker_thread_id_threaded() {
    let rt = threaded();
    let metrics = rt.metrics();

    let task = rt.spawn(async move {
        // Confirm we are on a worker thread and find out which worker index
        // this thread currently backs.
        let origin = std::thread::current().id();
        let worker_idx = (0..2)
            .position(|w| metrics.worker_thread_id(w) == Some(origin))
            .expect("task not running on any worker thread");

        // Force the worker onto another thread.
        let moved_to = tokio::task::block_in_place(|| {
            assert_eq!(origin, std::thread::current().id());

            // Poll (up to 100 times, 100ms apart) until the worker reports a
            // different thread id.
            (0..100)
                .find_map(|_| {
                    let current = metrics.worker_thread_id(worker_idx).unwrap();
                    if current != origin {
                        Some(current)
                    } else {
                        std::thread::sleep(Duration::from_millis(100));
                        None
                    }
                })
                .expect("worker did not move to new thread")
        });

        // Once the blocking section is over, the worker either remains on the
        // new thread or has been moved back to the original one.
        assert!(
            Some(moved_to) == metrics.worker_thread_id(worker_idx)
                || Some(origin) == metrics.worker_thread_id(worker_idx)
        );
    });

    rt.block_on(task).unwrap()
}
215 
// A short timer sleep followed by runtime shutdown must leave every worker
// with at least one recorded park.
#[test]
fn worker_park_count() {
    // Runs a 1ms sleep on the runtime, shuts it down, and returns its metrics.
    let sleep_then_shutdown = |rt: Runtime| {
        let metrics = rt.metrics();
        rt.block_on(async {
            time::sleep(Duration::from_millis(1)).await;
        });
        drop(rt);
        metrics
    };

    let metrics = sleep_then_shutdown(current_thread());
    assert!(metrics.worker_park_count(0) >= 1);

    let metrics = sleep_then_shutdown(threaded());
    assert!(metrics.worker_park_count(0) >= 1);
    assert!(metrics.worker_park_count(1) >= 1);
}
235 
// Tracks the combined park/unpark counter through runtime startup, one
// spawned task, and shutdown.
#[test]
fn worker_park_unpark_count() {
    // Re-checks `done` up to 100 times, sleeping 100ms after each failed
    // check, and stops as soon as it holds.
    fn poll_until(mut done: impl FnMut() -> bool) {
        for _ in 0..100 {
            if done() {
                break;
            }
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
    }

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(rt.spawn(async {})).unwrap();
    drop(rt);
    assert!(metrics.worker_park_unpark_count(0) >= 2);

    let rt = threaded();
    let metrics = rt.metrics();
    // True when at least one of the two workers has reached `min`.
    let either_at_least = |min: u64| {
        metrics.worker_park_unpark_count(0) >= min || metrics.worker_park_unpark_count(1) >= min
    };

    // Wait for workers to be parked after runtime startup.
    poll_until(|| {
        metrics.worker_park_unpark_count(0) >= 1 && metrics.worker_park_unpark_count(1) >= 1
    });
    assert_eq!(1, metrics.worker_park_unpark_count(0));
    assert_eq!(1, metrics.worker_park_unpark_count(1));

    // Spawn a task to unpark and then park a worker.
    rt.block_on(rt.spawn(async {})).unwrap();
    poll_until(|| either_at_least(3));
    assert!(either_at_least(3));

    // Both threads unpark for runtime shutdown.
    drop(rt);
    assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
    assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
    assert!(either_at_least(4));
}
273 
// Every worker is expected to report at least one no-op park after a short
// timer sleep and runtime shutdown.
#[test]
fn worker_noop_count() {
    // No-op parks are false-positive events that arise under concurrency, so
    // there is no really good way to provoke them deliberately.

    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(single);
    assert!(single_metrics.worker_noop_count(0) > 0);

    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(multi);
    for worker in 0..2 {
        assert!(multi_metrics.worker_noop_count(worker) > 0);
    }
}
296 
// Exactly one steal must be recorded across all workers once a stealable task
// has been scheduled successfully.
#[test]
fn worker_steal_count() {
    // This metric only applies to the multi-threaded runtime.
    for _attempt in 0..10 {
        let rt = threaded_no_lifo();
        let metrics = rt.metrics();

        // `try_spawn_stealable_task` may time out, meaning the sending task
        // could not be scheduled because of a deadlock in the runtime. That
        // is expected behaviour: we retry until we succeed or run out of
        // tries, and running out of tries fails the test.
        let spawned = rt.block_on(async { try_spawn_stealable_task().await.is_ok() });

        drop(rt);

        if !spawned {
            continue;
        }

        let total_steals: u64 = (0..metrics.num_workers())
            .map(|worker| metrics.worker_steal_count(worker))
            .sum();

        assert_eq!(1, total_steals);
        return;
    }

    panic!("exhausted every try to schedule the stealable task");
}
327 
// Checks the per-worker poll count and mean poll time after running `N`
// short tasks, and that the poll-time histogram stays empty when it has not
// been enabled.
#[test]
fn worker_poll_count_and_time() {
    const N: u64 = 5;

    async fn task() {
        // Sync sleep
        std::thread::sleep(std::time::Duration::from_micros(10));
    }

    // Spawns `N` tasks one at a time, awaiting each before spawning the next.
    async fn spawn_tasks() {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    }

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(spawn_tasks());
    drop(rt);
    assert_eq!(N, metrics.worker_poll_count(0));
    // Not currently supported for current-thread runtime
    assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

    // Does not populate the histogram
    assert!(!metrics.poll_time_histogram_enabled());
    for bucket in 0..10 {
        assert_eq!(0, metrics.poll_time_histogram_bucket_count(0, bucket));
    }

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(spawn_tasks());
    drop(rt);

    // Only the `N` spawned tasks are expected in the worker poll counts; the
    // `block_on` future itself does not add to them.
    let total_polls: u64 = (0..metrics.num_workers())
        .map(|worker| metrics.worker_poll_count(worker))
        .sum();
    assert_eq!(N, total_polls);

    let summed_mean_poll_time: Duration = (0..metrics.num_workers())
        .map(|worker| metrics.worker_mean_poll_time(worker))
        .sum();
    assert!(summed_mean_poll_time > Duration::default());

    // Does not populate the histogram
    assert!(!metrics.poll_time_histogram_enabled());
    for worker in 0..metrics.num_workers() {
        for bucket in 0..10 {
            assert_eq!(0, metrics.poll_time_histogram_bucket_count(worker, bucket));
        }
    }
}
384 
// Verifies the bucket layout of a log-scale poll-time histogram (100ns..60s,
// max error 0.25) and that all `N` polls land in some bucket.
#[test]
fn log_histogram() {
    const N: u64 = 50;

    let histogram = LogHistogram::builder()
        .max_value(Duration::from_secs(60))
        .min_value(Duration::from_nanos(100))
        .max_error(0.25);
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(histogram))
        .build()
        .unwrap();

    let metrics = rt.metrics();
    let num_buckets = metrics.poll_time_histogram_num_buckets();
    assert_eq!(num_buckets, 119);

    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {}).await.unwrap();
        }
    });
    drop(rt);

    // First bucket covers everything below the (rounded) minimum value.
    let first_bucket_end = Duration::from_nanos(96);
    assert_eq!(
        metrics.poll_time_histogram_bucket_range(0),
        Duration::from_nanos(0)..first_bucket_end
    );
    assert_eq!(
        metrics.poll_time_histogram_bucket_range(1),
        first_bucket_end..Duration::from_nanos(96 + 2_u64.pow(4))
    );
    // The last bucket is open-ended.
    assert_eq!(
        metrics.poll_time_histogram_bucket_range(118).end,
        Duration::from_nanos(u64::MAX)
    );

    let total: u64 = (0..metrics.num_workers())
        .map(|worker| {
            (0..num_buckets)
                .map(|bucket| metrics.poll_time_histogram_bucket_count(worker, bucket))
                .sum::<u64>()
        })
        .sum();
    assert_eq!(N, total);
}
426 
// The deprecated `metrics_poll_count_histogram_*` builder methods must still
// determine the number of histogram buckets.
#[test]
#[allow(deprecated)]
fn legacy_log_histogram() {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_count_histogram_scale(HistogramScale::Log)
        .metrics_poll_count_histogram_resolution(Duration::from_micros(50))
        .metrics_poll_count_histogram_buckets(20);
    let rt = builder.build().unwrap();

    let bucket_count = rt.metrics().poll_time_histogram_num_buckets();
    assert_eq!(bucket_count, 20);
}
441 
// A log histogram built from `LogHistogram::default()` is expected to have
// 119 buckets.
#[test]
fn log_histogram_default_configuration() {
    let config = HistogramConfiguration::log(LogHistogram::default());

    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_time_histogram_configuration(config);
    let rt = builder.build().unwrap();

    let bucket_count = rt.metrics().poll_time_histogram_num_buckets();
    assert_eq!(bucket_count, 119);
}
455 
// With a three-bucket linear histogram enabled, the bucket counts summed over
// all workers must equal the number of spawned tasks, on both runtime flavors.
#[test]
fn worker_poll_count_histogram() {
    const N: u64 = 5;

    // Applies the common histogram settings to a flavor-specific builder and
    // builds the runtime.
    let build_with_histogram = |mut builder: tokio::runtime::Builder| {
        builder
            .enable_all()
            .enable_metrics_poll_time_histogram()
            .metrics_poll_time_histogram_configuration(HistogramConfiguration::linear(
                Duration::from_millis(50),
                3,
            ))
            .build()
            .unwrap()
    };

    let rts = [
        build_with_histogram(tokio::runtime::Builder::new_current_thread()),
        {
            let mut multi = tokio::runtime::Builder::new_multi_thread();
            multi.worker_threads(2);
            build_with_histogram(multi)
        },
    ];

    for rt in rts {
        let metrics = rt.metrics();
        rt.block_on(async {
            for _ in 0..N {
                tokio::spawn(async {}).await.unwrap();
            }
        });
        drop(rt);

        let num_workers = metrics.num_workers();
        let num_buckets = metrics.poll_time_histogram_num_buckets();

        assert!(metrics.poll_time_histogram_enabled());
        assert_eq!(num_buckets, 3);

        let total: u64 = (0..num_workers)
            .map(|worker| {
                (0..num_buckets)
                    .map(|bucket| metrics.poll_time_histogram_bucket_count(worker, bucket))
                    .sum::<u64>()
            })
            .sum();
        assert_eq!(N, total);
    }
}
504 
// Checks the bucket boundaries reported for a three-bucket histogram, both
// for the linear configuration and for the deprecated log-scale builder
// methods.
#[test]
fn worker_poll_count_histogram_range() {
    let max = Duration::from_nanos(u64::MAX);

    let linear_rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_time_histogram_configuration(HistogramConfiguration::linear(us(50), 3))
        .build()
        .unwrap();
    let metrics = linear_rt.metrics();

    let linear_expected = [us(0)..us(50), us(50)..us(100), us(100)..max];
    for (bucket, expected) in linear_expected.iter().enumerate() {
        assert_eq!(&metrics.poll_time_histogram_bucket_range(bucket), expected);
    }

    // ensure the old methods work too
    #[allow(deprecated)]
    let legacy_rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = legacy_rt.metrics();

    // The first boundary is the 50us resolution (in nanoseconds) rounded up
    // to a power of two; the second boundary is twice that.
    let first_end = Duration::from_nanos(50000_u64.next_power_of_two());
    let second_end = first_end * 2;

    let log_expected = [us(0)..first_end, first_end..second_end, second_end..max];
    for (bucket, expected) in log_expected.iter().enumerate() {
        assert_eq!(&metrics.poll_time_histogram_bucket_range(bucket), expected);
    }
}
540 
// Supplying a histogram configuration alone must not switch the histogram on;
// `enable_metrics_poll_time_histogram` is deliberately never called here.
#[test]
fn worker_poll_count_histogram_disabled_without_explicit_enable() {
    let linear = || HistogramConfiguration::linear(Duration::from_millis(50), 3);

    let single = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .metrics_poll_time_histogram_configuration(linear())
        .build()
        .unwrap();
    let multi = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .metrics_poll_time_histogram_configuration(linear())
        .build()
        .unwrap();

    for rt in [single, multi] {
        let metrics = rt.metrics();
        assert!(!metrics.poll_time_histogram_enabled());
    }
}
568 
// After running a handful of yielding tasks, every worker must report a
// non-zero total busy duration.
#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    let zero = Duration::from_millis(0);

    // Runs `N` tasks that each yield once, shuts the runtime down, and
    // returns its metrics.
    let run_yielding_tasks = |rt: Runtime| {
        let metrics = rt.metrics();

        rt.block_on(async {
            for _ in 0..N {
                let task = tokio::spawn(async {
                    tokio::task::yield_now().await;
                });
                task.await.unwrap();
            }
        });

        drop(rt);
        metrics
    };

    let metrics = run_yielding_tasks(current_thread());
    assert!(metrics.worker_total_busy_duration(0) > zero);

    let metrics = run_yielding_tasks(threaded());
    for worker in 0..metrics.num_workers() {
        assert!(metrics.worker_total_busy_duration(worker) > zero);
    }
}
611 
// Checks how spawns are split between the local and remote schedule counters
// on each runtime flavor.
#[test]
fn worker_local_schedule_count() {
    let single = current_thread();
    let single_metrics = single.metrics();
    single.block_on(async {
        let task = tokio::spawn(async {});
        task.await.unwrap();
    });
    drop(single);

    assert_eq!(1, single_metrics.worker_local_schedule_count(0));
    assert_eq!(0, single_metrics.remote_schedule_count());

    let multi = threaded();
    let multi_metrics = multi.metrics();
    multi.block_on(async {
        // Move to the runtime
        let outer = tokio::spawn(async {
            tokio::spawn(async {}).await.unwrap();
        });
        outer.await.unwrap();
    });
    drop(multi);

    let local_total: u64 = (0..multi_metrics.num_workers())
        .map(|worker| multi_metrics.worker_local_schedule_count(worker))
        .sum();

    assert_eq!(2, local_total);
    assert_eq!(1, multi_metrics.remote_schedule_count());
}
643 
// Spawning 300 tasks from one worker while the other worker is held busy is
// expected to record exactly one overflow across all workers.
#[test]
fn worker_overflow_count() {
    // Only applies to the threaded worker
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        let driver = tokio::spawn(async {
            let (started_tx, started_rx) = std::sync::mpsc::channel();
            let (release_tx, release_rx) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            //
            // We spawn from outside the runtime to ensure that the other worker
            // will pick it up:
            // <https://github.com/tokio-rs/tokio/issues/4730>
            tokio::task::spawn_blocking(|| {
                tokio::spawn(async move {
                    started_tx.send(()).unwrap();
                    release_rx.recv().unwrap();
                });
            });

            // Wait until the other worker is occupied by the task above.
            started_rx.recv().unwrap();

            // Spawn many tasks
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            // Let the other worker go.
            release_tx.send(()).unwrap();
        });

        driver.await.unwrap();
    });
    drop(rt);

    let overflow_total: u64 = (0..metrics.num_workers())
        .map(|worker| metrics.worker_overflow_count(worker))
        .sum();

    assert_eq!(1, overflow_total);
}
688 
// After spawning `N` tasks that have not had a chance to run yet, the local
// queue depth (summed over all workers) must equal `N`.
#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Move to the runtime
        tokio::spawn(async move {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            // Wait until the other worker is occupied by the blocking task.
            rx1.recv().unwrap();

            // Spawn exactly `N` tasks; the assertion below compares the
            // queue depth against the same constant.
            for _ in 0..N {
                tokio::spawn(async {});
            }

            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(n, N);

            // Release the other worker.
            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}
740 
// Consuming budget until the task is forced to yield must be recorded as
// exactly one forced yield.
#[test]
fn budget_exhaustion_yield() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut yielded = false;

    // block on a task which consumes budget until it yields
    rt.block_on(poll_fn(|cx| {
        // Second poll: the forced yield already happened, so finish.
        if yielded {
            return Poll::Ready(());
        }

        // First poll: keep consuming budget until `consume_budget` refuses.
        loop {
            let consume = consume_budget();
            tokio::pin!(consume);

            if consume.poll(cx).is_pending() {
                yielded = true;
                return Poll::Pending;
            }
        }
    }));

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
767 
// Two joined futures that both consume budget until they yield are expected
// to be counted as a single forced yield.
#[test]
fn budget_exhaustion_yield_with_joins() {
    // Builds a future that consumes budget until `consume_budget` returns
    // pending, yields once, and completes on its next poll.
    fn consume_until_yield() -> impl Future<Output = ()> {
        let mut yielded = false;

        poll_fn(move |cx| {
            if yielded {
                return Poll::Ready(());
            }

            loop {
                let consume = consume_budget();
                tokio::pin!(consume);

                if consume.poll(cx).is_pending() {
                    yielded = true;
                    return Poll::Pending;
                }
            }
        })
    }

    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    // block on a task which consumes budget until it yields
    rt.block_on(async { tokio::join!(consume_until_yield(), consume_until_yield()) });

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
812 
// Opening one TCP stream registers one fd with the I/O driver, and dropping
// the stream deregisters it again.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    // Connect to a loopback listener so the test needs neither DNS nor
    // internet access. The listener is a plain `std` socket, so it is not
    // itself registered with the runtime's I/O driver, and it must stay alive
    // until the connection has been established.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let stream = tokio::net::TcpStream::connect(addr);
    let stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    drop(stream);

    // The registered counter is cumulative, so it stays at one.
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}
832 
// Establishing one TCP connection is expected to produce exactly one
// readiness event on the I/O driver.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Connect to a loopback listener so the test needs neither DNS nor
    // internet access. The listener is a plain `std` socket, so it is not
    // registered with the runtime's I/O driver; it is kept alive until the
    // end of the test so that the connection attempt can succeed.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let stream = tokio::net::TcpStream::connect(addr);
    let _stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_ready_count(), 1);
}
844 
try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError>845 async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> {
846     // We use a blocking channel to synchronize the tasks.
847     let (tx, rx) = mpsc::channel();
848 
849     // Make sure we are in the context of the runtime.
850     tokio::spawn(async move {
851         // Spawn the task that sends to the channel.
852         //
853         // Note that the runtime needs to have the lifo slot disabled to make
854         // this task stealable.
855         tokio::spawn(async move {
856             tx.send(()).unwrap();
857         });
858 
859         // Blocking receive on the channel, timing out if the sending task
860         // wasn't scheduled in time.
861         rx.recv_timeout(Duration::from_secs(1))
862     })
863     .await
864     .unwrap()?;
865 
866     Ok(())
867 }
868 
current_thread() -> Runtime869 fn current_thread() -> Runtime {
870     tokio::runtime::Builder::new_current_thread()
871         .enable_all()
872         .build()
873         .unwrap()
874 }
875 
threaded() -> Runtime876 fn threaded() -> Runtime {
877     tokio::runtime::Builder::new_multi_thread()
878         .worker_threads(2)
879         .enable_all()
880         .build()
881         .unwrap()
882 }
883 
threaded_no_lifo() -> Runtime884 fn threaded_no_lifo() -> Runtime {
885     tokio::runtime::Builder::new_multi_thread()
886         .worker_threads(2)
887         .disable_lifo_slot()
888         .enable_all()
889         .build()
890         .unwrap()
891 }
892 
// Shorthand for a `Duration` of `n` microseconds.
fn us(n: u64) -> Duration {
    let micros = n;
    Duration::from_micros(micros)
}
896