1 use crate::runtime::metrics::{HistogramBatch, WorkerMetrics};
2
3 use std::sync::atomic::Ordering::Relaxed;
4 use std::time::{Duration, Instant};
5
/// Worker-local batch of metric counters.
///
/// Values are accumulated in plain fields and copied into the shared
/// `WorkerMetrics` atomics by `submit`.
pub(crate) struct MetricsBatch {
    /// Number of times the worker parked.
    park_count: u64,

    /// Combined number of park and unpark events: incremented once when the
    /// worker is about to park and once more when it is unparked.
    park_unpark_count: u64,

    /// Number of times the worker woke w/o doing work, i.e. it is about to
    /// park again without having polled any task since the previous park.
    noop_count: u64,

    /// Number of tasks stolen.
    steal_count: u64,

    /// Number of times tasks were stolen.
    steal_operations: u64,

    /// Number of tasks that were polled by the worker.
    poll_count: u64,

    /// Value of `poll_count` recorded when the worker last entered park. This
    /// is used to track the noop count.
    poll_count_on_last_park: u64,

    /// Number of tasks that were scheduled locally on this worker.
    local_schedule_count: u64,

    /// Number of tasks moved to the global queue to make space in the local
    /// queue.
    overflow_count: u64,

    /// The total busy duration in nanoseconds.
    busy_duration_total: u64,

    /// Instant at which work last resumed (continued after park).
    processing_scheduled_tasks_started_at: Instant,

    /// If `Some`, tracks poll times in nanoseconds. It is `Some` only when
    /// the `WorkerMetrics` given at construction has a poll count histogram.
    poll_timer: Option<PollTimer>,
}
45
/// State used to time individual task polls and bucket the measured
/// durations into a histogram.
struct PollTimer {
    /// Histogram of poll counts within each band. Each finished poll records
    /// its elapsed time in nanoseconds here.
    poll_counts: HistogramBatch,

    /// Instant when the most recent task started polling.
    poll_started_at: Instant,
}
53
54 impl MetricsBatch {
new(worker_metrics: &WorkerMetrics) -> MetricsBatch55 pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
56 let now = Instant::now();
57
58 MetricsBatch {
59 park_count: 0,
60 park_unpark_count: 0,
61 noop_count: 0,
62 steal_count: 0,
63 steal_operations: 0,
64 poll_count: 0,
65 poll_count_on_last_park: 0,
66 local_schedule_count: 0,
67 overflow_count: 0,
68 busy_duration_total: 0,
69 processing_scheduled_tasks_started_at: now,
70 poll_timer: worker_metrics
71 .poll_count_histogram
72 .as_ref()
73 .map(|worker_poll_counts| PollTimer {
74 poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
75 poll_started_at: now,
76 }),
77 }
78 }
79
submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64)80 pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
81 worker.mean_poll_time.store(mean_poll_time, Relaxed);
82 worker.park_count.store(self.park_count, Relaxed);
83 worker
84 .park_unpark_count
85 .store(self.park_unpark_count, Relaxed);
86 worker.noop_count.store(self.noop_count, Relaxed);
87 worker.steal_count.store(self.steal_count, Relaxed);
88 worker
89 .steal_operations
90 .store(self.steal_operations, Relaxed);
91 worker.poll_count.store(self.poll_count, Relaxed);
92
93 worker
94 .busy_duration_total
95 .store(self.busy_duration_total, Relaxed);
96
97 worker
98 .local_schedule_count
99 .store(self.local_schedule_count, Relaxed);
100 worker.overflow_count.store(self.overflow_count, Relaxed);
101
102 if let Some(poll_timer) = &self.poll_timer {
103 let dst = worker.poll_count_histogram.as_ref().unwrap();
104 poll_timer.poll_counts.submit(dst);
105 }
106 }
107
108 /// The worker is about to park.
about_to_park(&mut self)109 pub(crate) fn about_to_park(&mut self) {
110 self.park_count += 1;
111 self.park_unpark_count += 1;
112
113 if self.poll_count_on_last_park == self.poll_count {
114 self.noop_count += 1;
115 } else {
116 self.poll_count_on_last_park = self.poll_count;
117 }
118 }
119
120 /// The worker was unparked.
unparked(&mut self)121 pub(crate) fn unparked(&mut self) {
122 self.park_unpark_count += 1;
123 }
124
125 /// Start processing a batch of tasks
start_processing_scheduled_tasks(&mut self)126 pub(crate) fn start_processing_scheduled_tasks(&mut self) {
127 self.processing_scheduled_tasks_started_at = Instant::now();
128 }
129
130 /// Stop processing a batch of tasks
end_processing_scheduled_tasks(&mut self)131 pub(crate) fn end_processing_scheduled_tasks(&mut self) {
132 let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
133 self.busy_duration_total += duration_as_u64(busy_duration);
134 }
135
136 /// Start polling an individual task
start_poll(&mut self)137 pub(crate) fn start_poll(&mut self) {
138 self.poll_count += 1;
139
140 if let Some(poll_timer) = &mut self.poll_timer {
141 poll_timer.poll_started_at = Instant::now();
142 }
143 }
144
145 /// Stop polling an individual task
end_poll(&mut self)146 pub(crate) fn end_poll(&mut self) {
147 if let Some(poll_timer) = &mut self.poll_timer {
148 let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
149 poll_timer.poll_counts.measure(elapsed, 1);
150 }
151 }
152
inc_local_schedule_count(&mut self)153 pub(crate) fn inc_local_schedule_count(&mut self) {
154 self.local_schedule_count += 1;
155 }
156 }
157
158 cfg_rt_multi_thread! {
159 impl MetricsBatch {
160 pub(crate) fn incr_steal_count(&mut self, by: u16) {
161 self.steal_count += by as u64;
162 }
163
164 pub(crate) fn incr_steal_operations(&mut self) {
165 self.steal_operations += 1;
166 }
167
168 pub(crate) fn incr_overflow_count(&mut self) {
169 self.overflow_count += 1;
170 }
171 }
172 }
173
/// Converts `dur` to whole nanoseconds as a `u64`.
///
/// Durations whose nanosecond count does not fit in a `u64` are clamped to
/// `u64::MAX` instead of being truncated.
pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
    let nanos = dur.as_nanos();

    match u64::try_from(nanos) {
        Ok(fits) => fits,
        // More nanoseconds than a `u64` can represent: saturate.
        Err(_) => u64::MAX,
    }
}
177