use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, the spatial prefetcher pulls pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness and the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}
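// A quick compile-time sanity check of the padding could look like the sketch
// below (added for illustration, not part of the original file; `repr(align(n))`
// guarantees an alignment of at least `n`):
//
//     #[cfg(target_arch = "x86_64")]
//     const _: () = assert!(std::mem::align_of::<ScheduledIo>() >= 128);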

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    /// Set to `true` by `ScheduledIo::wake` once this waiter has been woken.
    is_ready: bool,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
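// Context (added note): the generated `addr_of_pointers` hands out a raw
// pointer to the `pointers` field without ever materializing a `&` reference
// to it. This matters because the intrusive list mutates that field through
// raw pointers while the node is shared; creating a reference there could
// violate Rust's aliasing rules.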

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

enum State {
    Init,
    Waiting,
    Done,
}
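// State transitions (added summary of `<Readiness as Future>::poll` below;
// the state only ever moves forward):
//
//     Init --(not ready: waiter pushed to list)--> Waiting --(woken)--> Done
//     Init --(already ready, or driver shutdown)---------------------> Done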

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |   1 bit  |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(15);

const SHUTDOWN: bit::Pack = TICK.then(1);
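// Worked example of the packing above (illustrative sketch; `pack(value, base)`
// and `unpack(word)` are the `bit::Pack` helpers used throughout this file):
// with readiness `0b1`, tick `3`, and the shutdown bit clear,
//
//     SHUTDOWN.pack(0, TICK.pack(3, READINESS.pack(0b1, 0)))
//         == 0b0_000000000000011_0000000000000001
//
// and `TICK.unpack(word)` recovers `3`.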

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Waiters::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        mio::Token(super::EXPOSE_IO.expose_provenance(self))
    }
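    // Added note: the driver reverses this mapping when an event arrives,
    // resolving the `mio::Token` back to a `&ScheduledIo` through the same
    // `EXPOSE_IO` domain, roughly like the sketch below (the exact helper
    // name is an assumption, not quoted from the driver):
    //
    //     let io = unsafe { &*EXPOSE_IO.from_exposed_addr(token.0) };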

    /// Invoked when the IO driver is shut down; forces this `ScheduledIo` into a
    /// permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
    /// on the current readiness value.
    ///
    /// # Arguments
    /// - `tick_op`: whether to advance the tick (`Tick::Set`) or to clear
    ///   readiness for a specific tick (`Tick::Clear`).
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
        let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
            // If the io driver is shut down, then you are only allowed to clear readiness.
            debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));

            const MAX_TICK: usize = TICK.max_value() + 1;
            let tick = TICK.unpack(curr);

            let new_tick = match tick_op {
                // Trying to clear readiness with an old event!
                Tick::Clear(t) if tick as u8 != t => return None,
                Tick::Clear(t) => t as usize,
                Tick::Set => tick.wrapping_add(1) % MAX_TICK,
            };
            let ready = Ready::from_usize(READINESS.unpack(curr));
            Some(TICK.pack(new_tick, f(ready).as_usize()))
        });
    }
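    // Illustrative call shapes (assumed from the `Tick` variants handled
    // above; not verbatim driver code). The driver dispatching an event bumps
    // the tick and adds readiness, then wakes waiters; a resource clears the
    // readiness it observed:
    //
    //     io.set_readiness(Tick::Set, |curr| curr | Ready::READABLE);
    //     io.wake(Ready::READABLE);
    //
    //     io.set_readiness(Tick::Clear(event.tick), |curr| curr - mask);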

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending task **must** be
    /// done from outside of the lock, otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying.
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// This is to support the `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. It uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let waker = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match waker {
                Some(waker) => waker.clone_from(cx.waker()),
                None => *waker = Some(cx.waker().clone()),
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock.
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }
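    // Typical flow (a sketch of how a resource uses this, not verbatim code
    // from the callers): after a non-blocking I/O attempt fails with
    // `WouldBlock`, the observed readiness is cleared so that the next
    // `readiness().await` parks until the driver delivers a fresh event:
    //
    //     let event = io.readiness(Interest::READABLE).await;
    //     match socket.try_read(buf) {
    //         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    //             io.clear_readiness(event);
    //         }
    //         res => return res,
    //     }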

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: called while locked
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list
                    //
                    // Safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        w.waker.as_mut().unwrap().clone_from(cx.waker());
                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: `State::Done` means the waiter is no longer shared.
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event
                    // which notified our waker. This is ok because the future
                    // still didn't return `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}
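// Added note: removing the waiter in `drop` is what makes `readiness()` safe
// to cancel (for example when it loses a `tokio::select!` race): once the
// future is dropped, `wake` can no longer reach its (now freed) `Waiter`.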

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}