1 use crate::runtime::metrics::{HistogramBatch, WorkerMetrics};
2 
3 use std::sync::atomic::Ordering::Relaxed;
4 use std::time::{Duration, Instant};
5 
/// Worker-local batch of runtime metrics.
///
/// The counters are plain integers that are updated through `&mut self`
/// and copied into a `WorkerMetrics` by `submit`.
pub(crate) struct MetricsBatch {
    /// Number of times the worker parked.
    park_count: u64,

    /// Number of times the worker parked and unparked. Incremented once when
    /// the worker is about to park and once more when it is unparked.
    park_unpark_count: u64,

    /// Number of times the worker woke w/o doing work, i.e. it was about to
    /// park again without having polled any task since the previous park.
    noop_count: u64,

    /// Number of tasks stolen.
    steal_count: u64,

    /// Number of times tasks were stolen.
    steal_operations: u64,

    /// Number of tasks that were polled by the worker.
    poll_count: u64,

    /// Value of `poll_count` recorded when the worker last entered park after
    /// having polled at least one task. This is used to track the noop count.
    poll_count_on_last_park: u64,

    /// Number of tasks that were scheduled locally on this worker.
    local_schedule_count: u64,

    /// Number of tasks moved to the global queue to make space in the local
    /// queue.
    overflow_count: u64,

    /// The total busy duration in nanoseconds.
    busy_duration_total: u64,

    /// Instant at which work last resumed (continued after park). Used as the
    /// starting point when the busy duration is accumulated.
    processing_scheduled_tasks_started_at: Instant,

    /// If `Some`, tracks poll times in nanoseconds. `None` when the worker
    /// metrics have no poll count histogram.
    poll_timer: Option<PollTimer>,
}
45 
/// State used to time individual task polls and batch the measurements.
struct PollTimer {
    /// Histogram of poll counts within each band. Each finished poll is
    /// recorded here with its elapsed time in nanoseconds.
    poll_counts: HistogramBatch,

    /// Instant when the most recent task started polling.
    poll_started_at: Instant,
}
53 
54 impl MetricsBatch {
new(worker_metrics: &WorkerMetrics) -> MetricsBatch55     pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
56         let now = Instant::now();
57 
58         MetricsBatch {
59             park_count: 0,
60             park_unpark_count: 0,
61             noop_count: 0,
62             steal_count: 0,
63             steal_operations: 0,
64             poll_count: 0,
65             poll_count_on_last_park: 0,
66             local_schedule_count: 0,
67             overflow_count: 0,
68             busy_duration_total: 0,
69             processing_scheduled_tasks_started_at: now,
70             poll_timer: worker_metrics
71                 .poll_count_histogram
72                 .as_ref()
73                 .map(|worker_poll_counts| PollTimer {
74                     poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
75                     poll_started_at: now,
76                 }),
77         }
78     }
79 
submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64)80     pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
81         worker.mean_poll_time.store(mean_poll_time, Relaxed);
82         worker.park_count.store(self.park_count, Relaxed);
83         worker
84             .park_unpark_count
85             .store(self.park_unpark_count, Relaxed);
86         worker.noop_count.store(self.noop_count, Relaxed);
87         worker.steal_count.store(self.steal_count, Relaxed);
88         worker
89             .steal_operations
90             .store(self.steal_operations, Relaxed);
91         worker.poll_count.store(self.poll_count, Relaxed);
92 
93         worker
94             .busy_duration_total
95             .store(self.busy_duration_total, Relaxed);
96 
97         worker
98             .local_schedule_count
99             .store(self.local_schedule_count, Relaxed);
100         worker.overflow_count.store(self.overflow_count, Relaxed);
101 
102         if let Some(poll_timer) = &self.poll_timer {
103             let dst = worker.poll_count_histogram.as_ref().unwrap();
104             poll_timer.poll_counts.submit(dst);
105         }
106     }
107 
108     /// The worker is about to park.
about_to_park(&mut self)109     pub(crate) fn about_to_park(&mut self) {
110         self.park_count += 1;
111         self.park_unpark_count += 1;
112 
113         if self.poll_count_on_last_park == self.poll_count {
114             self.noop_count += 1;
115         } else {
116             self.poll_count_on_last_park = self.poll_count;
117         }
118     }
119 
120     /// The worker was unparked.
unparked(&mut self)121     pub(crate) fn unparked(&mut self) {
122         self.park_unpark_count += 1;
123     }
124 
125     /// Start processing a batch of tasks
start_processing_scheduled_tasks(&mut self)126     pub(crate) fn start_processing_scheduled_tasks(&mut self) {
127         self.processing_scheduled_tasks_started_at = Instant::now();
128     }
129 
130     /// Stop processing a batch of tasks
end_processing_scheduled_tasks(&mut self)131     pub(crate) fn end_processing_scheduled_tasks(&mut self) {
132         let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
133         self.busy_duration_total += duration_as_u64(busy_duration);
134     }
135 
136     /// Start polling an individual task
start_poll(&mut self)137     pub(crate) fn start_poll(&mut self) {
138         self.poll_count += 1;
139 
140         if let Some(poll_timer) = &mut self.poll_timer {
141             poll_timer.poll_started_at = Instant::now();
142         }
143     }
144 
145     /// Stop polling an individual task
end_poll(&mut self)146     pub(crate) fn end_poll(&mut self) {
147         if let Some(poll_timer) = &mut self.poll_timer {
148             let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
149             poll_timer.poll_counts.measure(elapsed, 1);
150         }
151     }
152 
inc_local_schedule_count(&mut self)153     pub(crate) fn inc_local_schedule_count(&mut self) {
154         self.local_schedule_count += 1;
155     }
156 }
157 
158 cfg_rt_multi_thread! {
159     impl MetricsBatch {
160         pub(crate) fn incr_steal_count(&mut self, by: u16) {
161             self.steal_count += by as u64;
162         }
163 
164         pub(crate) fn incr_steal_operations(&mut self) {
165             self.steal_operations += 1;
166         }
167 
168         pub(crate) fn incr_overflow_count(&mut self) {
169             self.overflow_count += 1;
170         }
171     }
172 }
173 
/// Converts `dur` to a whole number of nanoseconds.
///
/// Durations whose nanosecond count does not fit in a `u64` saturate to
/// `u64::MAX` instead of wrapping or panicking.
pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
    match u64::try_from(dur.as_nanos()) {
        Ok(nanos) => nanos,
        // More nanoseconds than a `u64` can represent: clamp.
        Err(_) => u64::MAX,
    }
}
177