1 use futures::channel::mpsc;
2 use futures::channel::oneshot;
3 use futures::executor::{block_on, block_on_stream};
4 use futures::future::{err, ok};
5 use futures::stream::{empty, iter_ok, poll_fn, Peekable};
6 
7 // mod support;
8 // use support::*;
9 
/// Adapter holding an iterator of `Result` values so it can be exposed as a stream.
pub struct Iter<I> {
    iter: I,
}

/// Wraps anything convertible into an iterator of `Result<T, E>` in an [`Iter`].
///
/// The argument is converted with `into_iter()` immediately; nothing is pulled
/// from the iterator here.
pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
where
    J: IntoIterator<Item = Result<T, E>>,
{
    let iter = i.into_iter();
    Iter { iter }
}
20 
21 impl<I, T, E> Stream for Iter<I>
22 where
23     I: Iterator<Item = Result<T, E>>,
24 {
25     type Item = T;
26     type Error = E;
27 
poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<T>, E>28     fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<T>, E> {
29         match self.iter.next() {
30             Some(Ok(e)) => Ok(Poll::Ready(Some(e))),
31             Some(Err(e)) => Err(e),
32             None => Ok(Poll::Ready(None)),
33         }
34     }
35 }
36 
list() -> Box<Stream<Item = i32, Error = u32> + Send>37 fn list() -> Box<Stream<Item = i32, Error = u32> + Send> {
38     let (tx, rx) = mpsc::channel(1);
39     tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget();
40     Box::new(rx.then(|r| r.unwrap()))
41 }
42 
err_list() -> Box<Stream<Item = i32, Error = u32> + Send>43 fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> {
44     let (tx, rx) = mpsc::channel(1);
45     tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget();
46     Box::new(rx.then(|r| r.unwrap()))
47 }
48 
/// `map` applies the closure to every item of `list()`.
#[test]
fn map() {
    let expected = Ok(vec![2, 3, 4]);
    assert_done(|| list().map(|item| item + 1).collect(), expected);
}
53 
/// `map_err` transforms the error of `err_list()` (3 becomes 4).
#[test]
fn map_err() {
    let expected = Err(4);
    assert_done(|| err_list().map_err(|code| code + 1).collect::<Vec<_>>(), expected);
}
58 
/// Error newtype used to check error conversion through `From<u32>`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct FromErrTest(u32);

impl From<u32> for FromErrTest {
    /// Wraps the raw `u32` without modifying it.
    fn from(value: u32) -> Self {
        FromErrTest(value)
    }
}
67 
/// `err_into` converts the `u32` error of `err_list()` into `FromErrTest`.
#[test]
fn from_err() {
    let expected = Err(FromErrTest(3));
    assert_done(|| err_list().err_into().collect::<Vec<_>>(), expected);
}
72 
/// `fold` sums the items of `list()` and surfaces the error of `err_list()`.
#[test]
fn fold() {
    let sum = Ok(6);
    assert_done(|| list().fold(0, |acc, item| ok::<i32, u32>(acc + item)), sum);

    let failure = Err(3);
    assert_done(|| err_list().fold(0, |acc, item| ok::<i32, u32>(acc + item)), failure);
}
78 
/// `filter` keeps only the even item of `list()`.
#[test]
fn filter() {
    let expected = Ok(vec![2]);
    assert_done(|| list().filter(|item| ok(*item % 2 == 0)).collect(), expected);
}
83 
/// `filter_map` drops odd items and maps the even one to `item + 10`.
#[test]
fn filter_map() {
    let expected = Ok(vec![12]);
    assert_done(
        || {
            list()
                .filter_map(|item| {
                    let shifted = if item % 2 == 0 { Some(item + 10) } else { None };
                    ok(shifted)
                })
                .collect()
        },
        expected,
    );
}
91 
/// `and_then` chains a fallible step: a successful step maps every item, a
/// failing step stops the stream with the first error.
#[test]
fn and_then() {
    let mapped = Ok(vec![2, 3, 4]);
    assert_done(|| list().and_then(|item| Ok(item + 1)).collect(), mapped);

    let first_failure = Err(1);
    assert_done(
        || list().and_then(|item| err::<i32, u32>(item as u32)).collect::<Vec<_>>(),
        first_failure,
    );
}
97 
/// `then` receives each outcome as a `Result` and may transform it.
#[test]
fn then() {
    let expected = Ok(vec![2, 3, 4]);
    assert_done(|| list().then(|outcome| outcome.map(|item| item + 1)).collect(), expected);
}
102 
/// `or_else` recovers from the error of `err_list()` by turning it into an item.
#[test]
fn or_else() {
    let expected = Ok(vec![1, 2, 3]);
    assert_done(|| err_list().or_else(|code| ok::<i32, u32>(code as i32)).collect(), expected);
}
107 
/// `flatten` concatenates one inner `list()` per item of the outer `list()`.
#[test]
fn flatten() {
    let expected = Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]);
    assert_done(|| list().map(|_| list()).flatten().collect(), expected);
}
112 
/// `skip(2)` drops the first two items of `list()`.
#[test]
fn skip() {
    let expected = Ok(vec![3]);
    assert_done(|| list().skip(2).collect(), expected);
}
117 
/// Errors do not count towards the number of skipped items: both leading errors
/// are yielded and only the first `Ok` value is dropped.
#[test]
fn skip_passes_errors_through() {
    let source = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]);
    // Collecting drives the blocking iterator until its first `None`.
    let yielded: Vec<_> = block_on_stream(source.skip(1)).collect();
    assert_eq!(yielded, vec![Err(1), Err(2), Ok(4), Ok(5)]);
}
127 
/// `skip_while` drops the leading odd item and yields everything after it.
#[test]
fn skip_while() {
    let expected = Ok(vec![2, 3]);
    assert_done(|| list().skip_while(|item| Ok(*item % 2 == 1)).collect(), expected);
}
/// `take(2)` yields only the first two items of `list()`.
#[test]
fn take() {
    let expected = Ok(vec![1, 2]);
    assert_done(|| list().take(2).collect(), expected);
}
136 
/// `take_while` stops at the first item that fails the predicate.
#[test]
fn take_while() {
    let expected = Ok(vec![1, 2]);
    assert_done(|| list().take_while(|item| Ok(*item < 3)).collect(), expected);
}
141 
/// Errors do not count towards the `take` limit, and the stream ends as soon as
/// the limit is reached, so later errors are never observed.
#[test]
fn take_passes_errors_through() {
    let leading_errors = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]);
    // Collecting drives the blocking iterator until its first `None`.
    let yielded: Vec<_> = block_on_stream(leading_errors.take(1)).collect();
    assert_eq!(yielded, vec![Err(1), Err(2), Ok(3)]);

    let trailing_error = iter(vec![Ok(1), Err(2)]);
    let yielded: Vec<_> = block_on_stream(trailing_error.take(1)).collect();
    assert_eq!(yielded, vec![Ok(1)]);
}
154 
/// Wrapping `list()` in `peekable` does not change the collected items.
#[test]
fn peekable() {
    let expected = Ok(vec![1, 2, 3]);
    assert_done(|| list().peekable().collect(), expected);
}
159 
/// A fused stream keeps returning `None` when polled again after it has ended.
#[test]
fn fuse() {
    let mut stream = block_on_stream(list().fuse());

    // `by_ref().collect()` consumes up to and including the first `None`.
    let yielded: Vec<_> = stream.by_ref().collect();
    assert_eq!(yielded, vec![Ok(1), Ok(2), Ok(3)]);

    // Two further polls after the end must still report `None`.
    for _ in 0..2 {
        assert_eq!(stream.next(), None);
    }
}
170 
171 #[test]
buffered()172 fn buffered() {
173     let (tx, rx) = mpsc::channel(1);
174     let (a, b) = oneshot::channel::<u32>();
175     let (c, d) = oneshot::channel::<u32>();
176 
177     tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
178         .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
179         .forget();
180 
181     let mut rx = rx.buffered(2);
182     sassert_empty(&mut rx);
183     c.send(3).unwrap();
184     sassert_empty(&mut rx);
185     a.send(5).unwrap();
186     let mut rx = block_on_stream(rx);
187     assert_eq!(rx.next(), Some(Ok(5)));
188     assert_eq!(rx.next(), Some(Ok(3)));
189     assert_eq!(rx.next(), None);
190 
191     let (tx, rx) = mpsc::channel(1);
192     let (a, b) = oneshot::channel::<u32>();
193     let (c, d) = oneshot::channel::<u32>();
194 
195     tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
196         .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
197         .forget();
198 
199     let mut rx = rx.buffered(1);
200     sassert_empty(&mut rx);
201     c.send(3).unwrap();
202     sassert_empty(&mut rx);
203     a.send(5).unwrap();
204     let mut rx = block_on_stream(rx);
205     assert_eq!(rx.next(), Some(Ok(5)));
206     assert_eq!(rx.next(), Some(Ok(3)));
207     assert_eq!(rx.next(), None);
208 }
209 
210 #[test]
unordered()211 fn unordered() {
212     let (tx, rx) = mpsc::channel(1);
213     let (a, b) = oneshot::channel::<u32>();
214     let (c, d) = oneshot::channel::<u32>();
215 
216     tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
217         .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
218         .forget();
219 
220     let mut rx = rx.buffer_unordered(2);
221     sassert_empty(&mut rx);
222     let mut rx = block_on_stream(rx);
223     c.send(3).unwrap();
224     assert_eq!(rx.next(), Some(Ok(3)));
225     a.send(5).unwrap();
226     assert_eq!(rx.next(), Some(Ok(5)));
227     assert_eq!(rx.next(), None);
228 
229     let (tx, rx) = mpsc::channel(1);
230     let (a, b) = oneshot::channel::<u32>();
231     let (c, d) = oneshot::channel::<u32>();
232 
233     tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
234         .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
235         .forget();
236 
237     // We don't even get to see `c` until `a` completes.
238     let mut rx = rx.buffer_unordered(1);
239     sassert_empty(&mut rx);
240     c.send(3).unwrap();
241     sassert_empty(&mut rx);
242     a.send(5).unwrap();
243     let mut rx = block_on_stream(rx);
244     assert_eq!(rx.next(), Some(Ok(5)));
245     assert_eq!(rx.next(), Some(Ok(3)));
246     assert_eq!(rx.next(), None);
247 }
248 
249 #[test]
zip()250 fn zip() {
251     assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)]));
252     assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)]));
253     assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)]));
254     assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3));
255     assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)]));
256 }
257 
258 #[test]
peek()259 fn peek() {
260     struct Peek {
261         inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>,
262     }
263 
264     impl Future for Peek {
265         type Item = ();
266         type Error = u32;
267 
268         fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> {
269             {
270                 let res = ready!(self.inner.peek(cx))?;
271                 assert_eq!(res, Some(&1));
272             }
273             assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into());
274             assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into());
275             Ok(Poll::Ready(()))
276         }
277     }
278 
279     block_on(Peek { inner: list().peekable() }).unwrap()
280 }
281 
/// Blocking iteration over `list()` collects into `Ok(vec![1, 2, 3])`.
#[test]
fn wait() {
    let collected: Result<Vec<_>, _> = block_on_stream(list()).collect();
    assert_eq!(collected, Ok(vec![1, 2, 3]));
}
286 
287 #[test]
chunks()288 fn chunks() {
289     assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
290     assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]]));
291     assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]]));
292     let mut list = block_on_stream(err_list().chunks(3));
293     let i = list.next().unwrap().unwrap();
294     assert_eq!(i, vec![1, 2]);
295     let i = list.next().unwrap().unwrap_err();
296     assert_eq!(i, 3);
297 }
298 
/// Requesting chunks of capacity zero must panic.
#[test]
#[should_panic]
fn chunks_panic_on_cap_zero() {
    drop(list().chunks(0));
}
304 
305 #[test]
forward()306 fn forward() {
307     let v = Vec::new();
308     let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1;
309     assert_eq!(v, vec![0, 1]);
310 
311     let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1;
312     assert_eq!(v, vec![0, 1, 2, 3]);
313 
314     assert_done(
315         move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
316         Ok(vec![0, 1, 2, 3, 4, 5]),
317     );
318 }
319 
320 #[test]
concat()321 fn concat() {
322     let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
323     assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
324 
325     let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
326     assert_done(move || b.concat(), Err(()));
327 }
328 
329 #[test]
concat2()330 fn concat2() {
331     let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
332     assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
333 
334     let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
335     assert_done(move || b.concat(), Err(()));
336 
337     let c = empty::<Vec<()>, ()>();
338     assert_done(move || c.concat(), Ok(vec![]))
339 }
340 
341 #[test]
stream_poll_fn()342 fn stream_poll_fn() {
343     let mut counter = 5usize;
344 
345     let read_stream = poll_fn(move |_| -> Poll<Option<usize>, std::io::Error> {
346         if counter == 0 {
347             return Ok(Poll::Ready(None));
348         }
349         counter -= 1;
350         Ok(Poll::Ready(Some(counter)))
351     });
352 
353     assert_eq!(block_on_stream(read_stream).count(), 5);
354 }
355 
356 #[test]
inspect()357 fn inspect() {
358     let mut seen = vec![];
359     assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3]));
360     assert_eq!(seen, [1, 2, 3]);
361 }
362 
363 #[test]
inspect_err()364 fn inspect_err() {
365     let mut seen = vec![];
366     assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect::<Vec<_>>(), Err(3));
367     assert_eq!(seen, [3]);
368 }
369