use alloc::alloc::Layout as StdLayout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;
#[cfg(feature = "portable-atomic")]
use portable_atomic::AtomicUsize;

use crate::header::Header;
use crate::runnable::{Schedule, ScheduleInfo};
use crate::state::*;
use crate::utils::{abort, abort_on_panic, max, Layout};
use crate::Runnable;

/// Error side of a task's stored output when the `std` feature is enabled: a boxed,
/// type-erased panic payload.
#[cfg(feature = "std")]
pub(crate) type Panic = alloc::boxed::Box<dyn core::any::Any + Send + 'static>;

/// Error side of a task's stored output without the `std` feature. `Infallible` has no
/// values, so the error variant can never be constructed in this configuration.
#[cfg(not(feature = "std"))]
pub(crate) type Panic = core::convert::Infallible;

/// The vtable for a task.
///
/// Every entry takes the type-erased task pointer (`*const ()`) as its first argument.
pub(crate) struct TaskVTable {
    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const (), ScheduleInfo),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task reference (`Runnable` or `Waker`).
    pub(crate) drop_ref: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task. The returned flag is whatever the task's `run` implementation reports.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,

    /// The memory layout of the task. This information enables
    /// debuggers to decode raw task memory blobs. Do not remove
    /// the field, even if it appears to be unused.
    #[allow(unused)]
    pub(crate) layout_info: &'static TaskLayout,
}

/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1.
///    How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: StdLayout,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}

/// Raw pointers to the fields inside a task.
///
/// `F` is the future type, `T` its output, `S` the schedule function and `M` the metadata
/// type stored in the header.
pub(crate) struct RawTask<F, T, S, M> {
    /// The task header.
    pub(crate) header: *const Header<M>,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut Result<T, Panic>,
}

// `RawTask` only holds raw pointers, so it is `Copy`/`Clone` regardless of its type parameters
// (a derive would add unwanted `F: Copy`, `T: Copy`, ... bounds).
impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}

impl<F, T, S, M> Clone for RawTask<F, T, S, M> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<F, T, S, M> RawTask<F, T, S, M> {
    /// Layout of this particular task instantiation, evaluated as an associated constant.
    const TASK_LAYOUT: TaskLayout = Self::eval_task_layout();

    /// Computes the memory layout for a task.
    ///
    /// The task is laid out as `Header<M>`, followed by `S`, followed by a slot that is large
    /// and aligned enough for either `F` or `Result<T, Panic>`.
    #[inline]
    const fn eval_task_layout() -> TaskLayout {
        // Compute the layouts for `Header`, `S`, `F`, and `T`.
        let layout_header = Layout::new::<Header<M>>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<Result<T, Panic>>();

        // Compute the layout for `union { F, T }`.
        let size_union = max(layout_f.size(), layout_r.size());
        let align_union = max(layout_f.align(), layout_r.align());
        let layout_union = Layout::from_size_align(size_union, align_union);

        // Compute the layout for `Header` followed by `S` and `union { F, T }`.
        let layout = layout_header;
        let (layout, offset_s) = leap_unwrap!(layout.extend(layout_s));
        let (layout, offset_union) = leap_unwrap!(layout.extend(layout_union));
        // The future and the output share the same slot, hence the identical offsets.
        let offset_f = offset_union;
        let offset_r = offset_union;

        TaskLayout {
            layout: unsafe { layout.into_std() },
            offset_s,
            offset_f,
            offset_r,
        }
    }
}

impl<F, T, S, M> RawTask<F, T, S, M>
where
    F: Future<Output = T>,
    S: Schedule<M>,
{
    /// Vtable handed to every `RawWaker` created for this task type.
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );

    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Runnable` and the `Task` exist.
    ///
    /// `future` is a generator that receives a reference to the metadata once it has been
    /// written into the allocation. Aborts if the allocation fails or if the generator panics.
    /// Returns the type-erased pointer to the start of the allocation.
    pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>(
        future: Gen,
        schedule: S,
        builder: crate::Builder<M>,
    ) -> NonNull<()>
    where
        F: 'a,
        M: 'a,
    {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        //
        // n.b. notgull: task_layout now automatically aborts instead of panicking
        let task_layout = Self::task_layout();

        unsafe {
            // Allocate enough space for the entire task.
            let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(ptr.as_ptr());

            let crate::Builder {
                metadata,
                #[cfg(feature = "std")]
                propagate_panic,
            } = builder;

            // Write the header as the first field of the task. The initial state encodes a
            // scheduled task with a live `Task` handle and one reference (the `Runnable`).
            (raw.header as *mut Header<M>).write(Header {
                state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_ref: Self::drop_ref,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                    layout_info: &Self::TASK_LAYOUT,
                },
                metadata,
                #[cfg(feature = "std")]
                propagate_panic,
            });

            // Write the schedule function into its slot right after the header.
            (raw.schedule as *mut S).write(schedule);

            // Generate the future, now that the metadata has been pinned in place.
            let future = abort_on_panic(|| future(&(*raw.header).metadata));

            // Write the future into the slot it shares with the output.
            raw.future.write(future);

            ptr
        }
    }

    /// Creates a `RawTask` from a raw task pointer.
    ///
    /// Only pointer arithmetic is performed here; nothing is dereferenced. `ptr` must point to
    /// the start of an allocation made with this type's task layout.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header<M>,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut Result<T, Panic>,
            }
        }
    }

    /// Returns the layout of the task.
    #[inline]
    fn task_layout() -> TaskLayout {
        Self::TASK_LAYOUT
    }
    /// Wakes a waker.
    ///
    /// Consumes the waker's reference: on every path the reference is either dropped via
    /// `drop_waker` or handed over to the scheduled `Runnable`.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task. This waker's reference is reused for the
                            // `Runnable` created by `schedule`, so it is not dropped here.
                            Self::schedule(ptr, ScheduleInfo::new(false));
                        } else {
                            // The running thread will notice `SCHEDULED` and reschedule itself.
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Wakes a waker by reference.
    ///
    /// Unlike `wake`, the caller keeps its waker, so a fresh reference is added for the
    /// `Runnable` whenever the task is actually handed to the schedule function.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away. In that case the
                // reference for the new `Runnable` is added in the same atomic update.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::MAX as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
                            (*raw.schedule).schedule(task, ScheduleInfo::new(false));
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Clones a waker.
    ///
    /// Adds one reference to the task and returns a `RawWaker` that points at the same task
    /// and uses `RAW_WAKER_VTABLE`. Aborts if the reference count overflows.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::MAX as usize {
            abort();
        }

        RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
    }

    /// Drops a waker.
    ///
    /// This function will decrement the reference count.
If it drops down to zero, the associated 379 /// `Task` has been dropped too, and the task has not been completed, then it will get 380 /// scheduled one more time so that its future gets dropped by the executor. 381 #[inline] drop_waker(ptr: *const ())382 unsafe fn drop_waker(ptr: *const ()) { 383 let raw = Self::from_ptr(ptr); 384 385 // Decrement the reference count. 386 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 387 388 // If this was the last reference to the task and the `Task` has been dropped too, 389 // then we need to decide how to destroy the task. 390 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 391 if new & (COMPLETED | CLOSED) == 0 { 392 // If the task was not completed nor closed, close it and schedule one more time so 393 // that its future gets dropped by the executor. 394 (*raw.header) 395 .state 396 .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); 397 Self::schedule(ptr, ScheduleInfo::new(false)); 398 } else { 399 // Otherwise, destroy the task right away. 400 Self::destroy(ptr); 401 } 402 } 403 } 404 405 /// Drops a task reference (`Runnable` or `Waker`). 406 /// 407 /// This function will decrement the reference count. If it drops down to zero and the 408 /// associated `Task` handle has been dropped too, then the task gets destroyed. 409 #[inline] drop_ref(ptr: *const ())410 unsafe fn drop_ref(ptr: *const ()) { 411 let raw = Self::from_ptr(ptr); 412 413 // Decrement the reference count. 414 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 415 416 // If this was the last reference to the task and the `Task` has been dropped too, 417 // then destroy the task. 418 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 419 Self::destroy(ptr); 420 } 421 } 422 423 /// Schedules a task for running. 424 /// 425 /// This function doesn't modify the state of the task. It only passes the task reference to 426 /// its schedule function. 
schedule(ptr: *const (), info: ScheduleInfo)427 unsafe fn schedule(ptr: *const (), info: ScheduleInfo) { 428 let raw = Self::from_ptr(ptr); 429 430 // If the schedule function has captured variables, create a temporary waker that prevents 431 // the task from getting deallocated while the function is being invoked. 432 let _waker; 433 if mem::size_of::<S>() > 0 { 434 _waker = Waker::from_raw(Self::clone_waker(ptr)); 435 } 436 437 let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ())); 438 (*raw.schedule).schedule(task, info); 439 } 440 441 /// Drops the future inside a task. 442 #[inline] drop_future(ptr: *const ())443 unsafe fn drop_future(ptr: *const ()) { 444 let raw = Self::from_ptr(ptr); 445 446 // We need a safeguard against panics because the destructor can panic. 447 abort_on_panic(|| { 448 raw.future.drop_in_place(); 449 }) 450 } 451 452 /// Returns a pointer to the output inside a task. get_output(ptr: *const ()) -> *const ()453 unsafe fn get_output(ptr: *const ()) -> *const () { 454 let raw = Self::from_ptr(ptr); 455 raw.output as *const () 456 } 457 458 /// Cleans up task's resources and deallocates it. 459 /// 460 /// The schedule function will be dropped, and the task will then get deallocated. 461 /// The task must be closed before this function is called. 462 #[inline] destroy(ptr: *const ())463 unsafe fn destroy(ptr: *const ()) { 464 let raw = Self::from_ptr(ptr); 465 let task_layout = Self::task_layout(); 466 467 // We need a safeguard against panics because destructors can panic. 468 abort_on_panic(|| { 469 // Drop the header along with the metadata. 470 (raw.header as *mut Header<M>).drop_in_place(); 471 472 // Drop the schedule function. 473 (raw.schedule as *mut S).drop_in_place(); 474 }); 475 476 // Finally, deallocate the memory reserved by the task. 477 alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); 478 } 479 480 /// Runs a task. 
///
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    ///
    /// Returns `true` only when the task was woken while it was being polled and has therefore
    /// been passed to the schedule function again from within this call; returns `false` on
    /// every other path.
    unsafe fn run(ptr: *const ()) -> bool {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the raw waker vtable. No reference is
        // added for this waker, so it is wrapped in `ManuallyDrop` to keep it from releasing one.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Drop the future.
                Self::drop_future(ptr);

                // Mark the task as unscheduled.
                let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                // Take the awaiter out.
                let mut awaiter = None;
                if state & AWAITER != 0 {
                    awaiter = (*raw.header).take(None);
                }

                // Drop the task reference.
                Self::drop_ref(ptr);

                // Notify the awaiter that the future has been dropped. This happens after
                // `drop_ref`, so the task is not touched again once the awaiter is woken.
                if let Some(w) = awaiter {
                    abort_on_panic(|| w.wake());
                }
                return false;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics.
        // If available, we should also try to catch the panic so that it is propagated correctly.
        let guard = Guard(raw);

        // Panic propagation is not available for no_std.
        #[cfg(not(feature = "std"))]
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok);

        #[cfg(feature = "std")]
        let poll = {
            // Check if we should propagate panics.
            if (*raw.header).propagate_panic {
                // Use catch_unwind to catch the panic. A caught panic becomes the task's output
                // in the form of `Err(payload)`.
                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx)
                })) {
                    Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
                    Ok(Poll::Pending) => Poll::Pending,
                    Err(e) => Poll::Ready(Err(e)),
                }
            } else {
                <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok)
            }
        };

        // Polling returned without unwinding, so the guard's cleanup must not run.
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output. Both live in the same slot, so the future
                // has to be dropped before the output is written.
                Self::drop_future(ptr);
                raw.output.write(out);

                // The task is now completed.
                loop {
                    // If the `Task` is dropped, we'll need to close it and drop the output.
                    let new = if state & TASK == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the `Task` is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & TASK == 0 || state & CLOSED != 0 {
                                // Drop the output.
                                abort_on_panic(|| raw.output.drop_in_place());
                            }

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            Self::drop_ref(ptr);

                            // Notify the awaiter that the task has finished.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
            Poll::Pending => {
                // Ensures the future is dropped at most once across retries of the loop below.
                let mut future_dropped = false;

                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    if state & CLOSED != 0 && !future_dropped {
                        // The thread that closed the task didn't drop the future because it was
                        // running so now it's our responsibility to do so.
                        Self::drop_future(ptr);
                        future_dropped = true;
                    }

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        // On success the exchange yields the previous value, i.e. the state as it
                        // was just before `RUNNING` was cleared.
                        Ok(state) => {
                            // If the task was closed while running, we need to notify the awaiter.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                Self::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running so now it's our responsibility to do so.
                                Self::schedule(ptr, ScheduleInfo::new(true));
                                return true;
                            } else {
                                // Drop the task reference.
                                Self::drop_ref(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        return false;

        /// A guard that closes the task if polling its future panics.
        ///
        /// `run` forgets the guard when polling returns normally, so `drop` only executes
        /// while a panic from the poll is unwinding.
        struct Guard<F, T, S, M>(RawTask<F, T, S, M>)
        where
            F: Future<Output = T>,
            S: Schedule<M>;

        impl<F, T, S, M> Drop for Guard<F, T, S, M>
        where
            F: Future<Output = T>,
            S: Schedule<M>,
        {
            fn drop(&mut self) {
                let raw = self.0;
                // The header sits at the start of the allocation, so its address doubles as the
                // type-erased task pointer.
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // The thread that closed the task didn't drop the future because it
                            // was running so now it's our responsibility to do so.
                            RawTask::<F, T, S, M>::drop_future(ptr);

                            // Mark the task as not running and not scheduled.
                            (*raw.header)
                                .state
                                .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            RawTask::<F, T, S, M>::drop_ref(ptr);

                            // Notify the awaiter that the future has been dropped.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, T, S, M>::drop_future(ptr);

                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, T, S, M>::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}