1 //! Parallel iterator types for [vectors][std::vec] (`Vec<T>`)
2 //!
3 //! You will rarely need to interact with this module directly unless you need
4 //! to name one of the iterator types.
5 //!
6 //! [std::vec]: https://doc.rust-lang.org/stable/std/vec/
7 
8 use crate::iter::plumbing::*;
9 use crate::iter::*;
10 use crate::math::simplify_range;
11 use crate::slice::{Iter, IterMut};
12 use std::iter;
13 use std::mem;
14 use std::ops::{Range, RangeBounds};
15 use std::ptr;
16 use std::slice;
17 
18 impl<'data, T: Sync + 'data> IntoParallelIterator for &'data Vec<T> {
19     type Item = &'data T;
20     type Iter = Iter<'data, T>;
21 
into_par_iter(self) -> Self::Iter22     fn into_par_iter(self) -> Self::Iter {
23         <&[T]>::into_par_iter(self)
24     }
25 }
26 
27 impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut Vec<T> {
28     type Item = &'data mut T;
29     type Iter = IterMut<'data, T>;
30 
into_par_iter(self) -> Self::Iter31     fn into_par_iter(self) -> Self::Iter {
32         <&mut [T]>::into_par_iter(self)
33     }
34 }
35 
36 /// Parallel iterator that moves out of a vector.
37 #[derive(Debug, Clone)]
38 pub struct IntoIter<T: Send> {
39     vec: Vec<T>,
40 }
41 
42 impl<T: Send> IntoParallelIterator for Vec<T> {
43     type Item = T;
44     type Iter = IntoIter<T>;
45 
into_par_iter(self) -> Self::Iter46     fn into_par_iter(self) -> Self::Iter {
47         IntoIter { vec: self }
48     }
49 }
50 
51 impl<T: Send> ParallelIterator for IntoIter<T> {
52     type Item = T;
53 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,54     fn drive_unindexed<C>(self, consumer: C) -> C::Result
55     where
56         C: UnindexedConsumer<Self::Item>,
57     {
58         bridge(self, consumer)
59     }
60 
opt_len(&self) -> Option<usize>61     fn opt_len(&self) -> Option<usize> {
62         Some(self.len())
63     }
64 }
65 
66 impl<T: Send> IndexedParallelIterator for IntoIter<T> {
drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,67     fn drive<C>(self, consumer: C) -> C::Result
68     where
69         C: Consumer<Self::Item>,
70     {
71         bridge(self, consumer)
72     }
73 
len(&self) -> usize74     fn len(&self) -> usize {
75         self.vec.len()
76     }
77 
with_producer<CB>(mut self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,78     fn with_producer<CB>(mut self, callback: CB) -> CB::Output
79     where
80         CB: ProducerCallback<Self::Item>,
81     {
82         // Drain every item, and then the vector only needs to free its buffer.
83         self.vec.par_drain(..).with_producer(callback)
84     }
85 }
86 
87 impl<'data, T: Send> ParallelDrainRange<usize> for &'data mut Vec<T> {
88     type Iter = Drain<'data, T>;
89     type Item = T;
90 
par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter91     fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
92         Drain {
93             orig_len: self.len(),
94             range: simplify_range(range, self.len()),
95             vec: self,
96         }
97     }
98 }
99 
100 /// Draining parallel iterator that moves a range out of a vector, but keeps the total capacity.
101 #[derive(Debug)]
102 pub struct Drain<'data, T: Send> {
103     vec: &'data mut Vec<T>,
104     range: Range<usize>,
105     orig_len: usize,
106 }
107 
108 impl<'data, T: Send> ParallelIterator for Drain<'data, T> {
109     type Item = T;
110 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,111     fn drive_unindexed<C>(self, consumer: C) -> C::Result
112     where
113         C: UnindexedConsumer<Self::Item>,
114     {
115         bridge(self, consumer)
116     }
117 
opt_len(&self) -> Option<usize>118     fn opt_len(&self) -> Option<usize> {
119         Some(self.len())
120     }
121 }
122 
123 impl<'data, T: Send> IndexedParallelIterator for Drain<'data, T> {
drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,124     fn drive<C>(self, consumer: C) -> C::Result
125     where
126         C: Consumer<Self::Item>,
127     {
128         bridge(self, consumer)
129     }
130 
len(&self) -> usize131     fn len(&self) -> usize {
132         self.range.len()
133     }
134 
with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,135     fn with_producer<CB>(self, callback: CB) -> CB::Output
136     where
137         CB: ProducerCallback<Self::Item>,
138     {
139         unsafe {
140             // Make the vector forget about the drained items, and temporarily the tail too.
141             self.vec.set_len(self.range.start);
142 
143             // Create the producer as the exclusive "owner" of the slice.
144             let producer = DrainProducer::from_vec(self.vec, self.range.len());
145 
146             // The producer will move or drop each item from the drained range.
147             callback.callback(producer)
148         }
149     }
150 }
151 
152 impl<'data, T: Send> Drop for Drain<'data, T> {
drop(&mut self)153     fn drop(&mut self) {
154         let Range { start, end } = self.range;
155         if self.vec.len() == self.orig_len {
156             // We must not have produced, so just call a normal drain to remove the items.
157             self.vec.drain(start..end);
158         } else if start == end {
159             // Empty range, so just restore the length to its original state
160             unsafe {
161                 self.vec.set_len(self.orig_len);
162             }
163         } else if end < self.orig_len {
164             // The producer was responsible for consuming the drained items.
165             // Move the tail items to their new place, then set the length to include them.
166             unsafe {
167                 let ptr = self.vec.as_mut_ptr().add(start);
168                 let tail_ptr = self.vec.as_ptr().add(end);
169                 let tail_len = self.orig_len - end;
170                 ptr::copy(tail_ptr, ptr, tail_len);
171                 self.vec.set_len(start + tail_len);
172             }
173         }
174     }
175 }
176 
177 /// ////////////////////////////////////////////////////////////////////////
178 
179 pub(crate) struct DrainProducer<'data, T: Send> {
180     slice: &'data mut [T],
181 }
182 
183 impl<T: Send> DrainProducer<'_, T> {
184     /// Creates a draining producer, which *moves* items from the slice.
185     ///
186     /// Unsafe because `!Copy` data must not be read after the borrow is released.
new(slice: &mut [T]) -> DrainProducer<'_, T>187     pub(crate) unsafe fn new(slice: &mut [T]) -> DrainProducer<'_, T> {
188         DrainProducer { slice }
189     }
190 
191     /// Creates a draining producer, which *moves* items from the tail of the vector.
192     ///
193     /// Unsafe because we're moving from beyond `vec.len()`, so the caller must ensure
194     /// that data is initialized and not read after the borrow is released.
from_vec(vec: &mut Vec<T>, len: usize) -> DrainProducer<'_, T>195     unsafe fn from_vec(vec: &mut Vec<T>, len: usize) -> DrainProducer<'_, T> {
196         let start = vec.len();
197         assert!(vec.capacity() - start >= len);
198 
199         // The pointer is derived from `Vec` directly, not through a `Deref`,
200         // so it has provenance over the whole allocation.
201         let ptr = vec.as_mut_ptr().add(start);
202         DrainProducer::new(slice::from_raw_parts_mut(ptr, len))
203     }
204 }
205 
206 impl<'data, T: 'data + Send> Producer for DrainProducer<'data, T> {
207     type Item = T;
208     type IntoIter = SliceDrain<'data, T>;
209 
into_iter(mut self) -> Self::IntoIter210     fn into_iter(mut self) -> Self::IntoIter {
211         // replace the slice so we don't drop it twice
212         let slice = mem::take(&mut self.slice);
213         SliceDrain {
214             iter: slice.iter_mut(),
215         }
216     }
217 
split_at(mut self, index: usize) -> (Self, Self)218     fn split_at(mut self, index: usize) -> (Self, Self) {
219         // replace the slice so we don't drop it twice
220         let slice = mem::take(&mut self.slice);
221         let (left, right) = slice.split_at_mut(index);
222         unsafe { (DrainProducer::new(left), DrainProducer::new(right)) }
223     }
224 }
225 
226 impl<'data, T: 'data + Send> Drop for DrainProducer<'data, T> {
drop(&mut self)227     fn drop(&mut self) {
228         // extract the slice so we can use `Drop for [T]`
229         let slice_ptr: *mut [T] = mem::take::<&'data mut [T]>(&mut self.slice);
230         unsafe { ptr::drop_in_place::<[T]>(slice_ptr) };
231     }
232 }
233 
234 /// ////////////////////////////////////////////////////////////////////////
235 
236 // like std::vec::Drain, without updating a source Vec
237 pub(crate) struct SliceDrain<'data, T> {
238     iter: slice::IterMut<'data, T>,
239 }
240 
241 impl<'data, T: 'data> Iterator for SliceDrain<'data, T> {
242     type Item = T;
243 
next(&mut self) -> Option<T>244     fn next(&mut self) -> Option<T> {
245         // Coerce the pointer early, so we don't keep the
246         // reference that's about to be invalidated.
247         let ptr: *const T = self.iter.next()?;
248         Some(unsafe { ptr::read(ptr) })
249     }
250 
size_hint(&self) -> (usize, Option<usize>)251     fn size_hint(&self) -> (usize, Option<usize>) {
252         self.iter.size_hint()
253     }
254 
count(self) -> usize255     fn count(self) -> usize {
256         self.iter.len()
257     }
258 }
259 
260 impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> {
next_back(&mut self) -> Option<Self::Item>261     fn next_back(&mut self) -> Option<Self::Item> {
262         // Coerce the pointer early, so we don't keep the
263         // reference that's about to be invalidated.
264         let ptr: *const T = self.iter.next_back()?;
265         Some(unsafe { ptr::read(ptr) })
266     }
267 }
268 
269 impl<'data, T: 'data> ExactSizeIterator for SliceDrain<'data, T> {
len(&self) -> usize270     fn len(&self) -> usize {
271         self.iter.len()
272     }
273 }
274 
275 impl<'data, T: 'data> iter::FusedIterator for SliceDrain<'data, T> {}
276 
277 impl<'data, T: 'data> Drop for SliceDrain<'data, T> {
drop(&mut self)278     fn drop(&mut self) {
279         // extract the iterator so we can use `Drop for [T]`
280         let slice_ptr: *mut [T] = mem::replace(&mut self.iter, [].iter_mut()).into_slice();
281         unsafe { ptr::drop_in_place::<[T]>(slice_ptr) };
282     }
283 }
284