1 use core::pin::Pin;
2 use std::convert::Infallible;
3 
4 use futures::{
5     stream::{self, repeat, Repeat, StreamExt, TryStreamExt},
6     task::Poll,
7     Stream,
8 };
9 use futures_executor::block_on;
10 use futures_task::Context;
11 use futures_test::task::noop_context;
12 
13 #[test]
try_filter_map_after_err()14 fn try_filter_map_after_err() {
15     let cx = &mut noop_context();
16     let mut s = stream::iter(1..=3)
17         .map(Ok)
18         .try_filter_map(|v| async move { Err::<Option<()>, _>(v) })
19         .filter_map(|r| async move { r.ok() })
20         .boxed();
21     assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
22 }
23 
24 #[test]
try_skip_while_after_err()25 fn try_skip_while_after_err() {
26     let cx = &mut noop_context();
27     let mut s = stream::iter(1..=3)
28         .map(Ok)
29         .try_skip_while(|_| async move { Err::<_, ()>(()) })
30         .filter_map(|r| async move { r.ok() })
31         .boxed();
32     assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
33 }
34 
35 #[test]
try_take_while_after_err()36 fn try_take_while_after_err() {
37     let cx = &mut noop_context();
38     let mut s = stream::iter(1..=3)
39         .map(Ok)
40         .try_take_while(|_| async move { Err::<_, ()>(()) })
41         .filter_map(|r| async move { r.ok() })
42         .boxed();
43     assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
44 }
45 
46 #[test]
try_flatten_unordered()47 fn try_flatten_unordered() {
48     let test_st = stream::iter(1..7)
49         .map(|val: u32| {
50             if val % 2 == 0 {
51                 Ok(stream::unfold((val, 1), |(val, pow)| async move {
52                     Some((val.pow(pow), (val, pow + 1)))
53                 })
54                 .take(3)
55                 .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) }))
56             } else {
57                 Err(val)
58             }
59         })
60         .map_ok(Box::pin)
61         .try_flatten_unordered(None);
62 
63     block_on(async move {
64         assert_eq!(
65             // All numbers can be divided by 16 and odds must be `Err`
66             // For all basic evens we must have powers from 1 to 3
67             vec![
68                 Err(1),
69                 Err(3),
70                 Err(5),
71                 Ok(2),
72                 Ok(4),
73                 Ok(6),
74                 Ok(4),
75                 Err(16),
76                 Ok(36),
77                 Ok(8),
78                 Err(64),
79                 Ok(216)
80             ],
81             test_st.collect::<Vec<_>>().await
82         )
83     });
84 
85     #[derive(Clone, Debug)]
86     struct ErrorStream {
87         error_after: usize,
88         polled: usize,
89     }
90 
91     impl Stream for ErrorStream {
92         type Item = Result<Repeat<Result<(), ()>>, ()>;
93 
94         fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95             if self.polled > self.error_after {
96                 panic!("Polled after error");
97             } else {
98                 let out =
99                     if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) };
100                 self.polled += 1;
101                 Poll::Ready(Some(out))
102             }
103         }
104     }
105 
106     block_on(async move {
107         let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
108         let mut ctr = 0;
109         while (st.try_next().await).is_ok() {
110             ctr += 1;
111         }
112         assert_eq!(ctr, 0);
113 
114         assert_eq!(
115             ErrorStream { error_after: 10, polled: 0 }
116                 .try_flatten_unordered(None)
117                 .inspect_ok(|_| panic!("Unexpected `Ok`"))
118                 .try_collect::<Vec<_>>()
119                 .await,
120             Err(())
121         );
122 
123         let mut taken = 0;
124         assert_eq!(
125             ErrorStream { error_after: 10, polled: 0 }
126                 .map_ok(|st| st.take(3))
127                 .try_flatten_unordered(1)
128                 .inspect(|_| taken += 1)
129                 .try_fold((), |(), res| async move { Ok(res) })
130                 .await,
131             Err(())
132         );
133         assert_eq!(taken, 31);
134     })
135 }
136 
is_even(number: u8) -> bool137 async fn is_even(number: u8) -> bool {
138     number % 2 == 0
139 }
140 
141 #[test]
try_all()142 fn try_all() {
143     block_on(async {
144         let empty: [Result<u8, Infallible>; 0] = [];
145         let st = stream::iter(empty);
146         let all = st.try_all(is_even).await;
147         assert_eq!(Ok(true), all);
148 
149         let st = stream::iter([Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)]);
150         let all = st.try_all(is_even).await;
151         assert_eq!(Ok(true), all);
152 
153         let st = stream::iter([Ok::<_, Infallible>(2), Ok(3), Ok(4)]);
154         let all = st.try_all(is_even).await;
155         assert_eq!(Ok(false), all);
156 
157         let st = stream::iter([Ok(2), Ok(4), Err("err"), Ok(8)]);
158         let all = st.try_all(is_even).await;
159         assert_eq!(Err("err"), all);
160     });
161 }
162 
163 #[test]
try_any()164 fn try_any() {
165     block_on(async {
166         let empty: [Result<u8, Infallible>; 0] = [];
167         let st = stream::iter(empty);
168         let any = st.try_any(is_even).await;
169         assert_eq!(Ok(false), any);
170 
171         let st = stream::iter([Ok::<_, Infallible>(1), Ok(2), Ok(3)]);
172         let any = st.try_any(is_even).await;
173         assert_eq!(Ok(true), any);
174 
175         let st = stream::iter([Ok::<_, Infallible>(1), Ok(3), Ok(5)]);
176         let any = st.try_any(is_even).await;
177         assert_eq!(Ok(false), any);
178 
179         let st = stream::iter([Ok(1), Ok(3), Err("err"), Ok(8)]);
180         let any = st.try_any(is_even).await;
181         assert_eq!(Err("err"), any);
182     });
183 }
184