1 //! Runs `!Send` futures on the current thread. 2 use crate::loom::cell::UnsafeCell; 3 use crate::loom::sync::{Arc, Mutex}; 4 #[cfg(tokio_unstable)] 5 use crate::runtime; 6 use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task, TaskHarnessScheduleHooks}; 7 use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD}; 8 use crate::sync::AtomicWaker; 9 use crate::util::trace::SpawnMeta; 10 use crate::util::RcCell; 11 12 use std::cell::Cell; 13 use std::collections::VecDeque; 14 use std::fmt; 15 use std::future::Future; 16 use std::marker::PhantomData; 17 use std::mem; 18 use std::pin::Pin; 19 use std::rc::Rc; 20 use std::task::Poll; 21 22 use pin_project_lite::pin_project; 23 24 cfg_rt! { 25 /// A set of tasks which are executed on the same thread. 26 /// 27 /// In some cases, it is necessary to run one or more futures that do not 28 /// implement [`Send`] and thus are unsafe to send between threads. In these 29 /// cases, a [local task set] may be used to schedule one or more `!Send` 30 /// futures to run together on the same thread. 31 /// 32 /// For example, the following code will not compile: 33 /// 34 /// ```rust,compile_fail 35 /// use std::rc::Rc; 36 /// 37 /// #[tokio::main] 38 /// async fn main() { 39 /// // `Rc` does not implement `Send`, and thus may not be sent between 40 /// // threads safely. 41 /// let nonsend_data = Rc::new("my nonsend data..."); 42 /// 43 /// let nonsend_data = nonsend_data.clone(); 44 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`. 45 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this 46 /// // will not compile. 47 /// tokio::spawn(async move { 48 /// println!("{}", nonsend_data); 49 /// // ... 50 /// }).await.unwrap(); 51 /// } 52 /// ``` 53 /// 54 /// # Use with `run_until` 55 /// 56 /// To spawn `!Send` futures, we can use a local task set to schedule them 57 /// on the thread calling [`Runtime::block_on`]. 
When running inside of the 58 /// local task set, we can use [`task::spawn_local`], which can spawn 59 /// `!Send` futures. For example: 60 /// 61 /// ```rust 62 /// use std::rc::Rc; 63 /// use tokio::task; 64 /// 65 /// #[tokio::main] 66 /// async fn main() { 67 /// let nonsend_data = Rc::new("my nonsend data..."); 68 /// 69 /// // Construct a local task set that can run `!Send` futures. 70 /// let local = task::LocalSet::new(); 71 /// 72 /// // Run the local task set. 73 /// local.run_until(async move { 74 /// let nonsend_data = nonsend_data.clone(); 75 /// // `spawn_local` ensures that the future is spawned on the local 76 /// // task set. 77 /// task::spawn_local(async move { 78 /// println!("{}", nonsend_data); 79 /// // ... 80 /// }).await.unwrap(); 81 /// }).await; 82 /// } 83 /// ``` 84 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`, 85 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It 86 /// cannot be used inside a task spawned with `tokio::spawn`. 87 /// 88 /// ## Awaiting a `LocalSet` 89 /// 90 /// Additionally, a `LocalSet` itself implements `Future`, completing when 91 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run 92 /// several futures on a `LocalSet` and drive the whole set until they 93 /// complete. For example, 94 /// 95 /// ```rust 96 /// use tokio::{task, time}; 97 /// use std::rc::Rc; 98 /// 99 /// #[tokio::main] 100 /// async fn main() { 101 /// let nonsend_data = Rc::new("world"); 102 /// let local = task::LocalSet::new(); 103 /// 104 /// let nonsend_data2 = nonsend_data.clone(); 105 /// local.spawn_local(async move { 106 /// // ... 107 /// println!("hello {}", nonsend_data2) 108 /// }); 109 /// 110 /// local.spawn_local(async move { 111 /// time::sleep(time::Duration::from_millis(100)).await; 112 /// println!("goodbye {}", nonsend_data) 113 /// }); 114 /// 115 /// // ... 
116 /// 117 /// local.await; 118 /// } 119 /// ``` 120 /// **Note:** Awaiting a `LocalSet` can only be done inside 121 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to 122 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with 123 /// `tokio::spawn`. 124 /// 125 /// ## Use inside `tokio::spawn` 126 /// 127 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so 128 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do 129 /// something else. The solution is to create the `LocalSet` somewhere else, 130 /// and communicate with it using an [`mpsc`] channel. 131 /// 132 /// The following example puts the `LocalSet` inside a new thread. 133 /// ``` 134 /// use tokio::runtime::Builder; 135 /// use tokio::sync::{mpsc, oneshot}; 136 /// use tokio::task::LocalSet; 137 /// 138 /// // This struct describes the task you want to spawn. Here we include 139 /// // some simple examples. The oneshot channel allows sending a response 140 /// // to the spawner. 141 /// #[derive(Debug)] 142 /// enum Task { 143 /// PrintNumber(u32), 144 /// AddOne(u32, oneshot::Sender<u32>), 145 /// } 146 /// 147 /// #[derive(Clone)] 148 /// struct LocalSpawner { 149 /// send: mpsc::UnboundedSender<Task>, 150 /// } 151 /// 152 /// impl LocalSpawner { 153 /// pub fn new() -> Self { 154 /// let (send, mut recv) = mpsc::unbounded_channel(); 155 /// 156 /// let rt = Builder::new_current_thread() 157 /// .enable_all() 158 /// .build() 159 /// .unwrap(); 160 /// 161 /// std::thread::spawn(move || { 162 /// let local = LocalSet::new(); 163 /// 164 /// local.spawn_local(async move { 165 /// while let Some(new_task) = recv.recv().await { 166 /// tokio::task::spawn_local(run_task(new_task)); 167 /// } 168 /// // If the while loop returns, then all the LocalSpawner 169 /// // objects have been dropped. 170 /// }); 171 /// 172 /// // This will return once all senders are dropped and all 173 /// // spawned tasks have returned. 
174 /// rt.block_on(local); 175 /// }); 176 /// 177 /// Self { 178 /// send, 179 /// } 180 /// } 181 /// 182 /// pub fn spawn(&self, task: Task) { 183 /// self.send.send(task).expect("Thread with LocalSet has shut down."); 184 /// } 185 /// } 186 /// 187 /// // This task may do !Send stuff. We use printing a number as an example, 188 /// // but it could be anything. 189 /// // 190 /// // The Task struct is an enum to support spawning many different kinds 191 /// // of operations. 192 /// async fn run_task(task: Task) { 193 /// match task { 194 /// Task::PrintNumber(n) => { 195 /// println!("{}", n); 196 /// }, 197 /// Task::AddOne(n, response) => { 198 /// // We ignore failures to send the response. 199 /// let _ = response.send(n + 1); 200 /// }, 201 /// } 202 /// } 203 /// 204 /// #[tokio::main] 205 /// async fn main() { 206 /// let spawner = LocalSpawner::new(); 207 /// 208 /// let (send, response) = oneshot::channel(); 209 /// spawner.spawn(Task::AddOne(10, send)); 210 /// let eleven = response.await.unwrap(); 211 /// assert_eq!(eleven, 11); 212 /// } 213 /// ``` 214 /// 215 /// [`Send`]: trait@std::marker::Send 216 /// [local task set]: struct@LocalSet 217 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on 218 /// [`task::spawn_local`]: fn@spawn_local 219 /// [`mpsc`]: mod@crate::sync::mpsc 220 pub struct LocalSet { 221 /// Current scheduler tick. 222 tick: Cell<u8>, 223 224 /// State available from thread-local. 225 context: Rc<Context>, 226 227 /// This type should not be Send. 228 _not_send: PhantomData<*const ()>, 229 } 230 } 231 232 /// State available from the thread-local. 233 struct Context { 234 /// State shared between threads. 235 shared: Arc<Shared>, 236 237 /// True if a task panicked without being handled and the local set is 238 /// configured to shutdown on unhandled panic. 239 unhandled_panic: Cell<bool>, 240 } 241 242 /// `LocalSet` state shared between threads. 
243 struct Shared { 244 /// # Safety 245 /// 246 /// This field must *only* be accessed from the thread that owns the 247 /// `LocalSet` (i.e., `Thread::current().id() == owner`). 248 local_state: LocalState, 249 250 /// Remote run queue sender. 251 queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>, 252 253 /// Wake the `LocalSet` task. 254 waker: AtomicWaker, 255 256 /// How to respond to unhandled task panics. 257 #[cfg(tokio_unstable)] 258 pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, 259 } 260 261 /// Tracks the `LocalSet` state that must only be accessed from the thread that 262 /// created the `LocalSet`. 263 struct LocalState { 264 /// The `ThreadId` of the thread that owns the `LocalSet`. 265 owner: ThreadId, 266 267 /// Local run queue sender and receiver. 268 local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>, 269 270 /// Collection of all active tasks spawned onto this executor. 271 owned: LocalOwnedTasks<Arc<Shared>>, 272 } 273 274 pin_project! { 275 #[derive(Debug)] 276 struct RunUntil<'a, F> { 277 local_set: &'a LocalSet, 278 #[pin] 279 future: F, 280 } 281 } 282 283 tokio_thread_local!(static CURRENT: LocalData = const { LocalData { 284 ctx: RcCell::new(), 285 wake_on_schedule: Cell::new(false), 286 } }); 287 288 struct LocalData { 289 ctx: RcCell<Context>, 290 wake_on_schedule: Cell<bool>, 291 } 292 293 impl LocalData { 294 /// Should be called except when we call `LocalSet::enter`. 295 /// Especially when we poll a `LocalSet`. 
296 #[must_use = "dropping this guard will reset the entered state"] enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_>297 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> { 298 let ctx = self.ctx.replace(Some(ctx)); 299 let wake_on_schedule = self.wake_on_schedule.replace(false); 300 LocalDataEnterGuard { 301 local_data_ref: self, 302 ctx, 303 wake_on_schedule, 304 } 305 } 306 } 307 308 /// A guard for `LocalData::enter()` 309 struct LocalDataEnterGuard<'a> { 310 local_data_ref: &'a LocalData, 311 ctx: Option<Rc<Context>>, 312 wake_on_schedule: bool, 313 } 314 315 impl<'a> Drop for LocalDataEnterGuard<'a> { drop(&mut self)316 fn drop(&mut self) { 317 self.local_data_ref.ctx.set(self.ctx.take()); 318 self.local_data_ref 319 .wake_on_schedule 320 .set(self.wake_on_schedule) 321 } 322 } 323 324 cfg_rt! { 325 /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`]. 326 /// 327 /// The spawned future will run on the same thread that called `spawn_local`. 328 /// 329 /// The provided future will start running in the background immediately 330 /// when `spawn_local` is called, even if you don't await the returned 331 /// `JoinHandle`. 332 /// 333 /// # Panics 334 /// 335 /// This function panics if called outside of a [`LocalSet`]. 336 /// 337 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the 338 /// resulting new task will _not_ be inside the `LocalSet`, so you must use 339 /// `spawn_local` if you want to stay within the `LocalSet`. 340 /// 341 /// # Examples 342 /// 343 /// ```rust 344 /// use std::rc::Rc; 345 /// use tokio::task; 346 /// 347 /// #[tokio::main] 348 /// async fn main() { 349 /// let nonsend_data = Rc::new("my nonsend data..."); 350 /// 351 /// let local = task::LocalSet::new(); 352 /// 353 /// // Run the local task set. 
354 /// local.run_until(async move { 355 /// let nonsend_data = nonsend_data.clone(); 356 /// task::spawn_local(async move { 357 /// println!("{}", nonsend_data); 358 /// // ... 359 /// }).await.unwrap(); 360 /// }).await; 361 /// } 362 /// ``` 363 /// 364 /// [`LocalSet`]: struct@crate::task::LocalSet 365 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime 366 /// [`tokio::spawn`]: fn@crate::task::spawn 367 #[track_caller] 368 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output> 369 where 370 F: Future + 'static, 371 F::Output: 'static, 372 { 373 let fut_size = std::mem::size_of::<F>(); 374 if fut_size > BOX_FUTURE_THRESHOLD { 375 spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) 376 } else { 377 spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size)) 378 } 379 } 380 381 382 #[track_caller] 383 pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> 384 where F: Future + 'static, 385 F::Output: 'static 386 { 387 use crate::runtime::{context, task}; 388 389 let mut future = Some(future); 390 391 let res = context::with_current(|handle| { 392 Some(if handle.is_local() { 393 if !handle.can_spawn_local_on_local_runtime() { 394 return None; 395 } 396 397 let future = future.take().unwrap(); 398 399 #[cfg(all( 400 tokio_unstable, 401 tokio_taskdump, 402 feature = "rt", 403 target_os = "linux", 404 any( 405 target_arch = "aarch64", 406 target_arch = "x86", 407 target_arch = "x86_64" 408 ) 409 ))] 410 let future = task::trace::Trace::root(future); 411 let id = task::Id::next(); 412 let task = crate::util::trace::task(future, "task", meta, id.as_u64()); 413 414 // safety: we have verified that this is a `LocalRuntime` owned by the current thread 415 unsafe { handle.spawn_local(task, id) } 416 } else { 417 match CURRENT.with(|LocalData { ctx, .. 
}| ctx.get()) { 418 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"), 419 Some(cx) => cx.spawn(future.take().unwrap(), meta) 420 } 421 }) 422 }); 423 424 match res { 425 Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"), 426 Ok(Some(join_handle)) => join_handle, 427 Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) { 428 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"), 429 Some(cx) => cx.spawn(future.unwrap(), meta) 430 } 431 } 432 } 433 } 434 435 /// Initial queue capacity. 436 const INITIAL_CAPACITY: usize = 64; 437 438 /// Max number of tasks to poll per tick. 439 const MAX_TASKS_PER_TICK: usize = 61; 440 441 /// How often it check the remote queue first. 442 const REMOTE_FIRST_INTERVAL: u8 = 31; 443 444 /// Context guard for `LocalSet` 445 pub struct LocalEnterGuard { 446 ctx: Option<Rc<Context>>, 447 448 /// Distinguishes whether the context was entered or being polled. 449 /// When we enter it, the value `wake_on_schedule` is set. In this case 450 /// `spawn_local` refers the context, whereas it is not being polled now. 451 wake_on_schedule: bool, 452 } 453 454 impl Drop for LocalEnterGuard { drop(&mut self)455 fn drop(&mut self) { 456 CURRENT.with( 457 |LocalData { 458 ctx, 459 wake_on_schedule, 460 }| { 461 ctx.set(self.ctx.take()); 462 wake_on_schedule.set(self.wake_on_schedule); 463 }, 464 ); 465 } 466 } 467 468 impl fmt::Debug for LocalEnterGuard { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result469 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 470 f.debug_struct("LocalEnterGuard").finish() 471 } 472 } 473 474 impl LocalSet { 475 /// Returns a new local task set. 
new() -> LocalSet476 pub fn new() -> LocalSet { 477 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown"); 478 479 LocalSet { 480 tick: Cell::new(0), 481 context: Rc::new(Context { 482 shared: Arc::new(Shared { 483 local_state: LocalState { 484 owner, 485 owned: LocalOwnedTasks::new(), 486 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)), 487 }, 488 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), 489 waker: AtomicWaker::new(), 490 #[cfg(tokio_unstable)] 491 unhandled_panic: crate::runtime::UnhandledPanic::Ignore, 492 }), 493 unhandled_panic: Cell::new(false), 494 }), 495 _not_send: PhantomData, 496 } 497 } 498 499 /// Enters the context of this `LocalSet`. 500 /// 501 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose 502 /// context you are inside. 503 /// 504 /// [`spawn_local`]: fn@crate::task::spawn_local enter(&self) -> LocalEnterGuard505 pub fn enter(&self) -> LocalEnterGuard { 506 CURRENT.with( 507 |LocalData { 508 ctx, 509 wake_on_schedule, 510 .. 511 }| { 512 let ctx = ctx.replace(Some(self.context.clone())); 513 let wake_on_schedule = wake_on_schedule.replace(true); 514 LocalEnterGuard { 515 ctx, 516 wake_on_schedule, 517 } 518 }, 519 ) 520 } 521 522 /// Spawns a `!Send` task onto the local task set. 523 /// 524 /// This task is guaranteed to be run on the current thread. 525 /// 526 /// Unlike the free function [`spawn_local`], this method may be used to 527 /// spawn local tasks when the `LocalSet` is _not_ running. The provided 528 /// future will start running once the `LocalSet` is next started, even if 529 /// you don't await the returned `JoinHandle`. 530 /// 531 /// # Examples 532 /// 533 /// ```rust 534 /// use tokio::task; 535 /// 536 /// #[tokio::main] 537 /// async fn main() { 538 /// let local = task::LocalSet::new(); 539 /// 540 /// // Spawn a future on the local set. 
This future will be run when 541 /// // we call `run_until` to drive the task set. 542 /// local.spawn_local(async { 543 /// // ... 544 /// }); 545 /// 546 /// // Run the local task set. 547 /// local.run_until(async move { 548 /// // ... 549 /// }).await; 550 /// 551 /// // When `run` finishes, we can spawn _more_ futures, which will 552 /// // run in subsequent calls to `run_until`. 553 /// local.spawn_local(async { 554 /// // ... 555 /// }); 556 /// 557 /// local.run_until(async move { 558 /// // ... 559 /// }).await; 560 /// } 561 /// ``` 562 /// [`spawn_local`]: fn@spawn_local 563 #[track_caller] spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,564 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> 565 where 566 F: Future + 'static, 567 F::Output: 'static, 568 { 569 let fut_size = mem::size_of::<F>(); 570 if fut_size > BOX_FUTURE_THRESHOLD { 571 self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) 572 } else { 573 self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) 574 } 575 } 576 577 /// Runs a future to completion on the provided runtime, driving any local 578 /// futures spawned on this task set on the current thread. 579 /// 580 /// This runs the given future on the runtime, blocking until it is 581 /// complete, and yielding its resolved result. Any tasks or timers which 582 /// the future spawns internally will be executed on the runtime. The future 583 /// may also call [`spawn_local`] to `spawn_local` additional local futures on the 584 /// current thread. 585 /// 586 /// This method should not be called from an asynchronous context. 587 /// 588 /// # Panics 589 /// 590 /// This function panics if the executor is at capacity, if the provided 591 /// future panics, or if called within an asynchronous execution context. 
592 /// 593 /// # Notes 594 /// 595 /// Since this function internally calls [`Runtime::block_on`], and drives 596 /// futures in the local task set inside that call to `block_on`, the local 597 /// futures may not use [in-place blocking]. If a blocking call needs to be 598 /// issued from a local task, the [`spawn_blocking`] API may be used instead. 599 /// 600 /// For example, this will panic: 601 /// ```should_panic 602 /// use tokio::runtime::Runtime; 603 /// use tokio::task; 604 /// 605 /// let rt = Runtime::new().unwrap(); 606 /// let local = task::LocalSet::new(); 607 /// local.block_on(&rt, async { 608 /// let join = task::spawn_local(async { 609 /// let blocking_result = task::block_in_place(|| { 610 /// // ... 611 /// }); 612 /// // ... 613 /// }); 614 /// join.await.unwrap(); 615 /// }) 616 /// ``` 617 /// This, however, will not panic: 618 /// ``` 619 /// use tokio::runtime::Runtime; 620 /// use tokio::task; 621 /// 622 /// let rt = Runtime::new().unwrap(); 623 /// let local = task::LocalSet::new(); 624 /// local.block_on(&rt, async { 625 /// let join = task::spawn_local(async { 626 /// let blocking_result = task::spawn_blocking(|| { 627 /// // ... 628 /// }).await; 629 /// // ... 630 /// }); 631 /// join.await.unwrap(); 632 /// }) 633 /// ``` 634 /// 635 /// [`spawn_local`]: fn@spawn_local 636 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on 637 /// [in-place blocking]: fn@crate::task::block_in_place 638 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking 639 #[track_caller] 640 #[cfg(feature = "rt")] 641 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output where F: Future,642 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output 643 where 644 F: Future, 645 { 646 rt.block_on(self.run_until(future)) 647 } 648 649 /// Runs a future to completion on the local set, returning its output. 
650 /// 651 /// This returns a future that runs the given future with a local set, 652 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures. 653 /// Any local futures spawned on the local set will be driven in the 654 /// background until the future passed to `run_until` completes. When the future 655 /// passed to `run_until` finishes, any local futures which have not completed 656 /// will remain on the local set, and will be driven on subsequent calls to 657 /// `run_until` or when [awaiting the local set] itself. 658 /// 659 /// # Cancel safety 660 /// 661 /// This method is cancel safe when `future` is cancel safe. 662 /// 663 /// # Examples 664 /// 665 /// ```rust 666 /// use tokio::task; 667 /// 668 /// #[tokio::main] 669 /// async fn main() { 670 /// task::LocalSet::new().run_until(async { 671 /// task::spawn_local(async move { 672 /// // ... 673 /// }).await.unwrap(); 674 /// // ... 675 /// }).await; 676 /// } 677 /// ``` 678 /// 679 /// [`spawn_local`]: fn@spawn_local 680 /// [awaiting the local set]: #awaiting-a-localset run_until<F>(&self, future: F) -> F::Output where F: Future,681 pub async fn run_until<F>(&self, future: F) -> F::Output 682 where 683 F: Future, 684 { 685 let run_until = RunUntil { 686 future, 687 local_set: self, 688 }; 689 run_until.await 690 } 691 692 #[track_caller] spawn_named<F>( &self, future: F, meta: SpawnMeta<'_>, ) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,693 pub(in crate::task) fn spawn_named<F>( 694 &self, 695 future: F, 696 meta: SpawnMeta<'_>, 697 ) -> JoinHandle<F::Output> 698 where 699 F: Future + 'static, 700 F::Output: 'static, 701 { 702 self.spawn_named_inner(future, meta) 703 } 704 705 #[track_caller] spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,706 fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> 707 where 708 F: Future + 'static, 709 
F::Output: 'static, 710 { 711 let handle = self.context.spawn(future, meta); 712 713 // Because a task was spawned from *outside* the `LocalSet`, wake the 714 // `LocalSet` future to execute the new task, if it hasn't been woken. 715 // 716 // Spawning via the free fn `spawn` does not require this, as it can 717 // only be called from *within* a future executing on the `LocalSet` — 718 // in that case, the `LocalSet` must already be awake. 719 self.context.shared.waker.wake(); 720 handle 721 } 722 723 /// Ticks the scheduler, returning whether the local future needs to be 724 /// notified again. tick(&self) -> bool725 fn tick(&self) -> bool { 726 for _ in 0..MAX_TASKS_PER_TICK { 727 // Make sure we didn't hit an unhandled panic 728 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"); 729 730 match self.next_task() { 731 // Run the task 732 // 733 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be 734 // used. We are responsible for maintaining the invariant that 735 // `run_unchecked` is only called on threads that spawned the 736 // task initially. Because `LocalSet` itself is `!Send`, and 737 // `spawn_local` spawns into the `LocalSet` on the current 738 // thread, the invariant is maintained. 739 Some(task) => crate::runtime::coop::budget(|| task.run()), 740 // We have fully drained the queue of notified tasks, so the 741 // local future doesn't need to be notified again — it can wait 742 // until something else wakes a task in the local set. 
743 None => return false, 744 } 745 } 746 747 true 748 } 749 next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>>750 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> { 751 let tick = self.tick.get(); 752 self.tick.set(tick.wrapping_add(1)); 753 754 let task = if tick % REMOTE_FIRST_INTERVAL == 0 { 755 self.context 756 .shared 757 .queue 758 .lock() 759 .as_mut() 760 .and_then(|queue| queue.pop_front()) 761 .or_else(|| self.pop_local()) 762 } else { 763 self.pop_local().or_else(|| { 764 self.context 765 .shared 766 .queue 767 .lock() 768 .as_mut() 769 .and_then(VecDeque::pop_front) 770 }) 771 }; 772 773 task.map(|task| unsafe { 774 // Safety: because the `LocalSet` itself is `!Send`, we know we are 775 // on the same thread if we have access to the `LocalSet`, and can 776 // therefore access the local run queue. 777 self.context.shared.local_state.assert_owner(task) 778 }) 779 } 780 pop_local(&self) -> Option<task::Notified<Arc<Shared>>>781 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> { 782 unsafe { 783 // Safety: because the `LocalSet` itself is `!Send`, we know we are 784 // on the same thread if we have access to the `LocalSet`, and can 785 // therefore access the local run queue. 786 self.context.shared.local_state.task_pop_front() 787 } 788 } 789 with<T>(&self, f: impl FnOnce() -> T) -> T790 fn with<T>(&self, f: impl FnOnce() -> T) -> T { 791 CURRENT.with(|local_data| { 792 let _guard = local_data.enter(self.context.clone()); 793 f() 794 }) 795 } 796 797 /// This method is like `with`, but it just calls `f` without setting the thread-local if that 798 /// fails. 
with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T799 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T { 800 let mut f = Some(f); 801 802 let res = CURRENT.try_with(|local_data| { 803 let _guard = local_data.enter(self.context.clone()); 804 (f.take().unwrap())() 805 }); 806 807 match res { 808 Ok(res) => res, 809 Err(_access_error) => (f.take().unwrap())(), 810 } 811 } 812 } 813 814 cfg_unstable! { 815 impl LocalSet { 816 /// Configure how the `LocalSet` responds to an unhandled panic on a 817 /// spawned task. 818 /// 819 /// By default, an unhandled panic (i.e. a panic not caught by 820 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s 821 /// execution. The panic is error value is forwarded to the task's 822 /// [`JoinHandle`] and all other spawned tasks continue running. 823 /// 824 /// The `unhandled_panic` option enables configuring this behavior. 825 /// 826 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on 827 /// spawned tasks have no impact on the `LocalSet`'s execution. 828 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to 829 /// shutdown immediately when a spawned task panics even if that 830 /// task's `JoinHandle` has not been dropped. All other spawned tasks 831 /// will immediately terminate and further calls to 832 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic. 833 /// 834 /// # Panics 835 /// 836 /// This method panics if called after the `LocalSet` has started 837 /// running. 838 /// 839 /// # Unstable 840 /// 841 /// This option is currently unstable and its implementation is 842 /// incomplete. The API may change or be removed in the future. See 843 /// tokio-rs/tokio#4516 for more details. 844 /// 845 /// # Examples 846 /// 847 /// The following demonstrates a `LocalSet` configured to shutdown on 848 /// panic. The first spawned task panics and results in the `LocalSet` 849 /// shutting down. The second spawned task never has a chance to 850 /// execute. 
The call to `run_until` will panic due to the runtime being 851 /// forcibly shutdown. 852 /// 853 /// ```should_panic 854 /// use tokio::runtime::UnhandledPanic; 855 /// 856 /// # #[tokio::main] 857 /// # async fn main() { 858 /// tokio::task::LocalSet::new() 859 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) 860 /// .run_until(async { 861 /// tokio::task::spawn_local(async { panic!("boom"); }); 862 /// tokio::task::spawn_local(async { 863 /// // This task never completes 864 /// }); 865 /// 866 /// // Do some work, but `run_until` will panic before it completes 867 /// # loop { tokio::task::yield_now().await; } 868 /// }) 869 /// .await; 870 /// # } 871 /// ``` 872 /// 873 /// [`JoinHandle`]: struct@crate::task::JoinHandle 874 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self { 875 // TODO: This should be set as a builder 876 Rc::get_mut(&mut self.context) 877 .and_then(|ctx| Arc::get_mut(&mut ctx.shared)) 878 .expect("Unhandled Panic behavior modified after starting LocalSet") 879 .unhandled_panic = behavior; 880 self 881 } 882 883 /// Returns the [`Id`] of the current `LocalSet` runtime. 884 /// 885 /// # Examples 886 /// 887 /// ```rust 888 /// use tokio::task; 889 /// 890 /// #[tokio::main] 891 /// async fn main() { 892 /// let local_set = task::LocalSet::new(); 893 /// println!("Local set id: {}", local_set.id()); 894 /// } 895 /// ``` 896 /// 897 /// **Note**: This is an [unstable API][unstable]. The public API of this type 898 /// may break in 1.x releases. See [the documentation on unstable 899 /// features][unstable] for details. 
///
        /// [unstable]: crate#unstable-features
        /// [`Id`]: struct@crate::runtime::Id
        pub fn id(&self) -> runtime::Id {
            self.context.shared.local_state.owned.id.into()
        }
    }
}

impl fmt::Debug for LocalSet {
    /// Formats the set as an opaque `LocalSet` struct; no fields are printed.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("LocalSet").finish()
    }
}

impl Future for LocalSet {
    type Output = ();

    /// Runs one `tick` of the set and completes once it owns no more tasks.
    ///
    /// Returns `Poll::Pending` while tasks remain, either because the run
    /// queue is non-empty (in which case the waker is notified immediately)
    /// or because the remaining tasks are waiting to be woken elsewhere.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // Tasks driven from here must not call `block_in_place`; this mirrors
        // the guard `RunUntil::poll` holds around the same `tick` loop.
        let _no_blocking = crate::runtime::context::disallow_block_in_place();

        // Register the waker before starting to work
        self.context.shared.waker.register_by_ref(cx.waker());

        if self.with(|| self.tick()) {
            // If `tick` returns true, we need to notify the local future again:
            // there are still tasks remaining in the run queue.
            cx.waker().wake_by_ref();
            Poll::Pending

        // Safety: called from the thread that owns `LocalSet`. Because
        // `LocalSet` is `!Send`, this is safe.
        } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
            // If the scheduler has no remaining futures, we're done!
            Poll::Ready(())
        } else {
            // There are still futures in the local set, but we've polled all the
            // futures in the run queue. Therefore, we can just return Pending
            // since the remaining futures will be woken from somewhere else.
            Poll::Pending
        }
    }
}

impl Default for LocalSet {
    /// Equivalent to [`LocalSet::new`].
    fn default() -> LocalSet {
        LocalSet::new()
    }
}

impl Drop for LocalSet {
    /// Shuts down every owned task and drains both run queues.
    ///
    /// # Panics
    ///
    /// Panics if the remote queue has already been taken, or if tasks are
    /// still owned after shutdown.
    fn drop(&mut self) {
        self.with_if_possible(|| {
            // Shut down all tasks in the LocalOwnedTasks and close it to
            // prevent new tasks from ever being added.
            unsafe {
                // Safety: called from the thread that owns `LocalSet`
                self.context.shared.local_state.close_and_shutdown_all();
            }

            // We already called shutdown on all tasks above, so the queued
            // notifications below only need to be dropped.

            // Safety: note that this *intentionally* bypasses the unsafe
            // `Shared::local_queue()` method. This is in order to avoid the
            // debug assertion that we are on the thread that owns the
            // `LocalSet`, because on some systems (e.g. at least some macOS
            // versions), attempting to get the current thread ID can panic due
            // to the thread's local data that stores the thread ID being
            // dropped *before* the `LocalSet`.
            //
            // Despite avoiding the assertion here, it is safe for us to access
            // the local queue in `Drop`, because the `LocalSet` itself is
            // `!Send`, so we can reasonably guarantee that it will not be
            // `Drop`ped from another thread.
            let local_queue = unsafe {
                // Safety: called from the thread that owns `LocalSet`
                self.context.shared.local_state.take_local_queue()
            };
            for task in local_queue {
                drop(task);
            }

            // Take the queue from the Shared object to prevent pushing
            // notifications to it in the future.
            let queue = self.context.shared.queue.lock().take().unwrap();
            for task in queue {
                drop(task);
            }

            // Safety: called from the thread that owns `LocalSet`
            assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
        });
    }
}

// === impl Context ===

impl Context {
    /// Binds `future` to this set's owned-task list and schedules it.
    ///
    /// The task gets a fresh task id and is wrapped for tracing under the
    /// `"local"` kind. The returned [`JoinHandle`] resolves to its output.
    #[track_caller]
    fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let id = crate::runtime::task::Id::next();
        let future = crate::util::trace::task(future, "local", meta, id.as_u64());

        // Safety: called from the thread that owns the `LocalSet`
        let (handle, notified) = {
            self.shared.local_state.assert_called_from_owner_thread();
            self.shared
                .local_state
                .owned
                .bind(future, self.shared.clone(), id)
        };

        // `bind` may hand back no notification; only schedule when it does.
        if let Some(notified) = notified {
            self.shared.schedule(notified);
        }

        handle
    }
}

// === impl RunUntil ===

impl<T: Future> Future for RunUntil<'_, T> {
    type Output = T::Output;

    /// Polls the wrapped future first, then runs one `tick` of the local set.
    ///
    /// Completes as soon as the wrapped future does, regardless of whether
    /// the set still has tasks.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let me = self.project();

        me.local_set.with(|| {
            me.local_set
                .context
                .shared
                .waker
                .register_by_ref(cx.waker());

            let _no_blocking = crate::runtime::context::disallow_block_in_place();
            let f = me.future;

            if let Poll::Ready(output) = f.poll(cx) {
                return Poll::Ready(output);
            }

            if me.local_set.tick() {
                // If `tick` returns `true`, we need to notify the local future again:
                // there are still tasks remaining in the run queue.
                cx.waker().wake_by_ref();
            }

            Poll::Pending
        })
    }
}

impl Shared {
    /// Schedule the provided task on the scheduler.
    ///
    /// The task goes to the local queue when called on the owning thread and
    /// to the mutex-protected remote queue otherwise. If the remote queue has
    /// already been taken (the `LocalSet` was dropped), the task is dropped.
    fn schedule(&self, task: task::Notified<Arc<Self>>) {
        CURRENT.with(|localdata| {
            match localdata.ctx.get() {
                // If the current `LocalSet` is being polled, we don't need to wake it.
                // `wake_on_schedule` is set to true when the set is merely
                // `enter`ed; in that case it is not being polled, so this arm
                // is skipped and a later arm wakes it.
                Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
                    // Safety: if the current `LocalSet` context points to this
                    // `LocalSet`, then we are on the thread that owns it.
                    cx.shared.local_state.task_push_back(task);
                },

                // We are on the thread that owns the `LocalSet`, so we can
                // wake to the local queue.
                _ if context::thread_id().ok() == Some(self.local_state.owner) => {
                    unsafe {
                        // Safety: we just checked that the thread ID matches
                        // the localset's owner, so this is safe.
                        self.local_state.task_push_back(task);
                    }
                    // We still have to wake the `LocalSet`, because it isn't
                    // currently being polled.
                    self.waker.wake();
                }

                // We are *not* on the thread that owns the `LocalSet`, so we
                // have to wake to the remote queue.
                _ => {
                    // First, check whether the queue is still there (if not, the
                    // LocalSet is dropped). Then push to it if so, and if not,
                    // do nothing.
                    let mut lock = self.queue.lock();

                    if let Some(queue) = lock.as_mut() {
                        queue.push_back(task);
                        // Release the lock before waking.
                        drop(lock);
                        self.waker.wake();
                    }
                }
            }
        });
    }

    /// Returns true if `self` and `other` are the same `Shared` instance
    /// (address comparison).
    fn ptr_eq(&self, other: &Shared) -> bool {
        std::ptr::eq(self, other)
    }
}

// This is safe because (and only because) we *pinky pwomise* to never touch the
// local run queue except from the thread that owns the `LocalSet`.
unsafe impl Sync for Shared {}

impl task::Schedule for Arc<Shared> {
    /// Removes `task` from the owned-task list.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        // Safety, this is always called from the thread that owns `LocalSet`
        unsafe { self.local_state.task_remove(task) }
    }

    /// Forwards to [`Shared::schedule`].
    fn schedule(&self, task: task::Notified<Self>) {
        Shared::schedule(self, task);
    }

    // localset does not currently support task hooks
    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: None,
        }
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    // This hook is only called from within the runtime, so
                    // `CURRENT` should match with `&self`, i.e. there is no
                    // opportunity for a nested scheduler to be called.
                    CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
                        Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
                            cx.unhandled_panic.set(true);
                            // Safety: this is always called from the thread that owns `LocalSet`
                            unsafe { cx.shared.local_state.close_and_shutdown_all(); }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl LocalState {
    /// Pops the task at the front of the local run queue, if any.
    ///
    /// # Safety
    ///
    /// Must be called from the thread that owns the `LocalSet`.
    unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
        // The caller ensures it is called from the same thread that owns
        // the LocalSet.
        self.assert_called_from_owner_thread();

        self.local_queue.with_mut(|ptr| (*ptr).pop_front())
    }

    /// Appends `task` to the back of the local run queue.
    ///
    /// # Safety
    ///
    /// Must be called from the thread that owns the `LocalSet`.
    unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
        // The caller ensures it is called from the same thread that owns
        // the LocalSet.
        self.assert_called_from_owner_thread();

        self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
    }

    /// Moves the whole local run queue out, leaving an empty queue behind.
    ///
    /// # Safety
    ///
    /// Must be called from the thread that owns the `LocalSet`.
    unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
        // The caller ensures it is called from the same thread that owns
        // the LocalSet.
        self.assert_called_from_owner_thread();

        self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
    }

    /// Removes `task` from the owned-task list.
    ///
    /// # Safety
    ///
    /// Must be called from the thread that owns the `LocalSet`.
    unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
        // The caller ensures it is called from the same thread that owns
        // the LocalSet.
        self.assert_called_from_owner_thread();

        self.owned.remove(task)
    }

    /// Returns true if the `LocalSet` does not have any spawned tasks
    ///
    /// # Safety
    ///
    /// Must be called from the thread that owns the `LocalSet`.
    unsafe fn owned_is_empty(&self) -> bool {
        // The caller ensures it is called from the same thread that owns
        // the LocalSet.
        self.assert_called_from_owner_thread();

        self.owned.is_empty()
    }

    /// Converts a `Notified` into a `LocalNotified` via the owned-task list.
    ///
    /// # Safety
    ///
    /// Must be called from the thread that owns the `LocalSet`.
    unsafe fn assert_owner(
        &self,
        task: task::Notified<Arc<Shared>>,
    ) -> task::LocalNotified<Arc<Shared>> {
        // The caller ensures it is called from the same thread that owns
        // the LocalSet.
        self.assert_called_from_owner_thread();

        self.owned.assert_owner(task)
    }

    /// Closes the owned-task list and shuts down every task in it.
    ///
    /// # Safety
    ///
    /// Must be called from the thread that owns the `LocalSet`.
    unsafe fn close_and_shutdown_all(&self) {
        // The caller ensures it is called from the same thread that owns
        // the LocalSet.
        self.assert_called_from_owner_thread();

        self.owned.close_and_shutdown_all();
    }

    /// Debug-asserts that the current thread is the owner of this state.
    ///
    /// Compiled out on FreeBSD and OpenBSD, and in release builds.
    #[track_caller]
    fn assert_called_from_owner_thread(&self) {
        // FreeBSD and OpenBSD have some weirdness around thread-local destruction.
        // TODO: remove this hack when thread id is cleaned up
        #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
        debug_assert!(
            // if we couldn't get the thread ID because we're dropping the local
            // data, skip the assertion --- the `Drop` impl is not going to be
            // called from another thread, because `LocalSet` is `!Send`
            context::thread_id()
                .map(|id| id == self.owner)
                .unwrap_or(true),
            "`LocalSet`'s local run queue must not be accessed by another thread!"
        );
    }
}

// This is `Send` because it is stored in `Shared`. It is up to the caller to
// ensure they are on the same thread that owns the `LocalSet`.
1233 unsafe impl Send for LocalState {} 1234 1235 #[cfg(all(test, not(loom)))] 1236 mod tests { 1237 use super::*; 1238 1239 // Does a `LocalSet` running on a current-thread runtime...basically work? 1240 // 1241 // This duplicates a test in `tests/task_local_set.rs`, but because this is 1242 // a lib test, it will run under Miri, so this is necessary to catch stacked 1243 // borrows violations in the `LocalSet` implementation. 1244 #[test] local_current_thread_scheduler()1245 fn local_current_thread_scheduler() { 1246 let f = async { 1247 LocalSet::new() 1248 .run_until(async { 1249 spawn_local(async {}).await.unwrap(); 1250 }) 1251 .await; 1252 }; 1253 crate::runtime::Builder::new_current_thread() 1254 .build() 1255 .expect("rt") 1256 .block_on(f) 1257 } 1258 1259 // Tests that when a task on a `LocalSet` is woken by an io driver on the 1260 // same thread, the task is woken to the localset's local queue rather than 1261 // its remote queue. 1262 // 1263 // This test has to be defined in the `local.rs` file as a lib test, rather 1264 // than in `tests/`, because it makes assertions about the local set's 1265 // internal state. 
1266 #[test] wakes_to_local_queue()1267 fn wakes_to_local_queue() { 1268 use super::*; 1269 use crate::sync::Notify; 1270 let rt = crate::runtime::Builder::new_current_thread() 1271 .build() 1272 .expect("rt"); 1273 rt.block_on(async { 1274 let local = LocalSet::new(); 1275 let notify = Arc::new(Notify::new()); 1276 let task = local.spawn_local({ 1277 let notify = notify.clone(); 1278 async move { 1279 notify.notified().await; 1280 } 1281 }); 1282 let mut run_until = Box::pin(local.run_until(async move { 1283 task.await.unwrap(); 1284 })); 1285 1286 // poll the run until future once 1287 std::future::poll_fn(|cx| { 1288 let _ = run_until.as_mut().poll(cx); 1289 Poll::Ready(()) 1290 }) 1291 .await; 1292 1293 notify.notify_one(); 1294 let task = unsafe { local.context.shared.local_state.task_pop_front() }; 1295 // TODO(eliza): it would be nice to be able to assert that this is 1296 // the local task. 1297 assert!( 1298 task.is_some(), 1299 "task should have been notified to the LocalSet's local queue" 1300 ); 1301 }) 1302 } 1303 } 1304