1 use super::plumbing::*;
2 use super::*;
3 use std::fmt;
4 use std::sync::atomic::{AtomicBool, Ordering};
5 
/// `TakeAnyWhile` is an iterator that iterates over elements from anywhere in `I`
/// until the callback returns `false`.
///
/// This struct is created by the [`take_any_while()`] method on [`ParallelIterator`].
///
/// [`take_any_while()`]: trait.ParallelIterator.html#method.take_any_while
/// [`ParallelIterator`]: trait.ParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct TakeAnyWhile<I: ParallelIterator, P> {
    /// The wrapped parallel iterator that produces the items.
    base: I,
    /// The callback tested against each item; the first `false` result stops
    /// any further items from being taken.
    predicate: P,
}
18 
19 impl<I: ParallelIterator + fmt::Debug, P> fmt::Debug for TakeAnyWhile<I, P> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result20     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21         f.debug_struct("TakeAnyWhile")
22             .field("base", &self.base)
23             .finish()
24     }
25 }
26 
27 impl<I, P> TakeAnyWhile<I, P>
28 where
29     I: ParallelIterator,
30 {
31     /// Creates a new `TakeAnyWhile` iterator.
new(base: I, predicate: P) -> Self32     pub(super) fn new(base: I, predicate: P) -> Self {
33         TakeAnyWhile { base, predicate }
34     }
35 }
36 
37 impl<I, P> ParallelIterator for TakeAnyWhile<I, P>
38 where
39     I: ParallelIterator,
40     P: Fn(&I::Item) -> bool + Sync + Send,
41 {
42     type Item = I::Item;
43 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,44     fn drive_unindexed<C>(self, consumer: C) -> C::Result
45     where
46         C: UnindexedConsumer<Self::Item>,
47     {
48         let consumer1 = TakeAnyWhileConsumer {
49             base: consumer,
50             predicate: &self.predicate,
51             taking: &AtomicBool::new(true),
52         };
53         self.base.drive_unindexed(consumer1)
54     }
55 }
56 
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer wrapper that forwards to `base` while holding references to the
/// predicate and to the `taking` flag.
struct TakeAnyWhileConsumer<'p, C, P> {
    /// The wrapped consumer that receives the accepted items.
    base: C,
    /// Borrowed predicate tested against each item.
    predicate: &'p P,
    /// Borrowed flag; `true` while items are still being taken.
    taking: &'p AtomicBool,
}
65 
66 impl<'p, T, C, P> Consumer<T> for TakeAnyWhileConsumer<'p, C, P>
67 where
68     C: Consumer<T>,
69     P: Fn(&T) -> bool + Sync,
70 {
71     type Folder = TakeAnyWhileFolder<'p, C::Folder, P>;
72     type Reducer = C::Reducer;
73     type Result = C::Result;
74 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)75     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
76         let (left, right, reducer) = self.base.split_at(index);
77         (
78             TakeAnyWhileConsumer { base: left, ..self },
79             TakeAnyWhileConsumer {
80                 base: right,
81                 ..self
82             },
83             reducer,
84         )
85     }
86 
into_folder(self) -> Self::Folder87     fn into_folder(self) -> Self::Folder {
88         TakeAnyWhileFolder {
89             base: self.base.into_folder(),
90             predicate: self.predicate,
91             taking: self.taking,
92         }
93     }
94 
full(&self) -> bool95     fn full(&self) -> bool {
96         !self.taking.load(Ordering::Relaxed) || self.base.full()
97     }
98 }
99 
100 impl<'p, T, C, P> UnindexedConsumer<T> for TakeAnyWhileConsumer<'p, C, P>
101 where
102     C: UnindexedConsumer<T>,
103     P: Fn(&T) -> bool + Sync,
104 {
split_off_left(&self) -> Self105     fn split_off_left(&self) -> Self {
106         TakeAnyWhileConsumer {
107             base: self.base.split_off_left(),
108             ..*self
109         }
110     }
111 
to_reducer(&self) -> Self::Reducer112     fn to_reducer(&self) -> Self::Reducer {
113         self.base.to_reducer()
114     }
115 }
116 
/// Folder wrapper that passes an item on to `base` only when `take` accepts it.
struct TakeAnyWhileFolder<'p, C, P> {
    /// The wrapped folder that receives the accepted items.
    base: C,
    /// Borrowed predicate tested against each item.
    predicate: &'p P,
    /// Borrowed flag; `true` while items are still being taken.
    taking: &'p AtomicBool,
}
122 
/// Decides whether `item` should be taken.
///
/// Returns `false` without calling `predicate` when `taking` is already
/// `false`. Otherwise returns the predicate's result, and stores `false` into
/// `taking` when the predicate rejects the item.
fn take<T>(item: &T, taking: &AtomicBool, predicate: &impl Fn(&T) -> bool) -> bool {
    let still_taking = taking.load(Ordering::Relaxed);
    if still_taking {
        let accepted = predicate(item);
        if !accepted {
            // First rejection seen through this call: clear the flag.
            taking.store(false, Ordering::Relaxed);
        }
        accepted
    } else {
        false
    }
}
133 
134 impl<'p, T, C, P> Folder<T> for TakeAnyWhileFolder<'p, C, P>
135 where
136     C: Folder<T>,
137     P: Fn(&T) -> bool + 'p,
138 {
139     type Result = C::Result;
140 
consume(mut self, item: T) -> Self141     fn consume(mut self, item: T) -> Self {
142         if take(&item, self.taking, self.predicate) {
143             self.base = self.base.consume(item);
144         }
145         self
146     }
147 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,148     fn consume_iter<I>(mut self, iter: I) -> Self
149     where
150         I: IntoIterator<Item = T>,
151     {
152         self.base = self.base.consume_iter(
153             iter.into_iter()
154                 .take_while(move |x| take(x, self.taking, self.predicate)),
155         );
156         self
157     }
158 
complete(self) -> C::Result159     fn complete(self) -> C::Result {
160         self.base.complete()
161     }
162 
full(&self) -> bool163     fn full(&self) -> bool {
164         !self.taking.load(Ordering::Relaxed) || self.base.full()
165     }
166 }
167