1 #![allow(unknown_lints, unexpected_cfgs)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(all(
4 feature = "full",
5 tokio_unstable,
6 not(target_os = "wasi"),
7 target_has_atomic = "64"
8 ))]
9
10 use std::future::Future;
11 use std::sync::{mpsc, Arc, Mutex};
12 use std::task::Poll;
13 use std::thread;
14 use tokio::macros::support::poll_fn;
15
16 use tokio::runtime::{HistogramConfiguration, HistogramScale, LogHistogram, Runtime};
17 use tokio::task::consume_budget;
18 use tokio::time::{self, Duration};
19
#[test]
fn num_workers() {
    // A current-thread runtime always reports exactly one worker.
    let current = current_thread();
    assert_eq!(current.metrics().num_workers(), 1);

    // The multi-threaded runtime built by `threaded()` uses two workers.
    let multi = threaded();
    assert_eq!(multi.metrics().num_workers(), 2);
}
28
#[test]
fn num_blocking_threads() {
    // Both runtime flavours behave identically: no blocking threads exist
    // until the first blocking task is spawned, which lazily creates one.
    for rt in [current_thread(), threaded()] {
        assert_eq!(0, rt.metrics().num_blocking_threads());
        let _ = rt.block_on(rt.spawn_blocking(move || {}));
        assert_eq!(1, rt.metrics().num_blocking_threads());
    }
}
41
#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();
    // No blocking threads (idle or otherwise) exist before the first
    // blocking task is spawned.
    assert_eq!(0, rt.metrics().num_idle_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    rt.block_on(async {
        time::sleep(Duration::from_millis(5)).await;
    });

    // We need to wait until the blocking thread has become idle. Usually 5ms is
    // enough for this to happen, but not always. When it isn't enough, sleep
    // for another second. We don't always wait for a whole second since we want
    // the test suite to finish quickly.
    //
    // Note that the timeout for idle threads to be killed is 10 seconds.
    if 0 == rt.metrics().num_idle_blocking_threads() {
        rt.block_on(async {
            time::sleep(Duration::from_secs(1)).await;
        });
    }

    // The single blocking thread from above should now be counted as idle.
    assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}
65
#[test]
fn blocking_queue_depth() {
    // Cap the pool at a single blocking thread so that a second blocking
    // task is forced to wait in the queue.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .max_blocking_threads(1)
        .build()
        .unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    // Both blocking tasks park on this mutex until the guard is released.
    let gate = Arc::new(Mutex::new(()));
    let held = gate.lock().unwrap();

    let gate_for_tasks = gate.clone();
    let block_until_released = move || {
        let _unused = gate_for_tasks.lock().unwrap();
    };

    let first = rt.spawn_blocking(block_until_released.clone());
    let second = rt.spawn_blocking(block_until_released);
    // With one thread occupied, at least one task must be sitting queued.
    assert!(rt.metrics().blocking_queue_depth() > 0);

    drop(held);

    let _ = rt.block_on(first);
    let _ = rt.block_on(second);

    // Once both tasks have completed, the queue is empty again.
    assert_eq!(0, rt.metrics().blocking_queue_depth());
}
95
#[test]
fn spawned_tasks_count() {
    // Each runtime flavour should count exactly one spawned task, and the
    // count is already visible from inside the task itself.
    for rt in [current_thread(), threaded()] {
        let metrics = rt.metrics();
        assert_eq!(0, metrics.spawned_tasks_count());

        rt.block_on(rt.spawn(async move {
            assert_eq!(1, metrics.spawned_tasks_count());
        }))
        .unwrap();

        assert_eq!(1, rt.metrics().spawned_tasks_count());
    }
}
120
#[test]
fn remote_schedule_count() {
    use std::thread;

    // The two halves of the original test were identical except for the
    // runtime under test; factor the shared logic into a helper.
    //
    // Spawn a task onto `rt` from a foreign (non-runtime) thread so the
    // schedule is counted as "remote", then verify exactly one was recorded.
    fn assert_one_remote_schedule(rt: &Runtime) {
        let handle = rt.handle().clone();
        let task = thread::spawn(move || {
            handle.spawn(async {
                // Do nothing
            })
        })
        .join()
        .unwrap();

        rt.block_on(task).unwrap();

        assert_eq!(1, rt.metrics().remote_schedule_count());
    }

    assert_one_remote_schedule(&current_thread());
    assert_one_remote_schedule(&threaded());
}
153
#[test]
fn worker_thread_id_current_thread() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Check that runtime is on this thread. For the current-thread runtime
    // the "worker" is whichever thread last ran `block_on`.
    rt.block_on(async {});
    assert_eq!(Some(thread::current().id()), metrics.worker_thread_id(0));

    // Move runtime to another thread. `thread::scope` lets the spawned
    // thread borrow `rt` without requiring 'static.
    let thread_id = std::thread::scope(|scope| {
        let join_handle = scope.spawn(|| {
            rt.block_on(async {});
        });
        join_handle.thread().id()
    });
    assert_eq!(Some(thread_id), metrics.worker_thread_id(0));

    // Move runtime back to this thread.
    rt.block_on(async {});
    assert_eq!(Some(thread::current().id()), metrics.worker_thread_id(0));
}
176
#[test]
fn worker_thread_id_threaded() {
    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(rt.spawn(async move {
        // Check that we are running on a worker thread and determine
        // the index of our worker.
        let thread_id = std::thread::current().id();
        let this_worker = (0..2)
            .position(|w| metrics.worker_thread_id(w) == Some(thread_id))
            .expect("task not running on any worker thread");

        // Force worker to another thread. `block_in_place` hands this
        // worker's slot off to a different thread while we block here.
        let moved_thread_id = tokio::task::block_in_place(|| {
            // We ourselves keep running on the same OS thread.
            assert_eq!(thread_id, std::thread::current().id());

            // Wait for worker to move to another thread. The handoff is
            // asynchronous, so poll the metric in a bounded retry loop
            // (up to ~10s) rather than asserting immediately.
            for _ in 0..100 {
                let new_id = metrics.worker_thread_id(this_worker).unwrap();
                if thread_id != new_id {
                    return new_id;
                }
                std::thread::sleep(Duration::from_millis(100));
            }

            panic!("worker did not move to new thread");
        });

        // After blocking task worker either stays on new thread or
        // is moved back to current thread.
        assert!(
            metrics.worker_thread_id(this_worker) == Some(moved_thread_id)
                || metrics.worker_thread_id(this_worker) == Some(thread_id)
        );
    }))
    .unwrap()
}
215
#[test]
fn worker_park_count() {
    // A short timed sleep forces the lone worker to park at least once.
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(time::sleep(Duration::from_millis(1)));
    drop(rt);
    assert!(metrics.worker_park_count(0) >= 1);

    // Same idea on the multi-threaded runtime: both workers must park.
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(time::sleep(Duration::from_millis(1)));
    drop(rt);
    assert!(metrics.worker_park_count(0) >= 1);
    assert!(metrics.worker_park_count(1) >= 1);
}
235
#[test]
fn worker_park_unpark_count() {
    // A completed `block_on(spawn)` round trip parks and unparks the lone
    // worker at least once each, i.e. at least two transitions in total.
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(rt.spawn(async {})).unwrap();
    drop(rt);
    assert!(2 <= metrics.worker_park_unpark_count(0));

    let rt = threaded();
    let metrics = rt.metrics();

    // Wait for workers to be parked after runtime startup.
    for _ in 0..100 {
        if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
    // With no work submitted yet, each worker has transitioned exactly once
    // (the initial park).
    assert_eq!(1, metrics.worker_park_unpark_count(0));
    assert_eq!(1, metrics.worker_park_unpark_count(1));

    // Spawn a task to unpark and then park a worker.
    rt.block_on(rt.spawn(async {})).unwrap();
    for _ in 0..100 {
        if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
    // Which worker picked up the task is unspecified, so only require that
    // at least one of them shows the extra unpark/park pair.
    assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1));

    // Both threads unpark for runtime shutdown, so each counter ends on a
    // complete park/unpark pair (an even value).
    drop(rt);
    assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
    assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
    assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));
}
273
#[test]
fn worker_noop_count() {
    // There isn't really a great way to generate no-op parks as they happen as
    // false-positive events under concurrency; a timed sleep reliably
    // produces at least one on each worker.

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(time::sleep(Duration::from_millis(1)));
    drop(rt);
    assert!(metrics.worker_noop_count(0) > 0);

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(time::sleep(Duration::from_millis(1)));
    drop(rt);
    assert!(metrics.worker_noop_count(0) > 0);
    assert!(metrics.worker_noop_count(1) > 0);
}
296
#[test]
fn worker_steal_count() {
    // This metric only applies to the multi-threaded runtime.
    for _ in 0..10 {
        // The lifo slot is disabled so the spawned task lands in a local run
        // queue where the other (idle) worker is able to steal it.
        let rt = threaded_no_lifo();
        let metrics = rt.metrics();

        let successfully_spawned_stealable_task = rt.block_on(async {
            // The call to `try_spawn_stealable_task` may time out, which means
            // that the sending task couldn't be scheduled due to a deadlock in
            // the runtime.
            // This is expected behaviour, we just retry until we succeed or
            // exhaust all tries, the latter causing this test to fail.
            try_spawn_stealable_task().await.is_ok()
        });

        drop(rt);

        if successfully_spawned_stealable_task {
            // Exactly one steal should have been recorded, on whichever
            // worker performed it.
            let n: u64 = (0..metrics.num_workers())
                .map(|i| metrics.worker_steal_count(i))
                .sum();

            assert_eq!(1, n);
            return;
        }
    }

    panic!("exhausted every try to schedule the stealable task");
}
327
#[test]
fn worker_poll_count_and_time() {
    const N: u64 = 5;

    async fn task() {
        // Sync sleep, so each poll takes a measurable (non-zero) time.
        std::thread::sleep(std::time::Duration::from_micros(10));
    }

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    // Each spawned task is polled exactly once (it never returns Pending).
    assert_eq!(N, metrics.worker_poll_count(0));
    // Not currently supported for current-thread runtime
    assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

    // Does not populate the histogram
    assert!(!metrics.poll_time_histogram_enabled());
    for i in 0..10 {
        assert_eq!(0, metrics.poll_time_histogram_bucket_count(0, i));
    }

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    // Account for the `block_on` task. Tasks may have run on either worker,
    // so sum the per-worker counts.
    let n = (0..metrics.num_workers())
        .map(|i| metrics.worker_poll_count(i))
        .sum();

    assert_eq!(N, n);

    // The multi-threaded runtime does track mean poll time, so the summed
    // means must be non-zero given the sync sleep in each task.
    let n: Duration = (0..metrics.num_workers())
        .map(|i| metrics.worker_mean_poll_time(i))
        .sum();

    assert!(n > Duration::default());

    // Does not populate the histogram
    assert!(!metrics.poll_time_histogram_enabled());
    for n in 0..metrics.num_workers() {
        for i in 0..10 {
            assert_eq!(0, metrics.poll_time_histogram_bucket_count(n, i));
        }
    }
}
384
#[test]
fn log_histogram() {
    const N: u64 = 50;
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
            LogHistogram::builder()
                .max_value(Duration::from_secs(60))
                .min_value(Duration::from_nanos(100))
                .max_error(0.25),
        ))
        .build()
        .unwrap();
    let metrics = rt.metrics();
    let num_buckets = rt.metrics().poll_time_histogram_num_buckets();
    // This (min, max, max_error) configuration is expected to produce
    // exactly 119 buckets.
    assert_eq!(num_buckets, 119);
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {}).await.unwrap();
        }
    });
    drop(rt);
    // Spot-check the bucket boundaries: the first bucket starts at zero...
    assert_eq!(
        metrics.poll_time_histogram_bucket_range(0),
        Duration::from_nanos(0)..Duration::from_nanos(96)
    );
    // ...the second is adjacent to the first...
    assert_eq!(
        metrics.poll_time_histogram_bucket_range(1),
        Duration::from_nanos(96)..Duration::from_nanos(96 + 2_u64.pow(4))
    );
    // ...and the final bucket extends to the maximum representable duration.
    assert_eq!(
        metrics.poll_time_histogram_bucket_range(118).end,
        Duration::from_nanos(u64::MAX)
    );
    // Every poll must land in exactly one bucket of one worker, so the
    // grand total across all (worker, bucket) pairs equals N.
    let n = (0..metrics.num_workers())
        .flat_map(|i| (0..num_buckets).map(move |j| (i, j)))
        .map(|(worker, bucket)| metrics.poll_time_histogram_bucket_count(worker, bucket))
        .sum();
    assert_eq!(N, n);
}
426
#[test]
#[allow(deprecated)]
fn legacy_log_histogram() {
    // The deprecated `metrics_poll_count_histogram_*` builder methods must
    // still configure the poll-time histogram: log scale, 50µs resolution,
    // and the requested bucket count.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_count_histogram_scale(HistogramScale::Log)
        .metrics_poll_count_histogram_resolution(Duration::from_micros(50))
        .metrics_poll_count_histogram_buckets(20)
        .build()
        .unwrap();
    assert_eq!(runtime.metrics().poll_time_histogram_num_buckets(), 20);
}
441
#[test]
fn log_histogram_default_configuration() {
    // The default `LogHistogram` configuration yields 119 buckets, matching
    // the explicitly-built configuration tested in `log_histogram`.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
            LogHistogram::default(),
        ))
        .build()
        .unwrap();
    assert_eq!(runtime.metrics().poll_time_histogram_num_buckets(), 119);
}
455
#[test]
fn worker_poll_count_histogram() {
    const N: u64 = 5;

    // Run the same scenario on both runtime flavours, each configured with
    // a 3-bucket linear histogram at 50ms resolution.
    let rts = [
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_time_histogram()
            .metrics_poll_time_histogram_configuration(HistogramConfiguration::linear(
                Duration::from_millis(50),
                3,
            ))
            .build()
            .unwrap(),
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .enable_metrics_poll_time_histogram()
            .metrics_poll_time_histogram_configuration(HistogramConfiguration::linear(
                Duration::from_millis(50),
                3,
            ))
            .build()
            .unwrap(),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        rt.block_on(async {
            for _ in 0..N {
                tokio::spawn(async {}).await.unwrap();
            }
        });
        drop(rt);

        let num_workers = metrics.num_workers();
        let num_buckets = metrics.poll_time_histogram_num_buckets();

        assert!(metrics.poll_time_histogram_enabled());
        assert_eq!(num_buckets, 3);

        // Each of the N polls lands in exactly one (worker, bucket) cell,
        // so the grand total must equal N.
        let n = (0..num_workers)
            .flat_map(|i| (0..num_buckets).map(move |j| (i, j)))
            .map(|(worker, bucket)| metrics.poll_time_histogram_bucket_count(worker, bucket))
            .sum();
        assert_eq!(N, n);
    }
}
504
#[test]
fn worker_poll_count_histogram_range() {
    // The last bucket of every histogram is open-ended up to u64::MAX ns.
    let max = Duration::from_nanos(u64::MAX);

    // Linear configuration: 3 buckets of 50µs each.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_time_histogram_configuration(HistogramConfiguration::linear(us(50), 3))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    assert_eq!(metrics.poll_time_histogram_bucket_range(0), us(0)..us(50));
    assert_eq!(metrics.poll_time_histogram_bucket_range(1), us(50)..us(100));
    assert_eq!(metrics.poll_time_histogram_bucket_range(2), us(100)..max);

    // ensure the old methods work too
    #[allow(deprecated)]
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_time_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    // The legacy log scale rounds the 50µs resolution up to the next power
    // of two in nanoseconds; successive buckets then double in width.
    let a = Duration::from_nanos(50000_u64.next_power_of_two());
    let b = a * 2;

    assert_eq!(metrics.poll_time_histogram_bucket_range(0), us(0)..a);
    assert_eq!(metrics.poll_time_histogram_bucket_range(1), a..b);
    assert_eq!(metrics.poll_time_histogram_bucket_range(2), b..max);
}
540
#[test]
fn worker_poll_count_histogram_disabled_without_explicit_enable() {
    // Supplying a histogram configuration without also calling
    // `enable_metrics_poll_time_histogram` must leave the histogram off.
    let config = || HistogramConfiguration::linear(Duration::from_millis(50), 3);

    let current = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .metrics_poll_time_histogram_configuration(config())
        .build()
        .unwrap();
    let multi = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .metrics_poll_time_histogram_configuration(config())
        .build()
        .unwrap();

    assert!(!current.metrics().poll_time_histogram_enabled());
    assert!(!multi.metrics().poll_time_histogram_enabled());
}
568
#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    // The two halves of the original test drove the runtimes with identical
    // task loops; factor that into a helper.
    //
    // Drive `count` yielding tasks to completion so every worker poll
    // accumulates some busy time.
    fn run_yielding_tasks(rt: &Runtime, count: usize) {
        rt.block_on(async {
            for _ in 0..count {
                tokio::spawn(async {
                    tokio::task::yield_now().await;
                })
                .await
                .unwrap();
            }
        });
    }

    let zero = Duration::from_millis(0);

    let rt = current_thread();
    let metrics = rt.metrics();
    run_yielding_tasks(&rt, N);
    drop(rt);
    assert!(zero < metrics.worker_total_busy_duration(0));

    let rt = threaded();
    let metrics = rt.metrics();
    run_yielding_tasks(&rt, N);
    drop(rt);
    // Every worker must have accumulated some busy time by shutdown.
    for i in 0..metrics.num_workers() {
        assert!(zero < metrics.worker_total_busy_duration(i));
    }
}
611
#[test]
fn worker_local_schedule_count() {
    // A task spawned from inside `block_on` on the current-thread runtime
    // is scheduled locally; no remote schedule should be recorded.
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        tokio::spawn(async {}).await.unwrap();
    });
    drop(rt);

    assert_eq!(1, metrics.worker_local_schedule_count(0));
    assert_eq!(0, metrics.remote_schedule_count());

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime: the outer spawn comes from the `block_on`
        // thread, the inner one from a worker thread.
        tokio::spawn(async {
            tokio::spawn(async {}).await.unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_local_schedule_count(i))
        .sum();

    // Two schedules happen on workers (local); the initial spawn from the
    // `block_on` thread is the single remote schedule.
    assert_eq!(2, n);
    assert_eq!(1, metrics.remote_schedule_count());
}
643
#[test]
fn worker_overflow_count() {
    // Only applies to the threaded worker
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        tokio::spawn(async {
            // Channels used to sequence the two workers: (tx1, rx1) signals
            // "the other worker is busy"; (tx2, rx2) releases it afterwards.
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            //
            // We spawn from outside the runtime to ensure that the other worker
            // will pick it up:
            // <https://github.com/tokio-rs/tokio/issues/4730>
            tokio::task::spawn_blocking(|| {
                tokio::spawn(async move {
                    tx1.send(()).unwrap();
                    rx2.recv().unwrap();
                });
            });

            rx1.recv().unwrap();

            // Spawn many tasks: with the other worker blocked, this worker's
            // local queue must fill up and overflow into the injection queue.
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    // Exactly one overflow event should have been recorded, on whichever
    // worker filled its queue.
    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_overflow_count(i))
        .sum();

    assert_eq!(1, n);
}
688
#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    // Current-thread runtime: all spawned tasks queue on the one worker, and
    // none run while the `block_on` future is still executing.
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Move to the runtime
        tokio::spawn(async move {
            // Channels used to sequence the two workers: (tx1, rx1) signals
            // "the other worker is busy"; (tx2, rx2) releases it afterwards.
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            rx1.recv().unwrap();

            // Spawn some tasks
            for _ in 0..100 {
                tokio::spawn(async {});
            }

            // With the other worker blocked, the 100 tasks spawned above sit
            // queued across the workers' local queues.
            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(n, N);

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}
740
#[test]
fn budget_exhaustion_yield() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield = false;

    // block on a task which consumes budget until it yields
    rt.block_on(poll_fn(|cx| loop {
        // On the re-poll after the forced yield, finish immediately.
        if did_yield {
            return Poll::Ready(());
        }

        // `consume_budget` resolves instantly while budget remains; once
        // the budget is exhausted its poll returns `Pending`.
        let fut = consume_budget();
        tokio::pin!(fut);

        if fut.poll(cx).is_pending() {
            did_yield = true;
            return Poll::Pending;
        }
    }));

    // Exactly one forced yield should have been recorded.
    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
767
#[test]
fn budget_exhaustion_yield_with_joins() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield_1 = false;
    let mut did_yield_2 = false;

    // block on a task which consumes budget until it yields. Both joined
    // sub-futures burn budget, but the final assertion shows only a single
    // forced yield is counted for the whole `block_on` task.
    rt.block_on(async {
        tokio::join!(
            poll_fn(|cx| loop {
                if did_yield_1 {
                    return Poll::Ready(());
                }

                // `consume_budget` returns `Pending` once the budget is gone.
                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_1 = true;
                    return Poll::Pending;
                }
            }),
            poll_fn(|cx| loop {
                if did_yield_2 {
                    return Poll::Ready(());
                }

                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_2 = true;
                    return Poll::Pending;
                }
            })
        )
    });

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}
812
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    // NOTE: requires outbound network access (connects to google.com:80).
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    // Connecting registers the socket's fd with the I/O driver...
    let stream = tokio::net::TcpStream::connect("google.com:80");
    let stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    // ...and dropping the stream deregisters it. The registered count is
    // cumulative, so it stays at 1.
    drop(stream);

    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}
832
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    // NOTE: requires outbound network access (connects to google.com:80).
    let rt = current_thread();
    let metrics = rt.metrics();

    // Establishing a TCP connection produces exactly one readiness event.
    let connect = tokio::net::TcpStream::connect("google.com:80");
    let _stream = rt.block_on(async move { connect.await.unwrap() });

    assert_eq!(metrics.io_driver_ready_count(), 1);
}
844
/// Spawns a task that another worker can steal, then waits for it to run.
/// Returns `Err(RecvTimeoutError)` if the task was not scheduled within one
/// second (the caller retries in that case).
async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> {
    // We use a blocking channel to synchronize the tasks.
    let (tx, rx) = mpsc::channel();

    // Make sure we are in the context of the runtime.
    tokio::spawn(async move {
        // Spawn the task that sends to the channel.
        //
        // Note that the runtime needs to have the lifo slot disabled to make
        // this task stealable.
        tokio::spawn(async move {
            tx.send(()).unwrap();
        });

        // Blocking receive on the channel, timing out if the sending task
        // wasn't scheduled in time.
        rx.recv_timeout(Duration::from_secs(1))
    })
    .await
    .unwrap()?;

    Ok(())
}
868
current_thread() -> Runtime869 fn current_thread() -> Runtime {
870 tokio::runtime::Builder::new_current_thread()
871 .enable_all()
872 .build()
873 .unwrap()
874 }
875
threaded() -> Runtime876 fn threaded() -> Runtime {
877 tokio::runtime::Builder::new_multi_thread()
878 .worker_threads(2)
879 .enable_all()
880 .build()
881 .unwrap()
882 }
883
threaded_no_lifo() -> Runtime884 fn threaded_no_lifo() -> Runtime {
885 tokio::runtime::Builder::new_multi_thread()
886 .worker_threads(2)
887 .disable_lifo_slot()
888 .enable_all()
889 .build()
890 .unwrap()
891 }
892
/// Shorthand for a `Duration` of `n` microseconds.
fn us(n: u64) -> Duration {
    let micros = n;
    Duration::from_micros(micros)
}
896