1 use crate::loom::sync::Arc;
2 use crate::runtime::context;
3 use crate::runtime::scheduler::{self, current_thread, Inject};
4 use crate::task::Id;
5 
6 use backtrace::BacktraceFrame;
7 use std::cell::Cell;
8 use std::collections::VecDeque;
9 use std::ffi::c_void;
10 use std::fmt;
11 use std::future::Future;
12 use std::pin::Pin;
13 use std::ptr::{self, NonNull};
14 use std::task::{self, Poll};
15 
16 mod symbol;
17 mod tree;
18 
19 use symbol::Symbol;
20 use tree::Tree;
21 
22 use super::{Notified, OwnedTasks, Schedule};
23 
24 type Backtrace = Vec<BacktraceFrame>;
25 type SymbolTrace = Vec<Symbol>;
26 
/// The ambient backtracing context.
///
/// Bundles the two pieces of state that tracing relies on: the innermost
/// active [`Frame`] and the in-progress [`Trace`] collector. Both live in
/// `Cell`s, so they are read and written through shared references.
pub(crate) struct Context {
    /// The innermost [`Frame`] registered by a [`Root`] future that is
    /// currently being polled, if any. The address recorded in that frame
    /// establishes an upper unwinding bound on the backtraces in `Trace`.
    active_frame: Cell<Option<NonNull<Frame>>>,
    /// The place to stash backtraces. Installed by [`Trace::capture`] for the
    /// duration of its closure; `None` when the context is first created.
    collector: Cell<Option<Trace>>,
}
35 
/// A [`Frame`] in an intrusive list of [`Frame`]s, linked through parent
/// pointers.
///
/// Only the link to the parent is stored; a frame holds no pointers to its
/// children.
struct Frame {
    /// The location associated with this frame. [`trace_leaf`] compares this
    /// address against the symbol address of each unwound stack frame to
    /// decide where to stop unwinding.
    inner_addr: *const c_void,

    /// The parent frame, if any.
    parent: Option<NonNull<Frame>>,
}
44 
/// A tree-shaped execution trace.
///
/// Traces are captured with [`Trace::capture`], rooted with [`Trace::root`]
/// and given their leaves by [`trace_leaf`].
#[derive(Clone, Debug)]
pub(crate) struct Trace {
    // The linear backtraces that comprise this trace; `trace_leaf` appends one
    // entry each time it runs while a collector is installed. These linear
    // traces can be re-knitted into a tree.
    backtraces: Vec<Backtrace>,
}
55 
pin_project_lite::pin_project! {
    /// A future wrapper, created by [`Trace::root`], that marks the root of a
    /// trace.
    ///
    /// While the wrapped future is being polled, this wrapper's `poll` is
    /// registered as the active frame, so backtraces captured by
    /// [`trace_leaf`] stop unwinding once they reach it.
    #[derive(Debug, Clone)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub(crate) struct Root<T> {
        #[pin]
        future: T,
    }
}
64 
/// Panic message used by the `Context` accessors when the tracing context
/// cannot be reached (the lookup returned `None`).
const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
                                    as part of shutting down the current \
                                    thread, so collecting a taskdump is not \
                                    possible.";
69 
70 impl Context {
new() -> Self71     pub(crate) const fn new() -> Self {
72         Context {
73             active_frame: Cell::new(None),
74             collector: Cell::new(None),
75         }
76     }
77 
78     /// SAFETY: Callers of this function must ensure that trace frames always
79     /// form a valid linked list.
try_with_current<F, R>(f: F) -> Option<R> where F: FnOnce(&Self) -> R,80     unsafe fn try_with_current<F, R>(f: F) -> Option<R>
81     where
82         F: FnOnce(&Self) -> R,
83     {
84         crate::runtime::context::with_trace(f)
85     }
86 
with_current_frame<F, R>(f: F) -> R where F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,87     unsafe fn with_current_frame<F, R>(f: F) -> R
88     where
89         F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
90     {
91         Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
92     }
93 
with_current_collector<F, R>(f: F) -> R where F: FnOnce(&Cell<Option<Trace>>) -> R,94     fn with_current_collector<F, R>(f: F) -> R
95     where
96         F: FnOnce(&Cell<Option<Trace>>) -> R,
97     {
98         // SAFETY: This call can only access the collector field, so it cannot
99         // break the trace frame linked list.
100         unsafe {
101             Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
102         }
103     }
104 
105     /// Produces `true` if the current task is being traced; otherwise false.
is_tracing() -> bool106     pub(crate) fn is_tracing() -> bool {
107         Self::with_current_collector(|maybe_collector| {
108             let collector = maybe_collector.take();
109             let result = collector.is_some();
110             maybe_collector.set(collector);
111             result
112         })
113     }
114 }
115 
116 impl Trace {
117     /// Invokes `f`, returning both its result and the collection of backtraces
118     /// captured at each sub-invocation of [`trace_leaf`].
119     #[inline(never)]
capture<F, R>(f: F) -> (R, Trace) where F: FnOnce() -> R,120     pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
121     where
122         F: FnOnce() -> R,
123     {
124         let collector = Trace { backtraces: vec![] };
125 
126         let previous = Context::with_current_collector(|current| current.replace(Some(collector)));
127 
128         let result = f();
129 
130         let collector =
131             Context::with_current_collector(|current| current.replace(previous)).unwrap();
132 
133         (result, collector)
134     }
135 
136     /// The root of a trace.
137     #[inline(never)]
root<F>(future: F) -> Root<F>138     pub(crate) fn root<F>(future: F) -> Root<F> {
139         Root { future }
140     }
141 }
142 
/// If this is a sub-invocation of [`Trace::capture`], capture a backtrace.
///
/// The captured backtrace will be returned by [`Trace::capture`].
///
/// Invoking this function does nothing when it is not a sub-invocation of
/// [`Trace::capture`].
///
/// Returns `Poll::Pending` when a backtrace was recorded (after deferring a
/// wakeup through the current scheduler, if there is one), and
/// `Poll::Ready(())` when no collector is installed or the tracing context
/// is unavailable.
// This function is marked `#[inline(never)]` to ensure that it gets a distinct `Frame` in the
// backtrace, below which frames should not be included in the backtrace (since they reflect the
// internal implementation details of this crate).
#[inline(never)]
pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
    // Safety: We don't manipulate the current context's active frame.
    let did_trace = unsafe {
        Context::try_with_current(|context_cell| {
            // Move the collector out of its cell while we append to it; it is
            // put back below.
            if let Some(mut collector) = context_cell.collector.take() {
                let mut frames = vec![];
                // Becomes `true` once the unwinder has passed the frame of
                // `trace_leaf` itself.
                let mut above_leaf = false;

                // Without an active root frame nothing is unwound and an empty
                // backtrace is recorded.
                if let Some(active_frame) = context_cell.active_frame.get() {
                    let active_frame = active_frame.as_ref();

                    backtrace::trace(|frame| {
                        // `false` once the unwinder reaches the function whose
                        // address the active frame recorded.
                        let below_root = !ptr::eq(frame.symbol_address(), active_frame.inner_addr);

                        // only capture frames above `trace_leaf` and below the
                        // active root. This check runs before `above_leaf` is
                        // updated, so the `trace_leaf` frame itself is
                        // excluded, as is the root frame.
                        if above_leaf && below_root {
                            frames.push(frame.to_owned().into());
                        }

                        if ptr::eq(frame.symbol_address(), trace_leaf as *const _) {
                            above_leaf = true;
                        }

                        // only continue unwinding if we're below the active
                        // root
                        below_root
                    });
                }
                collector.backtraces.push(frames);
                context_cell.collector.set(Some(collector));
                true
            } else {
                false
            }
        })
        // An unavailable context is treated the same as "not tracing".
        .unwrap_or(false)
    };

    if did_trace {
        // Use the same logic that `yield_now` uses to send out wakeups after
        // the task yields.
        // NOTE(review): when no scheduler context is present, no wakeup is
        // deferred before returning `Pending` — confirm callers rely on that.
        context::with_scheduler(|scheduler| {
            if let Some(scheduler) = scheduler {
                match scheduler {
                    scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
                    #[cfg(feature = "rt-multi-thread")]
                    scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
                    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
                    scheduler::Context::MultiThreadAlt(_) => unimplemented!(),
                }
            }
        });

        Poll::Pending
    } else {
        Poll::Ready(())
    }
}
211 
212 impl fmt::Display for Trace {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result213     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214         Tree::from_trace(self.clone()).fmt(f)
215     }
216 }
217 
/// Returns a guard that invokes `f` exactly once, when the guard is dropped.
///
/// The value returned by `f` is discarded.
fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
    /// Drop guard holding the pending callback; `None` once it has run.
    struct Defer<F: FnOnce() -> R, R>(Option<F>);

    impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
        #[inline(always)]
        fn drop(&mut self) {
            // `drop` only has `&mut self`, but calling an `FnOnce` needs the
            // closure by value; `Option::take` moves it out safely.
            if let Some(callback) = self.0.take() {
                callback();
            }
        }
    }

    Defer(Some(f))
}
234 
/// Polls the wrapped future with this `poll` function registered as the
/// active trace frame.
impl<T: Future> Future for Root<T> {
    type Output = T::Output;

    // Not inlined: the address of this function is recorded in the frame
    // below and compared against unwound symbol addresses by `trace_leaf`.
    #[inline(never)]
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: The context's current frame is restored to its original state
        // before `frame` is dropped.
        unsafe {
            // A stack-allocated frame identifying this `poll` function.
            let mut frame = Frame {
                inner_addr: Self::poll as *const c_void,
                parent: None,
            };

            // Push: remember the previously active frame as our parent and
            // make `frame` the active one.
            Context::with_current_frame(|current| {
                frame.parent = current.take();
                current.set(Some(NonNull::from(&frame)));
            });

            // Pop: when `_restore` is dropped at the end of this block, the
            // parent becomes the active frame again. `_restore` is declared
            // after `frame`, so it is dropped first.
            let _restore = defer(|| {
                Context::with_current_frame(|current| {
                    current.set(frame.parent);
                });
            });

            let this = self.project();
            this.future.poll(cx)
        }
    }
}
264 
265 /// Trace and poll all tasks of the `current_thread` runtime.
trace_current_thread( owned: &OwnedTasks<Arc<current_thread::Handle>>, local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>, injection: &Inject<Arc<current_thread::Handle>>, ) -> Vec<(Id, Trace)>266 pub(in crate::runtime) fn trace_current_thread(
267     owned: &OwnedTasks<Arc<current_thread::Handle>>,
268     local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
269     injection: &Inject<Arc<current_thread::Handle>>,
270 ) -> Vec<(Id, Trace)> {
271     // clear the local and injection queues
272 
273     let mut dequeued = Vec::new();
274 
275     while let Some(task) = local.pop_back() {
276         dequeued.push(task);
277     }
278 
279     while let Some(task) = injection.pop() {
280         dequeued.push(task);
281     }
282 
283     // precondition: We have drained the tasks from the injection queue.
284     trace_owned(owned, dequeued)
285 }
286 
287 cfg_rt_multi_thread! {
288     use crate::loom::sync::Mutex;
289     use crate::runtime::scheduler::multi_thread;
290     use crate::runtime::scheduler::multi_thread::Synced;
291     use crate::runtime::scheduler::inject::Shared;
292 
293     /// Trace and poll all tasks of the `current_thread` runtime.
294     ///
295     /// ## Safety
296     ///
297     /// Must be called with the same `synced` that `injection` was created with.
298     pub(in crate::runtime) unsafe fn trace_multi_thread(
299         owned: &OwnedTasks<Arc<multi_thread::Handle>>,
300         local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
301         synced: &Mutex<Synced>,
302         injection: &Shared<Arc<multi_thread::Handle>>,
303     ) -> Vec<(Id, Trace)> {
304         let mut dequeued = Vec::new();
305 
306         // clear the local queue
307         while let Some(notified) = local.pop() {
308             dequeued.push(notified);
309         }
310 
311         // clear the injection queue
312         let mut synced = synced.lock();
313         while let Some(notified) = injection.pop(&mut synced.inject) {
314             dequeued.push(notified);
315         }
316 
317         drop(synced);
318 
319         // precondition: we have drained the tasks from the local and injection
320         // queues.
321         trace_owned(owned, dequeued)
322     }
323 }
324 
325 /// Trace the `OwnedTasks`.
326 ///
327 /// # Preconditions
328 ///
329 /// This helper presumes exclusive access to each task. The tasks must not exist
330 /// in any other queue.
trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<(Id, Trace)>331 fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<(Id, Trace)> {
332     let mut tasks = dequeued;
333     // Notify and trace all un-notified tasks. The dequeued tasks are already
334     // notified and so do not need to be re-notified.
335     owned.for_each(|task| {
336         // Notify the task (and thus make it poll-able) and stash it. This fails
337         // if the task is already notified. In these cases, we skip tracing the
338         // task.
339         if let Some(notified) = task.notify_for_tracing() {
340             tasks.push(notified);
341         }
342         // We do not poll tasks here, since we hold a lock on `owned` and the
343         // task may complete and need to remove itself from `owned`. Polling
344         // such a task here would result in a deadlock.
345     });
346 
347     tasks
348         .into_iter()
349         .map(|task| {
350             let local_notified = owned.assert_owner(task);
351             let id = local_notified.task.id();
352             let ((), trace) = Trace::capture(|| local_notified.run());
353             (id, trace)
354         })
355         .collect()
356 }
357