1 use super::task::Task;
2 use super::FuturesUnordered;
3 use core::marker::PhantomData;
4 use core::pin::Pin;
5 use core::ptr;
6 use core::sync::atomic::Ordering::Relaxed;
7 
/// Mutable iterator over all futures in the unordered set.
///
/// Items are handed out as `Pin<&'a mut Fut>`, so this iterator places no
/// `Unpin` requirement on `Fut`.
#[derive(Debug)]
pub struct IterPinMut<'a, Fut> {
    /// Task whose future is produced by the next call to `next`; a null
    /// pointer means iteration has finished.
    pub(super) task: *const Task<Fut>,
    /// Remaining item count: decremented for every yielded future and
    /// reported by `size_hint` as both the lower and the upper bound.
    pub(super) len: usize,
    /// Ties the iterator to an exclusive borrow of the set for `'a`.
    pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
}
15 
16 /// Mutable iterator over all futures in the unordered set.
17 #[derive(Debug)]
18 pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);
19 
/// Immutable iterator over all futures in the unordered set.
///
/// Items are handed out as `Pin<&'a Fut>`, so this iterator places no
/// `Unpin` requirement on `Fut`.
#[derive(Debug)]
pub struct IterPinRef<'a, Fut> {
    /// Task whose future is produced by the next call to `next`; a null
    /// pointer means iteration has finished.
    pub(super) task: *const Task<Fut>,
    /// Remaining item count: decremented for every yielded future and
    /// reported by `size_hint` as both the lower and the upper bound.
    pub(super) len: usize,
    /// Forwarded to `Task::spin_next_all` whenever the iterator advances.
    /// NOTE(review): presumably the sentinel that marks a `next_all` link
    /// which is still being published by another thread — confirm in `Task`.
    pub(super) pending_next_all: *mut Task<Fut>,
    /// Ties the iterator to a shared borrow of the set for `'a`.
    pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
}
28 
29 /// Immutable iterator over all the futures in the unordered set.
30 #[derive(Debug)]
31 pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);
32 
33 /// Owned iterator over all futures in the unordered set.
34 #[derive(Debug)]
35 pub struct IntoIter<Fut: Unpin> {
36     pub(super) len: usize,
37     pub(super) inner: FuturesUnordered<Fut>,
38 }
39 
40 impl<Fut: Unpin> Iterator for IntoIter<Fut> {
41     type Item = Fut;
42 
next(&mut self) -> Option<Self::Item>43     fn next(&mut self) -> Option<Self::Item> {
44         // `head_all` can be accessed directly and we don't need to spin on
45         // `Task::next_all` since we have exclusive access to the set.
46         let task = self.inner.head_all.get_mut();
47 
48         if (*task).is_null() {
49             return None;
50         }
51 
52         unsafe {
53             // Moving out of the future is safe because it is `Unpin`
54             let future = (*(**task).future.get()).take().unwrap();
55 
56             // Mutable access to a previously shared `FuturesUnordered` implies
57             // that the other threads already released the object before the
58             // current thread acquired it, so relaxed ordering can be used and
59             // valid `next_all` checks can be skipped.
60             let next = (**task).next_all.load(Relaxed);
61             *task = next;
62             if !task.is_null() {
63                 *(**task).prev_all.get() = ptr::null_mut();
64             }
65             self.len -= 1;
66             Some(future)
67         }
68     }
69 
size_hint(&self) -> (usize, Option<usize>)70     fn size_hint(&self) -> (usize, Option<usize>) {
71         (self.len, Some(self.len))
72     }
73 }
74 
75 impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}
76 
77 impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
78     type Item = Pin<&'a mut Fut>;
79 
next(&mut self) -> Option<Self::Item>80     fn next(&mut self) -> Option<Self::Item> {
81         if self.task.is_null() {
82             return None;
83         }
84 
85         unsafe {
86             let future = (*(*self.task).future.get()).as_mut().unwrap();
87 
88             // Mutable access to a previously shared `FuturesUnordered` implies
89             // that the other threads already released the object before the
90             // current thread acquired it, so relaxed ordering can be used and
91             // valid `next_all` checks can be skipped.
92             let next = (*self.task).next_all.load(Relaxed);
93             self.task = next;
94             self.len -= 1;
95             Some(Pin::new_unchecked(future))
96         }
97     }
98 
size_hint(&self) -> (usize, Option<usize>)99     fn size_hint(&self) -> (usize, Option<usize>) {
100         (self.len, Some(self.len))
101     }
102 }
103 
104 impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {}
105 
106 impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
107     type Item = &'a mut Fut;
108 
next(&mut self) -> Option<Self::Item>109     fn next(&mut self) -> Option<Self::Item> {
110         self.0.next().map(Pin::get_mut)
111     }
112 
size_hint(&self) -> (usize, Option<usize>)113     fn size_hint(&self) -> (usize, Option<usize>) {
114         self.0.size_hint()
115     }
116 }
117 
118 impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}
119 
120 impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
121     type Item = Pin<&'a Fut>;
122 
next(&mut self) -> Option<Self::Item>123     fn next(&mut self) -> Option<Self::Item> {
124         if self.task.is_null() {
125             return None;
126         }
127 
128         unsafe {
129             let future = (*(*self.task).future.get()).as_ref().unwrap();
130 
131             // Relaxed ordering can be used since acquire ordering when
132             // `head_all` was initially read for this iterator implies acquire
133             // ordering for all previously inserted nodes (and we don't need to
134             // read `len_all` again for any other nodes).
135             let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
136             self.task = next;
137             self.len -= 1;
138             Some(Pin::new_unchecked(future))
139         }
140     }
141 
size_hint(&self) -> (usize, Option<usize>)142     fn size_hint(&self) -> (usize, Option<usize>) {
143         (self.len, Some(self.len))
144     }
145 }
146 
147 impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}
148 
149 impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
150     type Item = &'a Fut;
151 
next(&mut self) -> Option<Self::Item>152     fn next(&mut self) -> Option<Self::Item> {
153         self.0.next().map(Pin::get_ref)
154     }
155 
size_hint(&self) -> (usize, Option<usize>)156     fn size_hint(&self) -> (usize, Option<usize>) {
157         self.0.size_hint()
158     }
159 }
160 
161 impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}
162 
163 // SAFETY: we do nothing thread-local and there is no interior mutability,
164 // so the usual structural `Send`/`Sync` apply.
165 unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
166 unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}
167 
168 unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
169 unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}
170 
171 unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
172 unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}
173