1 //! Runs `!Send` futures on the current thread.
2 use crate::loom::cell::UnsafeCell;
3 use crate::loom::sync::{Arc, Mutex};
4 #[cfg(tokio_unstable)]
5 use crate::runtime;
6 use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task, TaskHarnessScheduleHooks};
7 use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
8 use crate::sync::AtomicWaker;
9 use crate::util::trace::SpawnMeta;
10 use crate::util::RcCell;
11 
12 use std::cell::Cell;
13 use std::collections::VecDeque;
14 use std::fmt;
15 use std::future::Future;
16 use std::marker::PhantomData;
17 use std::mem;
18 use std::pin::Pin;
19 use std::rc::Rc;
20 use std::task::Poll;
21 
22 use pin_project_lite::pin_project;
23 
24 cfg_rt! {
25     /// A set of tasks which are executed on the same thread.
26     ///
27     /// In some cases, it is necessary to run one or more futures that do not
28     /// implement [`Send`] and thus are unsafe to send between threads. In these
29     /// cases, a [local task set] may be used to schedule one or more `!Send`
30     /// futures to run together on the same thread.
31     ///
32     /// For example, the following code will not compile:
33     ///
34     /// ```rust,compile_fail
35     /// use std::rc::Rc;
36     ///
37     /// #[tokio::main]
38     /// async fn main() {
39     ///     // `Rc` does not implement `Send`, and thus may not be sent between
40     ///     // threads safely.
41     ///     let nonsend_data = Rc::new("my nonsend data...");
42     ///
43     ///     let nonsend_data = nonsend_data.clone();
44     ///     // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
45     ///     // Since `tokio::spawn` requires the spawned future to implement `Send`, this
46     ///     // will not compile.
47     ///     tokio::spawn(async move {
48     ///         println!("{}", nonsend_data);
49     ///         // ...
50     ///     }).await.unwrap();
51     /// }
52     /// ```
53     ///
54     /// # Use with `run_until`
55     ///
56     /// To spawn `!Send` futures, we can use a local task set to schedule them
57     /// on the thread calling [`Runtime::block_on`]. When running inside of the
58     /// local task set, we can use [`task::spawn_local`], which can spawn
59     /// `!Send` futures. For example:
60     ///
61     /// ```rust
62     /// use std::rc::Rc;
63     /// use tokio::task;
64     ///
65     /// #[tokio::main]
66     /// async fn main() {
67     ///     let nonsend_data = Rc::new("my nonsend data...");
68     ///
69     ///     // Construct a local task set that can run `!Send` futures.
70     ///     let local = task::LocalSet::new();
71     ///
72     ///     // Run the local task set.
73     ///     local.run_until(async move {
74     ///         let nonsend_data = nonsend_data.clone();
75     ///         // `spawn_local` ensures that the future is spawned on the local
76     ///         // task set.
77     ///         task::spawn_local(async move {
78     ///             println!("{}", nonsend_data);
79     ///             // ...
80     ///         }).await.unwrap();
81     ///     }).await;
82     /// }
83     /// ```
84     /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
85     /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
86     /// cannot be used inside a task spawned with `tokio::spawn`.
87     ///
88     /// ## Awaiting a `LocalSet`
89     ///
90     /// Additionally, a `LocalSet` itself implements `Future`, completing when
91     /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
92     /// several futures on a `LocalSet` and drive the whole set until they
93     /// complete. For example,
94     ///
95     /// ```rust
96     /// use tokio::{task, time};
97     /// use std::rc::Rc;
98     ///
99     /// #[tokio::main]
100     /// async fn main() {
101     ///     let nonsend_data = Rc::new("world");
102     ///     let local = task::LocalSet::new();
103     ///
104     ///     let nonsend_data2 = nonsend_data.clone();
105     ///     local.spawn_local(async move {
106     ///         // ...
107     ///         println!("hello {}", nonsend_data2)
108     ///     });
109     ///
110     ///     local.spawn_local(async move {
111     ///         time::sleep(time::Duration::from_millis(100)).await;
112     ///         println!("goodbye {}", nonsend_data)
113     ///     });
114     ///
115     ///     // ...
116     ///
117     ///     local.await;
118     /// }
119     /// ```
120     /// **Note:** Awaiting a `LocalSet` can only be done inside
121     /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
122     /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
123     /// `tokio::spawn`.
124     ///
125     /// ## Use inside `tokio::spawn`
126     ///
127     /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
128     /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
129     /// something else. The solution is to create the `LocalSet` somewhere else,
130     /// and communicate with it using an [`mpsc`] channel.
131     ///
132     /// The following example puts the `LocalSet` inside a new thread.
133     /// ```
134     /// use tokio::runtime::Builder;
135     /// use tokio::sync::{mpsc, oneshot};
136     /// use tokio::task::LocalSet;
137     ///
138     /// // This struct describes the task you want to spawn. Here we include
139     /// // some simple examples. The oneshot channel allows sending a response
140     /// // to the spawner.
141     /// #[derive(Debug)]
142     /// enum Task {
143     ///     PrintNumber(u32),
144     ///     AddOne(u32, oneshot::Sender<u32>),
145     /// }
146     ///
147     /// #[derive(Clone)]
148     /// struct LocalSpawner {
149     ///    send: mpsc::UnboundedSender<Task>,
150     /// }
151     ///
152     /// impl LocalSpawner {
153     ///     pub fn new() -> Self {
154     ///         let (send, mut recv) = mpsc::unbounded_channel();
155     ///
156     ///         let rt = Builder::new_current_thread()
157     ///             .enable_all()
158     ///             .build()
159     ///             .unwrap();
160     ///
161     ///         std::thread::spawn(move || {
162     ///             let local = LocalSet::new();
163     ///
164     ///             local.spawn_local(async move {
165     ///                 while let Some(new_task) = recv.recv().await {
166     ///                     tokio::task::spawn_local(run_task(new_task));
167     ///                 }
168     ///                 // If the while loop returns, then all the LocalSpawner
169     ///                 // objects have been dropped.
170     ///             });
171     ///
172     ///             // This will return once all senders are dropped and all
173     ///             // spawned tasks have returned.
174     ///             rt.block_on(local);
175     ///         });
176     ///
177     ///         Self {
178     ///             send,
179     ///         }
180     ///     }
181     ///
182     ///     pub fn spawn(&self, task: Task) {
183     ///         self.send.send(task).expect("Thread with LocalSet has shut down.");
184     ///     }
185     /// }
186     ///
187     /// // This task may do !Send stuff. We use printing a number as an example,
188     /// // but it could be anything.
189     /// //
190     /// // The Task struct is an enum to support spawning many different kinds
191     /// // of operations.
192     /// async fn run_task(task: Task) {
193     ///     match task {
194     ///         Task::PrintNumber(n) => {
195     ///             println!("{}", n);
196     ///         },
197     ///         Task::AddOne(n, response) => {
198     ///             // We ignore failures to send the response.
199     ///             let _ = response.send(n + 1);
200     ///         },
201     ///     }
202     /// }
203     ///
204     /// #[tokio::main]
205     /// async fn main() {
206     ///     let spawner = LocalSpawner::new();
207     ///
208     ///     let (send, response) = oneshot::channel();
209     ///     spawner.spawn(Task::AddOne(10, send));
210     ///     let eleven = response.await.unwrap();
211     ///     assert_eq!(eleven, 11);
212     /// }
213     /// ```
214     ///
215     /// [`Send`]: trait@std::marker::Send
216     /// [local task set]: struct@LocalSet
217     /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
218     /// [`task::spawn_local`]: fn@spawn_local
219     /// [`mpsc`]: mod@crate::sync::mpsc
220     pub struct LocalSet {
221         /// Current scheduler tick.
222         tick: Cell<u8>,
223 
224         /// State available from thread-local.
225         context: Rc<Context>,
226 
227         /// This type should not be Send.
228         _not_send: PhantomData<*const ()>,
229     }
230 }
231 
232 /// State available from the thread-local.
233 struct Context {
234     /// State shared between threads.
235     shared: Arc<Shared>,
236 
237     /// True if a task panicked without being handled and the local set is
238     /// configured to shutdown on unhandled panic.
239     unhandled_panic: Cell<bool>,
240 }
241 
242 /// `LocalSet` state shared between threads.
243 struct Shared {
244     /// # Safety
245     ///
246     /// This field must *only* be accessed from the thread that owns the
247     /// `LocalSet` (i.e., `Thread::current().id() == owner`).
248     local_state: LocalState,
249 
250     /// Remote run queue sender.
251     queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
252 
253     /// Wake the `LocalSet` task.
254     waker: AtomicWaker,
255 
256     /// How to respond to unhandled task panics.
257     #[cfg(tokio_unstable)]
258     pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
259 }
260 
261 /// Tracks the `LocalSet` state that must only be accessed from the thread that
262 /// created the `LocalSet`.
263 struct LocalState {
264     /// The `ThreadId` of the thread that owns the `LocalSet`.
265     owner: ThreadId,
266 
267     /// Local run queue sender and receiver.
268     local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,
269 
270     /// Collection of all active tasks spawned onto this executor.
271     owned: LocalOwnedTasks<Arc<Shared>>,
272 }
273 
274 pin_project! {
275     #[derive(Debug)]
276     struct RunUntil<'a, F> {
277         local_set: &'a LocalSet,
278         #[pin]
279         future: F,
280     }
281 }
282 
283 tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
284     ctx: RcCell::new(),
285     wake_on_schedule: Cell::new(false),
286 } });
287 
288 struct LocalData {
289     ctx: RcCell<Context>,
290     wake_on_schedule: Cell<bool>,
291 }
292 
293 impl LocalData {
294     /// Should be called except when we call `LocalSet::enter`.
295     /// Especially when we poll a `LocalSet`.
296     #[must_use = "dropping this guard will reset the entered state"]
enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_>297     fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
298         let ctx = self.ctx.replace(Some(ctx));
299         let wake_on_schedule = self.wake_on_schedule.replace(false);
300         LocalDataEnterGuard {
301             local_data_ref: self,
302             ctx,
303             wake_on_schedule,
304         }
305     }
306 }
307 
308 /// A guard for `LocalData::enter()`
309 struct LocalDataEnterGuard<'a> {
310     local_data_ref: &'a LocalData,
311     ctx: Option<Rc<Context>>,
312     wake_on_schedule: bool,
313 }
314 
315 impl<'a> Drop for LocalDataEnterGuard<'a> {
drop(&mut self)316     fn drop(&mut self) {
317         self.local_data_ref.ctx.set(self.ctx.take());
318         self.local_data_ref
319             .wake_on_schedule
320             .set(self.wake_on_schedule)
321     }
322 }
323 
324 cfg_rt! {
325     /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
326     ///
327     /// The spawned future will run on the same thread that called `spawn_local`.
328     ///
329     /// The provided future will start running in the background immediately
330     /// when `spawn_local` is called, even if you don't await the returned
331     /// `JoinHandle`.
332     ///
333     /// # Panics
334     ///
335     /// This function panics if called outside of a [`LocalSet`].
336     ///
337     /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
338     /// resulting new task will _not_ be inside the `LocalSet`, so you must use
339     /// `spawn_local` if you want to stay within the `LocalSet`.
340     ///
341     /// # Examples
342     ///
343     /// ```rust
344     /// use std::rc::Rc;
345     /// use tokio::task;
346     ///
347     /// #[tokio::main]
348     /// async fn main() {
349     ///     let nonsend_data = Rc::new("my nonsend data...");
350     ///
351     ///     let local = task::LocalSet::new();
352     ///
353     ///     // Run the local task set.
354     ///     local.run_until(async move {
355     ///         let nonsend_data = nonsend_data.clone();
356     ///         task::spawn_local(async move {
357     ///             println!("{}", nonsend_data);
358     ///             // ...
359     ///         }).await.unwrap();
360     ///     }).await;
361     /// }
362     /// ```
363     ///
364     /// [`LocalSet`]: struct@crate::task::LocalSet
365     /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
366     /// [`tokio::spawn`]: fn@crate::task::spawn
367     #[track_caller]
368     pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
369     where
370         F: Future + 'static,
371         F::Output: 'static,
372     {
373         let fut_size = std::mem::size_of::<F>();
374         if fut_size > BOX_FUTURE_THRESHOLD {
375             spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
376         } else {
377             spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
378         }
379     }
380 
381 
382     #[track_caller]
383     pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
384     where F: Future + 'static,
385           F::Output: 'static
386     {
387         use crate::runtime::{context, task};
388 
389         let mut future = Some(future);
390 
391         let res = context::with_current(|handle| {
392             Some(if handle.is_local() {
393                 if !handle.can_spawn_local_on_local_runtime() {
394                     return None;
395                 }
396 
397                 let future = future.take().unwrap();
398 
399                 #[cfg(all(
400                     tokio_unstable,
401                     tokio_taskdump,
402                     feature = "rt",
403                     target_os = "linux",
404                     any(
405                         target_arch = "aarch64",
406                         target_arch = "x86",
407                         target_arch = "x86_64"
408                     )
409                 ))]
410                 let future = task::trace::Trace::root(future);
411                 let id = task::Id::next();
412                 let task = crate::util::trace::task(future, "task", meta, id.as_u64());
413 
414                 // safety: we have verified that this is a `LocalRuntime` owned by the current thread
415                 unsafe { handle.spawn_local(task, id) }
416             } else {
417                 match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
418                     None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
419                     Some(cx) => cx.spawn(future.take().unwrap(), meta)
420                 }
421             })
422         });
423 
424         match res {
425             Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
426             Ok(Some(join_handle)) => join_handle,
427             Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
428                 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
429                 Some(cx) => cx.spawn(future.unwrap(), meta)
430             }
431         }
432     }
433 }
434 
435 /// Initial queue capacity.
436 const INITIAL_CAPACITY: usize = 64;
437 
438 /// Max number of tasks to poll per tick.
439 const MAX_TASKS_PER_TICK: usize = 61;
440 
441 /// How often it check the remote queue first.
442 const REMOTE_FIRST_INTERVAL: u8 = 31;
443 
444 /// Context guard for `LocalSet`
445 pub struct LocalEnterGuard {
446     ctx: Option<Rc<Context>>,
447 
448     /// Distinguishes whether the context was entered or being polled.
449     /// When we enter it, the value `wake_on_schedule` is set. In this case
450     /// `spawn_local` refers the context, whereas it is not being polled now.
451     wake_on_schedule: bool,
452 }
453 
454 impl Drop for LocalEnterGuard {
drop(&mut self)455     fn drop(&mut self) {
456         CURRENT.with(
457             |LocalData {
458                  ctx,
459                  wake_on_schedule,
460              }| {
461                 ctx.set(self.ctx.take());
462                 wake_on_schedule.set(self.wake_on_schedule);
463             },
464         );
465     }
466 }
467 
468 impl fmt::Debug for LocalEnterGuard {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result469     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470         f.debug_struct("LocalEnterGuard").finish()
471     }
472 }
473 
474 impl LocalSet {
475     /// Returns a new local task set.
new() -> LocalSet476     pub fn new() -> LocalSet {
477         let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
478 
479         LocalSet {
480             tick: Cell::new(0),
481             context: Rc::new(Context {
482                 shared: Arc::new(Shared {
483                     local_state: LocalState {
484                         owner,
485                         owned: LocalOwnedTasks::new(),
486                         local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
487                     },
488                     queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
489                     waker: AtomicWaker::new(),
490                     #[cfg(tokio_unstable)]
491                     unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
492                 }),
493                 unhandled_panic: Cell::new(false),
494             }),
495             _not_send: PhantomData,
496         }
497     }
498 
499     /// Enters the context of this `LocalSet`.
500     ///
501     /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
502     /// context you are inside.
503     ///
504     /// [`spawn_local`]: fn@crate::task::spawn_local
enter(&self) -> LocalEnterGuard505     pub fn enter(&self) -> LocalEnterGuard {
506         CURRENT.with(
507             |LocalData {
508                  ctx,
509                  wake_on_schedule,
510                  ..
511              }| {
512                 let ctx = ctx.replace(Some(self.context.clone()));
513                 let wake_on_schedule = wake_on_schedule.replace(true);
514                 LocalEnterGuard {
515                     ctx,
516                     wake_on_schedule,
517                 }
518             },
519         )
520     }
521 
522     /// Spawns a `!Send` task onto the local task set.
523     ///
524     /// This task is guaranteed to be run on the current thread.
525     ///
526     /// Unlike the free function [`spawn_local`], this method may be used to
527     /// spawn local tasks when the `LocalSet` is _not_ running. The provided
528     /// future will start running once the `LocalSet` is next started, even if
529     /// you don't await the returned `JoinHandle`.
530     ///
531     /// # Examples
532     ///
533     /// ```rust
534     /// use tokio::task;
535     ///
536     /// #[tokio::main]
537     /// async fn main() {
538     ///     let local = task::LocalSet::new();
539     ///
540     ///     // Spawn a future on the local set. This future will be run when
541     ///     // we call `run_until` to drive the task set.
542     ///     local.spawn_local(async {
543     ///        // ...
544     ///     });
545     ///
546     ///     // Run the local task set.
547     ///     local.run_until(async move {
548     ///         // ...
549     ///     }).await;
550     ///
551     ///     // When `run` finishes, we can spawn _more_ futures, which will
552     ///     // run in subsequent calls to `run_until`.
553     ///     local.spawn_local(async {
554     ///        // ...
555     ///     });
556     ///
557     ///     local.run_until(async move {
558     ///         // ...
559     ///     }).await;
560     /// }
561     /// ```
562     /// [`spawn_local`]: fn@spawn_local
563     #[track_caller]
spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,564     pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
565     where
566         F: Future + 'static,
567         F::Output: 'static,
568     {
569         let fut_size = mem::size_of::<F>();
570         if fut_size > BOX_FUTURE_THRESHOLD {
571             self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
572         } else {
573             self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
574         }
575     }
576 
577     /// Runs a future to completion on the provided runtime, driving any local
578     /// futures spawned on this task set on the current thread.
579     ///
580     /// This runs the given future on the runtime, blocking until it is
581     /// complete, and yielding its resolved result. Any tasks or timers which
582     /// the future spawns internally will be executed on the runtime. The future
583     /// may also call [`spawn_local`] to `spawn_local` additional local futures on the
584     /// current thread.
585     ///
586     /// This method should not be called from an asynchronous context.
587     ///
588     /// # Panics
589     ///
590     /// This function panics if the executor is at capacity, if the provided
591     /// future panics, or if called within an asynchronous execution context.
592     ///
593     /// # Notes
594     ///
595     /// Since this function internally calls [`Runtime::block_on`], and drives
596     /// futures in the local task set inside that call to `block_on`, the local
597     /// futures may not use [in-place blocking]. If a blocking call needs to be
598     /// issued from a local task, the [`spawn_blocking`] API may be used instead.
599     ///
600     /// For example, this will panic:
601     /// ```should_panic
602     /// use tokio::runtime::Runtime;
603     /// use tokio::task;
604     ///
605     /// let rt  = Runtime::new().unwrap();
606     /// let local = task::LocalSet::new();
607     /// local.block_on(&rt, async {
608     ///     let join = task::spawn_local(async {
609     ///         let blocking_result = task::block_in_place(|| {
610     ///             // ...
611     ///         });
612     ///         // ...
613     ///     });
614     ///     join.await.unwrap();
615     /// })
616     /// ```
617     /// This, however, will not panic:
618     /// ```
619     /// use tokio::runtime::Runtime;
620     /// use tokio::task;
621     ///
622     /// let rt  = Runtime::new().unwrap();
623     /// let local = task::LocalSet::new();
624     /// local.block_on(&rt, async {
625     ///     let join = task::spawn_local(async {
626     ///         let blocking_result = task::spawn_blocking(|| {
627     ///             // ...
628     ///         }).await;
629     ///         // ...
630     ///     });
631     ///     join.await.unwrap();
632     /// })
633     /// ```
634     ///
635     /// [`spawn_local`]: fn@spawn_local
636     /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
637     /// [in-place blocking]: fn@crate::task::block_in_place
638     /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
639     #[track_caller]
640     #[cfg(feature = "rt")]
641     #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output where F: Future,642     pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
643     where
644         F: Future,
645     {
646         rt.block_on(self.run_until(future))
647     }
648 
649     /// Runs a future to completion on the local set, returning its output.
650     ///
651     /// This returns a future that runs the given future with a local set,
652     /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
653     /// Any local futures spawned on the local set will be driven in the
654     /// background until the future passed to `run_until` completes. When the future
655     /// passed to `run_until` finishes, any local futures which have not completed
656     /// will remain on the local set, and will be driven on subsequent calls to
657     /// `run_until` or when [awaiting the local set] itself.
658     ///
659     /// # Cancel safety
660     ///
661     /// This method is cancel safe when `future` is cancel safe.
662     ///
663     /// # Examples
664     ///
665     /// ```rust
666     /// use tokio::task;
667     ///
668     /// #[tokio::main]
669     /// async fn main() {
670     ///     task::LocalSet::new().run_until(async {
671     ///         task::spawn_local(async move {
672     ///             // ...
673     ///         }).await.unwrap();
674     ///         // ...
675     ///     }).await;
676     /// }
677     /// ```
678     ///
679     /// [`spawn_local`]: fn@spawn_local
680     /// [awaiting the local set]: #awaiting-a-localset
run_until<F>(&self, future: F) -> F::Output where F: Future,681     pub async fn run_until<F>(&self, future: F) -> F::Output
682     where
683         F: Future,
684     {
685         let run_until = RunUntil {
686             future,
687             local_set: self,
688         };
689         run_until.await
690     }
691 
692     #[track_caller]
spawn_named<F>( &self, future: F, meta: SpawnMeta<'_>, ) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,693     pub(in crate::task) fn spawn_named<F>(
694         &self,
695         future: F,
696         meta: SpawnMeta<'_>,
697     ) -> JoinHandle<F::Output>
698     where
699         F: Future + 'static,
700         F::Output: 'static,
701     {
702         self.spawn_named_inner(future, meta)
703     }
704 
705     #[track_caller]
spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,706     fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
707     where
708         F: Future + 'static,
709         F::Output: 'static,
710     {
711         let handle = self.context.spawn(future, meta);
712 
713         // Because a task was spawned from *outside* the `LocalSet`, wake the
714         // `LocalSet` future to execute the new task, if it hasn't been woken.
715         //
716         // Spawning via the free fn `spawn` does not require this, as it can
717         // only be called from *within* a future executing on the `LocalSet` —
718         // in that case, the `LocalSet` must already be awake.
719         self.context.shared.waker.wake();
720         handle
721     }
722 
723     /// Ticks the scheduler, returning whether the local future needs to be
724     /// notified again.
tick(&self) -> bool725     fn tick(&self) -> bool {
726         for _ in 0..MAX_TASKS_PER_TICK {
727             // Make sure we didn't hit an unhandled panic
728             assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
729 
730             match self.next_task() {
731                 // Run the task
732                 //
733                 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
734                 // used. We are responsible for maintaining the invariant that
735                 // `run_unchecked` is only called on threads that spawned the
736                 // task initially. Because `LocalSet` itself is `!Send`, and
737                 // `spawn_local` spawns into the `LocalSet` on the current
738                 // thread, the invariant is maintained.
739                 Some(task) => crate::runtime::coop::budget(|| task.run()),
740                 // We have fully drained the queue of notified tasks, so the
741                 // local future doesn't need to be notified again — it can wait
742                 // until something else wakes a task in the local set.
743                 None => return false,
744             }
745         }
746 
747         true
748     }
749 
next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>>750     fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
751         let tick = self.tick.get();
752         self.tick.set(tick.wrapping_add(1));
753 
754         let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
755             self.context
756                 .shared
757                 .queue
758                 .lock()
759                 .as_mut()
760                 .and_then(|queue| queue.pop_front())
761                 .or_else(|| self.pop_local())
762         } else {
763             self.pop_local().or_else(|| {
764                 self.context
765                     .shared
766                     .queue
767                     .lock()
768                     .as_mut()
769                     .and_then(VecDeque::pop_front)
770             })
771         };
772 
773         task.map(|task| unsafe {
774             // Safety: because the `LocalSet` itself is `!Send`, we know we are
775             // on the same thread if we have access to the `LocalSet`, and can
776             // therefore access the local run queue.
777             self.context.shared.local_state.assert_owner(task)
778         })
779     }
780 
pop_local(&self) -> Option<task::Notified<Arc<Shared>>>781     fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
782         unsafe {
783             // Safety: because the `LocalSet` itself is `!Send`, we know we are
784             // on the same thread if we have access to the `LocalSet`, and can
785             // therefore access the local run queue.
786             self.context.shared.local_state.task_pop_front()
787         }
788     }
789 
with<T>(&self, f: impl FnOnce() -> T) -> T790     fn with<T>(&self, f: impl FnOnce() -> T) -> T {
791         CURRENT.with(|local_data| {
792             let _guard = local_data.enter(self.context.clone());
793             f()
794         })
795     }
796 
797     /// This method is like `with`, but it just calls `f` without setting the thread-local if that
798     /// fails.
with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T799     fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
800         let mut f = Some(f);
801 
802         let res = CURRENT.try_with(|local_data| {
803             let _guard = local_data.enter(self.context.clone());
804             (f.take().unwrap())()
805         });
806 
807         match res {
808             Ok(res) => res,
809             Err(_access_error) => (f.take().unwrap())(),
810         }
811     }
812 }
813 
814 cfg_unstable! {
815     impl LocalSet {
816         /// Configure how the `LocalSet` responds to an unhandled panic on a
817         /// spawned task.
818         ///
819         /// By default, an unhandled panic (i.e. a panic not caught by
820         /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
821         /// execution. The panic is error value is forwarded to the task's
822         /// [`JoinHandle`] and all other spawned tasks continue running.
823         ///
824         /// The `unhandled_panic` option enables configuring this behavior.
825         ///
826         /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
827         ///   spawned tasks have no impact on the `LocalSet`'s execution.
828         /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
829         ///   shutdown immediately when a spawned task panics even if that
830         ///   task's `JoinHandle` has not been dropped. All other spawned tasks
831         ///   will immediately terminate and further calls to
832         ///   [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
833         ///
834         /// # Panics
835         ///
836         /// This method panics if called after the `LocalSet` has started
837         /// running.
838         ///
839         /// # Unstable
840         ///
841         /// This option is currently unstable and its implementation is
842         /// incomplete. The API may change or be removed in the future. See
843         /// tokio-rs/tokio#4516 for more details.
844         ///
845         /// # Examples
846         ///
847         /// The following demonstrates a `LocalSet` configured to shutdown on
848         /// panic. The first spawned task panics and results in the `LocalSet`
849         /// shutting down. The second spawned task never has a chance to
850         /// execute. The call to `run_until` will panic due to the runtime being
851         /// forcibly shutdown.
852         ///
853         /// ```should_panic
854         /// use tokio::runtime::UnhandledPanic;
855         ///
856         /// # #[tokio::main]
857         /// # async fn main() {
858         /// tokio::task::LocalSet::new()
859         ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
860         ///     .run_until(async {
861         ///         tokio::task::spawn_local(async { panic!("boom"); });
862         ///         tokio::task::spawn_local(async {
863         ///             // This task never completes
864         ///         });
865         ///
866         ///         // Do some work, but `run_until` will panic before it completes
867         /// # loop { tokio::task::yield_now().await; }
868         ///     })
869         ///     .await;
870         /// # }
871         /// ```
872         ///
873         /// [`JoinHandle`]: struct@crate::task::JoinHandle
874         pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
875             // TODO: This should be set as a builder
876             Rc::get_mut(&mut self.context)
877                 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
878                 .expect("Unhandled Panic behavior modified after starting LocalSet")
879                 .unhandled_panic = behavior;
880             self
881         }
882 
883         /// Returns the [`Id`] of the current `LocalSet` runtime.
884         ///
885         /// # Examples
886         ///
887         /// ```rust
888         /// use tokio::task;
889         ///
890         /// #[tokio::main]
891         /// async fn main() {
892         ///     let local_set = task::LocalSet::new();
893         ///     println!("Local set id: {}", local_set.id());
894         /// }
895         /// ```
896         ///
897         /// **Note**: This is an [unstable API][unstable]. The public API of this type
898         /// may break in 1.x releases. See [the documentation on unstable
899         /// features][unstable] for details.
900         ///
901         /// [unstable]: crate#unstable-features
902         /// [`Id`]: struct@crate::runtime::Id
903         pub fn id(&self) -> runtime::Id {
904             self.context.shared.local_state.owned.id.into()
905         }
906     }
907 }
908 
909 impl fmt::Debug for LocalSet {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result910     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
911         fmt.debug_struct("LocalSet").finish()
912     }
913 }
914 
915 impl Future for LocalSet {
916     type Output = ();
917 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>918     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
919         // Register the waker before starting to work
920         self.context.shared.waker.register_by_ref(cx.waker());
921 
922         if self.with(|| self.tick()) {
923             // If `tick` returns true, we need to notify the local future again:
924             // there are still tasks remaining in the run queue.
925             cx.waker().wake_by_ref();
926             Poll::Pending
927 
928         // Safety: called from the thread that owns `LocalSet`. Because
929         // `LocalSet` is `!Send`, this is safe.
930         } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
931             // If the scheduler has no remaining futures, we're done!
932             Poll::Ready(())
933         } else {
934             // There are still futures in the local set, but we've polled all the
935             // futures in the run queue. Therefore, we can just return Pending
936             // since the remaining futures will be woken from somewhere else.
937             Poll::Pending
938         }
939     }
940 }
941 
942 impl Default for LocalSet {
default() -> LocalSet943     fn default() -> LocalSet {
944         LocalSet::new()
945     }
946 }
947 
948 impl Drop for LocalSet {
drop(&mut self)949     fn drop(&mut self) {
950         self.with_if_possible(|| {
951             // Shut down all tasks in the LocalOwnedTasks and close it to
952             // prevent new tasks from ever being added.
953             unsafe {
954                 // Safety: called from the thread that owns `LocalSet`
955                 self.context.shared.local_state.close_and_shutdown_all();
956             }
957 
958             // We already called shutdown on all tasks above, so there is no
959             // need to call shutdown.
960 
961             // Safety: note that this *intentionally* bypasses the unsafe
962             // `Shared::local_queue()` method. This is in order to avoid the
963             // debug assertion that we are on the thread that owns the
964             // `LocalSet`, because on some systems (e.g. at least some macOS
965             // versions), attempting to get the current thread ID can panic due
966             // to the thread's local data that stores the thread ID being
967             // dropped *before* the `LocalSet`.
968             //
969             // Despite avoiding the assertion here, it is safe for us to access
970             // the local queue in `Drop`, because the `LocalSet` itself is
971             // `!Send`, so we can reasonably guarantee that it will not be
972             // `Drop`ped from another thread.
973             let local_queue = unsafe {
974                 // Safety: called from the thread that owns `LocalSet`
975                 self.context.shared.local_state.take_local_queue()
976             };
977             for task in local_queue {
978                 drop(task);
979             }
980 
981             // Take the queue from the Shared object to prevent pushing
982             // notifications to it in the future.
983             let queue = self.context.shared.queue.lock().take().unwrap();
984             for task in queue {
985                 drop(task);
986             }
987 
988             // Safety: called from the thread that owns `LocalSet`
989             assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
990         });
991     }
992 }
993 
994 // === impl Context ===
995 
996 impl Context {
997     #[track_caller]
spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,998     fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
999     where
1000         F: Future + 'static,
1001         F::Output: 'static,
1002     {
1003         let id = crate::runtime::task::Id::next();
1004         let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1005 
1006         // Safety: called from the thread that owns the `LocalSet`
1007         let (handle, notified) = {
1008             self.shared.local_state.assert_called_from_owner_thread();
1009             self.shared
1010                 .local_state
1011                 .owned
1012                 .bind(future, self.shared.clone(), id)
1013         };
1014 
1015         if let Some(notified) = notified {
1016             self.shared.schedule(notified);
1017         }
1018 
1019         handle
1020     }
1021 }
1022 
1023 // === impl LocalFuture ===
1024 
1025 impl<T: Future> Future for RunUntil<'_, T> {
1026     type Output = T::Output;
1027 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>1028     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1029         let me = self.project();
1030 
1031         me.local_set.with(|| {
1032             me.local_set
1033                 .context
1034                 .shared
1035                 .waker
1036                 .register_by_ref(cx.waker());
1037 
1038             let _no_blocking = crate::runtime::context::disallow_block_in_place();
1039             let f = me.future;
1040 
1041             if let Poll::Ready(output) = f.poll(cx) {
1042                 return Poll::Ready(output);
1043             }
1044 
1045             if me.local_set.tick() {
1046                 // If `tick` returns `true`, we need to notify the local future again:
1047                 // there are still tasks remaining in the run queue.
1048                 cx.waker().wake_by_ref();
1049             }
1050 
1051             Poll::Pending
1052         })
1053     }
1054 }
1055 
1056 impl Shared {
1057     /// Schedule the provided task on the scheduler.
schedule(&self, task: task::Notified<Arc<Self>>)1058     fn schedule(&self, task: task::Notified<Arc<Self>>) {
1059         CURRENT.with(|localdata| {
1060             match localdata.ctx.get() {
1061                 // If the current `LocalSet` is being polled, we don't need to wake it.
1062                 // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1063                 // In this case it is not being polled, so we need to wake it.
1064                 Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1065                     // Safety: if the current `LocalSet` context points to this
1066                     // `LocalSet`, then we are on the thread that owns it.
1067                     cx.shared.local_state.task_push_back(task);
1068                 },
1069 
1070                 // We are on the thread that owns the `LocalSet`, so we can
1071                 // wake to the local queue.
1072                 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1073                     unsafe {
1074                         // Safety: we just checked that the thread ID matches
1075                         // the localset's owner, so this is safe.
1076                         self.local_state.task_push_back(task);
1077                     }
1078                     // We still have to wake the `LocalSet`, because it isn't
1079                     // currently being polled.
1080                     self.waker.wake();
1081                 }
1082 
1083                 // We are *not* on the thread that owns the `LocalSet`, so we
1084                 // have to wake to the remote queue.
1085                 _ => {
1086                     // First, check whether the queue is still there (if not, the
1087                     // LocalSet is dropped). Then push to it if so, and if not,
1088                     // do nothing.
1089                     let mut lock = self.queue.lock();
1090 
1091                     if let Some(queue) = lock.as_mut() {
1092                         queue.push_back(task);
1093                         drop(lock);
1094                         self.waker.wake();
1095                     }
1096                 }
1097             }
1098         });
1099     }
1100 
ptr_eq(&self, other: &Shared) -> bool1101     fn ptr_eq(&self, other: &Shared) -> bool {
1102         std::ptr::eq(self, other)
1103     }
1104 }
1105 
1106 // This is safe because (and only because) we *pinky pwomise* to never touch the
1107 // local run queue except from the thread that owns the `LocalSet`.
1108 unsafe impl Sync for Shared {}
1109 
1110 impl task::Schedule for Arc<Shared> {
release(&self, task: &Task<Self>) -> Option<Task<Self>>1111     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1112         // Safety, this is always called from the thread that owns `LocalSet`
1113         unsafe { self.local_state.task_remove(task) }
1114     }
1115 
schedule(&self, task: task::Notified<Self>)1116     fn schedule(&self, task: task::Notified<Self>) {
1117         Shared::schedule(self, task);
1118     }
1119 
1120     // localset does not currently support task hooks
hooks(&self) -> TaskHarnessScheduleHooks1121     fn hooks(&self) -> TaskHarnessScheduleHooks {
1122         TaskHarnessScheduleHooks {
1123             task_terminate_callback: None,
1124         }
1125     }
1126 
1127     cfg_unstable! {
1128         fn unhandled_panic(&self) {
1129             use crate::runtime::UnhandledPanic;
1130 
1131             match self.unhandled_panic {
1132                 UnhandledPanic::Ignore => {
1133                     // Do nothing
1134                 }
1135                 UnhandledPanic::ShutdownRuntime => {
1136                     // This hook is only called from within the runtime, so
1137                     // `CURRENT` should match with `&self`, i.e. there is no
1138                     // opportunity for a nested scheduler to be called.
1139                     CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1140                         Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1141                             cx.unhandled_panic.set(true);
1142                             // Safety: this is always called from the thread that owns `LocalSet`
1143                             unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1144                         }
1145                         _ => unreachable!("runtime core not set in CURRENT thread-local"),
1146                     })
1147                 }
1148             }
1149         }
1150     }
1151 }
1152 
1153 impl LocalState {
task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>>1154     unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1155         // The caller ensures it is called from the same thread that owns
1156         // the LocalSet.
1157         self.assert_called_from_owner_thread();
1158 
1159         self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1160     }
1161 
task_push_back(&self, task: task::Notified<Arc<Shared>>)1162     unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1163         // The caller ensures it is called from the same thread that owns
1164         // the LocalSet.
1165         self.assert_called_from_owner_thread();
1166 
1167         self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
1168     }
1169 
take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>>1170     unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1171         // The caller ensures it is called from the same thread that owns
1172         // the LocalSet.
1173         self.assert_called_from_owner_thread();
1174 
1175         self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1176     }
1177 
task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>>1178     unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1179         // The caller ensures it is called from the same thread that owns
1180         // the LocalSet.
1181         self.assert_called_from_owner_thread();
1182 
1183         self.owned.remove(task)
1184     }
1185 
1186     /// Returns true if the `LocalSet` does not have any spawned tasks
owned_is_empty(&self) -> bool1187     unsafe fn owned_is_empty(&self) -> bool {
1188         // The caller ensures it is called from the same thread that owns
1189         // the LocalSet.
1190         self.assert_called_from_owner_thread();
1191 
1192         self.owned.is_empty()
1193     }
1194 
assert_owner( &self, task: task::Notified<Arc<Shared>>, ) -> task::LocalNotified<Arc<Shared>>1195     unsafe fn assert_owner(
1196         &self,
1197         task: task::Notified<Arc<Shared>>,
1198     ) -> task::LocalNotified<Arc<Shared>> {
1199         // The caller ensures it is called from the same thread that owns
1200         // the LocalSet.
1201         self.assert_called_from_owner_thread();
1202 
1203         self.owned.assert_owner(task)
1204     }
1205 
close_and_shutdown_all(&self)1206     unsafe fn close_and_shutdown_all(&self) {
1207         // The caller ensures it is called from the same thread that owns
1208         // the LocalSet.
1209         self.assert_called_from_owner_thread();
1210 
1211         self.owned.close_and_shutdown_all();
1212     }
1213 
1214     #[track_caller]
assert_called_from_owner_thread(&self)1215     fn assert_called_from_owner_thread(&self) {
1216         // FreeBSD has some weirdness around thread-local destruction.
1217         // TODO: remove this hack when thread id is cleaned up
1218         #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1219         debug_assert!(
1220             // if we couldn't get the thread ID because we're dropping the local
1221             // data, skip the assertion --- the `Drop` impl is not going to be
1222             // called from another thread, because `LocalSet` is `!Send`
1223             context::thread_id()
1224                 .map(|id| id == self.owner)
1225                 .unwrap_or(true),
1226             "`LocalSet`'s local run queue must not be accessed by another thread!"
1227         );
1228     }
1229 }
1230 
1231 // This is `Send` because it is stored in `Shared`. It is up to the caller to
1232 // ensure they are on the same thread that owns the `LocalSet`.
1233 unsafe impl Send for LocalState {}
1234 
1235 #[cfg(all(test, not(loom)))]
1236 mod tests {
1237     use super::*;
1238 
1239     // Does a `LocalSet` running on a current-thread runtime...basically work?
1240     //
1241     // This duplicates a test in `tests/task_local_set.rs`, but because this is
1242     // a lib test, it will run under Miri, so this is necessary to catch stacked
1243     // borrows violations in the `LocalSet` implementation.
1244     #[test]
local_current_thread_scheduler()1245     fn local_current_thread_scheduler() {
1246         let f = async {
1247             LocalSet::new()
1248                 .run_until(async {
1249                     spawn_local(async {}).await.unwrap();
1250                 })
1251                 .await;
1252         };
1253         crate::runtime::Builder::new_current_thread()
1254             .build()
1255             .expect("rt")
1256             .block_on(f)
1257     }
1258 
1259     // Tests that when a task on a `LocalSet` is woken by an io driver on the
1260     // same thread, the task is woken to the localset's local queue rather than
1261     // its remote queue.
1262     //
1263     // This test has to be defined in the `local.rs` file as a lib test, rather
1264     // than in `tests/`, because it makes assertions about the local set's
1265     // internal state.
1266     #[test]
wakes_to_local_queue()1267     fn wakes_to_local_queue() {
1268         use super::*;
1269         use crate::sync::Notify;
1270         let rt = crate::runtime::Builder::new_current_thread()
1271             .build()
1272             .expect("rt");
1273         rt.block_on(async {
1274             let local = LocalSet::new();
1275             let notify = Arc::new(Notify::new());
1276             let task = local.spawn_local({
1277                 let notify = notify.clone();
1278                 async move {
1279                     notify.notified().await;
1280                 }
1281             });
1282             let mut run_until = Box::pin(local.run_until(async move {
1283                 task.await.unwrap();
1284             }));
1285 
1286             // poll the run until future once
1287             std::future::poll_fn(|cx| {
1288                 let _ = run_until.as_mut().poll(cx);
1289                 Poll::Ready(())
1290             })
1291             .await;
1292 
1293             notify.notify_one();
1294             let task = unsafe { local.context.shared.local_state.task_pop_front() };
1295             // TODO(eliza): it would be nice to be able to assert that this is
1296             // the local task.
1297             assert!(
1298                 task.is_some(),
1299                 "task should have been notified to the LocalSet's local queue"
1300             );
1301         })
1302     }
1303 }
1304