use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, the spatial prefetcher pulls pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness and the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    is_ready: bool,

    /// Should never be `Unpin`.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

enum State {
    Init,
    Waiting,
    Done,
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |  1 bit   |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(15);

const SHUTDOWN: bit::Pack = TICK.then(1);
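
// A worked example of the layout above (illustrative only, assuming the
// `bit::Pack` helpers behave as they are used in this file): with readiness
// bits `0b0101`, tick `3`, and the shutdown flag set,
//
//     let word = SHUTDOWN.pack(1, TICK.pack(3, READINESS.pack(0b0101, 0)));
//
//     READINESS.unpack(word) == 0b0101
//     TICK.unpack(word) == 3
//     SHUTDOWN.unpack(word) == 1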

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Waiters::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        mio::Token(super::EXPOSE_IO.expose_provenance(self))
    }

    /// Invoked when the I/O driver is shut down; forces this `ScheduledIo` into a
    /// permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
    /// the current value.
    ///
    /// # Arguments
    /// - `tick_op`: whether to set a new tick or to clear readiness for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
        let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
            // If the I/O driver is shut down, then you are only allowed to clear readiness.
            debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));

            const MAX_TICK: usize = TICK.max_value() + 1;
            let tick = TICK.unpack(curr);

            let new_tick = match tick_op {
                // Trying to clear readiness with an old event!
                Tick::Clear(t) if tick as u8 != t => return None,
                Tick::Clear(t) => t as usize,
                Tick::Set => tick.wrapping_add(1) % MAX_TICK,
            };
            let ready = Ready::from_usize(READINESS.unpack(curr));
            Some(TICK.pack(new_tick, f(ready).as_usize()))
        });
    }
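
    // A sketch of the race the tick comparison above guards against
    // (illustrative only):
    //
    // 1. A task observes a `ReadyEvent` carrying tick 5 and starts its I/O.
    // 2. Before the task calls `clear_readiness`, the driver receives a new
    //    event and calls `set_readiness(Tick::Set, ..)`, bumping the tick to 6
    //    and setting fresh readiness.
    // 3. The task now calls `clear_readiness`, which arrives here as
    //    `Tick::Clear(5)`. The stored tick is 6, so the update returns `None`
    //    and the fresh readiness from step 2 is preserved rather than being
    //    erased by the stale clear.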

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending tasks **must** be
    /// done from outside of the lock, otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // Check for the `AsyncRead` slot.
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // Check for the `AsyncWrite` slot.
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying.
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// This supports the `AsyncRead` and `AsyncWrite` polling methods, which
    /// cannot use the `async fn` version. It uses the reserved reader and
    /// writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info.
            let mut waiters = self.waiters.lock();
            let waker = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match waker {
                Some(waker) => waker.clone_from(cx.waker()),
                None => *waker = Some(cx.waker().clone()),
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock.
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }
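
    // Why `poll_readiness` re-loads `readiness` after storing the waker: a
    // sketch of the lost-wakeup race it avoids (illustrative only).
    //
    // 1. The task loads `readiness` and sees no bits for its direction.
    // 2. The driver sets readiness and calls `wake()`, but the reader/writer
    //    slot is still empty, so there is no waker to notify.
    // 3. The task stores its waker and, without the second load, would return
    //    `Poll::Pending` and never be woken. The re-check after taking the
    //    lock observes the readiness set in step 2 and returns `Poll::Ready`
    //    instead.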

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====
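
// A summary of the `Readiness::poll` state machine below (the match statement
// is authoritative; this is just an overview):
//
// * `Init`: readiness (or shutdown) already set => return `Ready` at once.
//   Otherwise store the waker, push the waiter onto the waiter list, and move
//   to `Waiting`.
// * `Waiting`: if `wake()` marked the waiter `is_ready`, move to `Done` and
//   loop; otherwise refresh the stored waker and return `Pending`.
// * `Done`: re-read the packed readiness word and return the final
//   `ReadyEvent`.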

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness.
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes.
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: called while locked.
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list.
                    //
                    // Safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked.
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        w.waker.as_mut().unwrap().clone_from(cx.waker());
                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope in which
                    // the lock is held. Because holding the lock is required to
                    // ensure safe access to fields not stored inside the lock,
                    // it is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: `State::Done` means the waiter is no longer shared.
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event
                    // which notified our waker. This is ok because the future
                    // has not yet returned `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`.
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}
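
// A minimal test sketch for two properties documented above: the cache-line
// alignment applied to `ScheduledIo` and the all-zero initial state of the
// packed readiness word. It is illustrative and not part of the upstream test
// suite; the module name is arbitrary.
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn scheduled_io_is_cache_aligned() {
        // Every target listed in the `cfg_attr`s above aligns to at least 16
        // bytes; the 128-byte targets must report at least 128.
        assert!(std::mem::align_of::<ScheduledIo>() >= 16);

        #[cfg(any(
            target_arch = "x86_64",
            target_arch = "aarch64",
            target_arch = "powerpc64"
        ))]
        assert!(std::mem::align_of::<ScheduledIo>() >= 128);
    }

    #[test]
    fn default_scheduled_io_is_empty() {
        let io = ScheduledIo::default();
        let event = io.ready_event(Interest::READABLE | Interest::WRITABLE);

        // A freshly created resource reports no readiness, tick zero, and is
        // not shut down.
        assert!(event.ready.is_empty());
        assert_eq!(event.tick, 0);
        assert!(!event.is_shutdown);
    }
}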