1 use super::plumbing::*;
2 use super::*;
3 use std::fmt;
4 use std::sync::atomic::{AtomicBool, Ordering};
5 
/// `SkipAnyWhile` is an iterator that skips over elements from anywhere in `I`
/// until the callback returns `false`.
/// This struct is created by the [`skip_any_while()`] method on [`ParallelIterator`].
///
/// The type parameters carry no trait bounds on the struct itself; the bounds
/// are stated on the `impl` blocks that need them, so the derived `Clone`
/// only requires `I: Clone` and `P: Clone`.
///
/// [`skip_any_while()`]: trait.ParallelIterator.html#method.skip_any_while
/// [`ParallelIterator`]: trait.ParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct SkipAnyWhile<I, P> {
    /// The wrapped iterator that produces the items.
    base: I,
    /// Callback consulted for each item while skipping is still active.
    predicate: P,
}
18 
19 impl<I: ParallelIterator + fmt::Debug, P> fmt::Debug for SkipAnyWhile<I, P> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result20     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21         f.debug_struct("SkipAnyWhile")
22             .field("base", &self.base)
23             .finish()
24     }
25 }
26 
27 impl<I, P> SkipAnyWhile<I, P>
28 where
29     I: ParallelIterator,
30 {
31     /// Creates a new `SkipAnyWhile` iterator.
new(base: I, predicate: P) -> Self32     pub(super) fn new(base: I, predicate: P) -> Self {
33         SkipAnyWhile { base, predicate }
34     }
35 }
36 
37 impl<I, P> ParallelIterator for SkipAnyWhile<I, P>
38 where
39     I: ParallelIterator,
40     P: Fn(&I::Item) -> bool + Sync + Send,
41 {
42     type Item = I::Item;
43 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,44     fn drive_unindexed<C>(self, consumer: C) -> C::Result
45     where
46         C: UnindexedConsumer<Self::Item>,
47     {
48         let consumer1 = SkipAnyWhileConsumer {
49             base: consumer,
50             predicate: &self.predicate,
51             skipping: &AtomicBool::new(true),
52         };
53         self.base.drive_unindexed(consumer1)
54     }
55 }
56 
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer that wraps another consumer and forwards only the items that
/// are not skipped.
struct SkipAnyWhileConsumer<'p, C, P> {
    /// The wrapped consumer that receives the surviving items.
    base: C,
    /// Borrowed callback used to test items while skipping is active.
    predicate: &'p P,
    /// Borrowed flag handed unchanged to every split of this consumer and to
    /// the folders created from it.
    skipping: &'p AtomicBool,
}
65 
66 impl<'p, T, C, P> Consumer<T> for SkipAnyWhileConsumer<'p, C, P>
67 where
68     C: Consumer<T>,
69     P: Fn(&T) -> bool + Sync,
70 {
71     type Folder = SkipAnyWhileFolder<'p, C::Folder, P>;
72     type Reducer = C::Reducer;
73     type Result = C::Result;
74 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)75     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
76         let (left, right, reducer) = self.base.split_at(index);
77         (
78             SkipAnyWhileConsumer { base: left, ..self },
79             SkipAnyWhileConsumer {
80                 base: right,
81                 ..self
82             },
83             reducer,
84         )
85     }
86 
into_folder(self) -> Self::Folder87     fn into_folder(self) -> Self::Folder {
88         SkipAnyWhileFolder {
89             base: self.base.into_folder(),
90             predicate: self.predicate,
91             skipping: self.skipping,
92         }
93     }
94 
full(&self) -> bool95     fn full(&self) -> bool {
96         self.base.full()
97     }
98 }
99 
100 impl<'p, T, C, P> UnindexedConsumer<T> for SkipAnyWhileConsumer<'p, C, P>
101 where
102     C: UnindexedConsumer<T>,
103     P: Fn(&T) -> bool + Sync,
104 {
split_off_left(&self) -> Self105     fn split_off_left(&self) -> Self {
106         SkipAnyWhileConsumer {
107             base: self.base.split_off_left(),
108             ..*self
109         }
110     }
111 
to_reducer(&self) -> Self::Reducer112     fn to_reducer(&self) -> Self::Reducer {
113         self.base.to_reducer()
114     }
115 }
116 
/// Folder that wraps another folder and passes on only the items that are
/// not skipped.
struct SkipAnyWhileFolder<'p, C, P> {
    /// The wrapped folder that receives the surviving items.
    base: C,
    /// Borrowed callback used to test items while skipping is active.
    predicate: &'p P,
    /// Borrowed flag consulted, and possibly cleared, for each item.
    skipping: &'p AtomicBool,
}
122 
/// Decides whether `item` should be skipped.
///
/// Returns `true` only while `skipping` is still set and `predicate(item)`
/// holds. The first time `predicate` returns `false`, the flag is cleared;
/// from then on every call returns `false` without calling `predicate`.
///
/// Both the load and the store use `Ordering::Relaxed`.
fn skip<T>(item: &T, skipping: &AtomicBool, predicate: &impl Fn(&T) -> bool) -> bool {
    if skipping.load(Ordering::Relaxed) {
        if predicate(item) {
            return true;
        }
        // First rejected item seen through this flag: stop skipping for good.
        skipping.store(false, Ordering::Relaxed);
    }
    false
}
133 
134 impl<'p, T, C, P> Folder<T> for SkipAnyWhileFolder<'p, C, P>
135 where
136     C: Folder<T>,
137     P: Fn(&T) -> bool + 'p,
138 {
139     type Result = C::Result;
140 
consume(mut self, item: T) -> Self141     fn consume(mut self, item: T) -> Self {
142         if !skip(&item, self.skipping, self.predicate) {
143             self.base = self.base.consume(item);
144         }
145         self
146     }
147 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,148     fn consume_iter<I>(mut self, iter: I) -> Self
149     where
150         I: IntoIterator<Item = T>,
151     {
152         self.base = self.base.consume_iter(
153             iter.into_iter()
154                 .skip_while(move |x| skip(x, self.skipping, self.predicate)),
155         );
156         self
157     }
158 
complete(self) -> C::Result159     fn complete(self) -> C::Result {
160         self.base.complete()
161     }
162 
full(&self) -> bool163     fn full(&self) -> bool {
164         self.base.full()
165     }
166 }
167