1 use super::super::plumbing::*; 2 use crate::SendPtr; 3 use std::marker::PhantomData; 4 use std::ptr; 5 use std::slice; 6 7 pub(super) struct CollectConsumer<'c, T: Send> { 8 /// See `CollectResult` for explanation of why this is not a slice 9 start: SendPtr<T>, 10 len: usize, 11 marker: PhantomData<&'c mut T>, 12 } 13 14 impl<T: Send> CollectConsumer<'_, T> { 15 /// Create a collector for `len` items in the unused capacity of the vector. appender(vec: &mut Vec<T>, len: usize) -> CollectConsumer<'_, T>16 pub(super) fn appender(vec: &mut Vec<T>, len: usize) -> CollectConsumer<'_, T> { 17 let start = vec.len(); 18 assert!(vec.capacity() - start >= len); 19 20 // SAFETY: We already made sure to have the additional space allocated. 21 // The pointer is derived from `Vec` directly, not through a `Deref`, 22 // so it has provenance over the whole allocation. 23 unsafe { CollectConsumer::new(vec.as_mut_ptr().add(start), len) } 24 } 25 } 26 27 impl<'c, T: Send + 'c> CollectConsumer<'c, T> { 28 /// The target memory is considered uninitialized, and will be 29 /// overwritten without reading or dropping existing values. new(start: *mut T, len: usize) -> Self30 unsafe fn new(start: *mut T, len: usize) -> Self { 31 CollectConsumer { 32 start: SendPtr(start), 33 len, 34 marker: PhantomData, 35 } 36 } 37 } 38 39 /// CollectResult represents an initialized part of the target slice. 40 /// 41 /// This is a proxy owner of the elements in the slice; when it drops, 42 /// the elements will be dropped, unless its ownership is released before then. 43 #[must_use] 44 pub(super) struct CollectResult<'c, T> { 45 /// This pointer and length has the same representation as a slice, 46 /// but retains the provenance of the entire array so that we can merge 47 /// these regions together in `CollectReducer`. 48 start: SendPtr<T>, 49 total_len: usize, 50 /// The current initialized length after `start` 51 initialized_len: usize, 52 /// Lifetime invariance guarantees that the data flows from consumer to result, 53 /// especially for the `scope_fn` callback in `Collect::with_consumer`. 54 invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, 55 } 56 57 unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} 58 59 impl<'c, T> CollectResult<'c, T> { 60 /// The current length of the collect result len(&self) -> usize61 pub(super) fn len(&self) -> usize { 62 self.initialized_len 63 } 64 65 /// Release ownership of the slice of elements, and return the length release_ownership(mut self) -> usize66 pub(super) fn release_ownership(mut self) -> usize { 67 let ret = self.initialized_len; 68 self.initialized_len = 0; 69 ret 70 } 71 } 72 73 impl<'c, T> Drop for CollectResult<'c, T> { drop(&mut self)74 fn drop(&mut self) { 75 // Drop the first `self.initialized_len` elements, which have been recorded 76 // to be initialized by the folder. 77 unsafe { 78 ptr::drop_in_place(slice::from_raw_parts_mut( 79 self.start.0, 80 self.initialized_len, 81 )); 82 } 83 } 84 } 85 86 impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { 87 type Folder = CollectResult<'c, T>; 88 type Reducer = CollectReducer; 89 type Result = CollectResult<'c, T>; 90 split_at(self, index: usize) -> (Self, Self, CollectReducer)91 fn split_at(self, index: usize) -> (Self, Self, CollectReducer) { 92 let CollectConsumer { start, len, .. } = self; 93 94 // Produce new consumers. 95 // SAFETY: This assert checks that `index` is a valid offset for `start` 96 unsafe { 97 assert!(index <= len); 98 ( 99 CollectConsumer::new(start.0, index), 100 CollectConsumer::new(start.0.add(index), len - index), 101 CollectReducer, 102 ) 103 } 104 } 105 into_folder(self) -> Self::Folder106 fn into_folder(self) -> Self::Folder { 107 // Create a result/folder that consumes values and writes them 108 // into the region after start. The initial result has length 0. 109 CollectResult { 110 start: self.start, 111 total_len: self.len, 112 initialized_len: 0, 113 invariant_lifetime: PhantomData, 114 } 115 } 116 full(&self) -> bool117 fn full(&self) -> bool { 118 false 119 } 120 } 121 122 impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> { 123 type Result = Self; 124 consume(mut self, item: T) -> Self125 fn consume(mut self, item: T) -> Self { 126 assert!( 127 self.initialized_len < self.total_len, 128 "too many values pushed to consumer" 129 ); 130 131 // SAFETY: The assert above is a bounds check for this write, and we 132 // avoid assignment here so we do not drop an uninitialized T. 133 unsafe { 134 // Write item and increase the initialized length 135 self.start.0.add(self.initialized_len).write(item); 136 self.initialized_len += 1; 137 } 138 139 self 140 } 141 complete(self) -> Self::Result142 fn complete(self) -> Self::Result { 143 // NB: We don't explicitly check that the local writes were complete, 144 // but Collect will assert the total result length in the end. 145 self 146 } 147 full(&self) -> bool148 fn full(&self) -> bool { 149 false 150 } 151 } 152 153 /// Pretend to be unindexed for `special_collect_into_vec`, 154 /// but we should never actually get used that way... 155 impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> { split_off_left(&self) -> Self156 fn split_off_left(&self) -> Self { 157 unreachable!("CollectConsumer must be indexed!") 158 } to_reducer(&self) -> Self::Reducer159 fn to_reducer(&self) -> Self::Reducer { 160 CollectReducer 161 } 162 } 163 164 /// CollectReducer combines adjacent chunks; the result must always 165 /// be contiguous so that it is one combined slice. 166 pub(super) struct CollectReducer; 167 168 impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer { reduce( self, mut left: CollectResult<'c, T>, right: CollectResult<'c, T>, ) -> CollectResult<'c, T>169 fn reduce( 170 self, 171 mut left: CollectResult<'c, T>, 172 right: CollectResult<'c, T>, 173 ) -> CollectResult<'c, T> { 174 // Merge if the CollectResults are adjacent and in left to right order 175 // else: drop the right piece now and total length will end up short in the end, 176 // when the correctness of the collected result is asserted. 177 unsafe { 178 let left_end = left.start.0.add(left.initialized_len); 179 if left_end == right.start.0 { 180 left.total_len += right.total_len; 181 left.initialized_len += right.release_ownership(); 182 } 183 left 184 } 185 } 186 } 187