1 use super::task::Task; 2 use super::FuturesUnordered; 3 use core::marker::PhantomData; 4 use core::pin::Pin; 5 use core::ptr; 6 use core::sync::atomic::Ordering::Relaxed; 7 8 /// Mutable iterator over all futures in the unordered set. 9 #[derive(Debug)] 10 pub struct IterPinMut<'a, Fut> { 11 pub(super) task: *const Task<Fut>, 12 pub(super) len: usize, 13 pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>, 14 } 15 16 /// Mutable iterator over all futures in the unordered set. 17 #[derive(Debug)] 18 pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); 19 20 /// Immutable iterator over all futures in the unordered set. 21 #[derive(Debug)] 22 pub struct IterPinRef<'a, Fut> { 23 pub(super) task: *const Task<Fut>, 24 pub(super) len: usize, 25 pub(super) pending_next_all: *mut Task<Fut>, 26 pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>, 27 } 28 29 /// Immutable iterator over all the futures in the unordered set. 30 #[derive(Debug)] 31 pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); 32 33 /// Owned iterator over all futures in the unordered set. 34 #[derive(Debug)] 35 pub struct IntoIter<Fut: Unpin> { 36 pub(super) len: usize, 37 pub(super) inner: FuturesUnordered<Fut>, 38 } 39 40 impl<Fut: Unpin> Iterator for IntoIter<Fut> { 41 type Item = Fut; 42 next(&mut self) -> Option<Self::Item>43 fn next(&mut self) -> Option<Self::Item> { 44 // `head_all` can be accessed directly and we don't need to spin on 45 // `Task::next_all` since we have exclusive access to the set. 46 let task = self.inner.head_all.get_mut(); 47 48 if (*task).is_null() { 49 return None; 50 } 51 52 unsafe { 53 // Moving out of the future is safe because it is `Unpin` 54 let future = (*(**task).future.get()).take().unwrap(); 55 56 // Mutable access to a previously shared `FuturesUnordered` implies 57 // that the other threads already released the object before the 58 // current thread acquired it, so relaxed ordering can be used and 59 // valid `next_all` checks can be skipped. 60 let next = (**task).next_all.load(Relaxed); 61 *task = next; 62 if !task.is_null() { 63 *(**task).prev_all.get() = ptr::null_mut(); 64 } 65 self.len -= 1; 66 Some(future) 67 } 68 } 69 size_hint(&self) -> (usize, Option<usize>)70 fn size_hint(&self) -> (usize, Option<usize>) { 71 (self.len, Some(self.len)) 72 } 73 } 74 75 impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {} 76 77 impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { 78 type Item = Pin<&'a mut Fut>; 79 next(&mut self) -> Option<Self::Item>80 fn next(&mut self) -> Option<Self::Item> { 81 if self.task.is_null() { 82 return None; 83 } 84 85 unsafe { 86 let future = (*(*self.task).future.get()).as_mut().unwrap(); 87 88 // Mutable access to a previously shared `FuturesUnordered` implies 89 // that the other threads already released the object before the 90 // current thread acquired it, so relaxed ordering can be used and 91 // valid `next_all` checks can be skipped. 92 let next = (*self.task).next_all.load(Relaxed); 93 self.task = next; 94 self.len -= 1; 95 Some(Pin::new_unchecked(future)) 96 } 97 } 98 size_hint(&self) -> (usize, Option<usize>)99 fn size_hint(&self) -> (usize, Option<usize>) { 100 (self.len, Some(self.len)) 101 } 102 } 103 104 impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {} 105 106 impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { 107 type Item = &'a mut Fut; 108 next(&mut self) -> Option<Self::Item>109 fn next(&mut self) -> Option<Self::Item> { 110 self.0.next().map(Pin::get_mut) 111 } 112 size_hint(&self) -> (usize, Option<usize>)113 fn size_hint(&self) -> (usize, Option<usize>) { 114 self.0.size_hint() 115 } 116 } 117 118 impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {} 119 120 impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { 121 type Item = Pin<&'a Fut>; 122 next(&mut self) -> Option<Self::Item>123 fn next(&mut self) -> Option<Self::Item> { 124 if self.task.is_null() { 125 return None; 126 } 127 128 unsafe { 129 let future = (*(*self.task).future.get()).as_ref().unwrap(); 130 131 // Relaxed ordering can be used since acquire ordering when 132 // `head_all` was initially read for this iterator implies acquire 133 // ordering for all previously inserted nodes (and we don't need to 134 // read `len_all` again for any other nodes). 135 let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); 136 self.task = next; 137 self.len -= 1; 138 Some(Pin::new_unchecked(future)) 139 } 140 } 141 size_hint(&self) -> (usize, Option<usize>)142 fn size_hint(&self) -> (usize, Option<usize>) { 143 (self.len, Some(self.len)) 144 } 145 } 146 147 impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {} 148 149 impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { 150 type Item = &'a Fut; 151 next(&mut self) -> Option<Self::Item>152 fn next(&mut self) -> Option<Self::Item> { 153 self.0.next().map(Pin::get_ref) 154 } 155 size_hint(&self) -> (usize, Option<usize>)156 fn size_hint(&self) -> (usize, Option<usize>) { 157 self.0.size_hint() 158 } 159 } 160 161 impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {} 162 163 // SAFETY: we do nothing thread-local and there is no interior mutability, 164 // so the usual structural `Send`/`Sync` apply. 165 unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {} 166 unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {} 167 168 unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {} 169 unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {} 170 171 unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {} 172 unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {} 173