1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
3 
4 use tokio::time;
5 use tokio_stream::{self as stream, StreamExt};
6 use tokio_test::assert_pending;
7 use tokio_test::task;
8 
9 use futures::FutureExt;
10 use std::time::Duration;
11 
12 #[tokio::test(start_paused = true)]
usage()13 async fn usage() {
14     let iter = vec![1, 2, 3].into_iter();
15     let stream0 = stream::iter(iter);
16 
17     let iter = vec![4].into_iter();
18     let stream1 =
19         stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
20 
21     let chunk_stream = stream0
22         .chain(stream1)
23         .chunks_timeout(4, Duration::from_secs(2));
24 
25     let mut chunk_stream = task::spawn(chunk_stream);
26 
27     assert_pending!(chunk_stream.poll_next());
28     time::advance(Duration::from_secs(2)).await;
29     assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
30 
31     assert_pending!(chunk_stream.poll_next());
32     time::advance(Duration::from_secs(2)).await;
33     assert_eq!(chunk_stream.next().await, Some(vec![4]));
34 }
35 
36 #[tokio::test(start_paused = true)]
full_chunk_with_timeout()37 async fn full_chunk_with_timeout() {
38     let iter = vec![1, 2].into_iter();
39     let stream0 = stream::iter(iter);
40 
41     let iter = vec![3].into_iter();
42     let stream1 =
43         stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
44 
45     let iter = vec![4].into_iter();
46     let stream2 =
47         stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
48 
49     let chunk_stream = stream0
50         .chain(stream1)
51         .chain(stream2)
52         .chunks_timeout(3, Duration::from_secs(2));
53 
54     let mut chunk_stream = task::spawn(chunk_stream);
55 
56     assert_pending!(chunk_stream.poll_next());
57     time::advance(Duration::from_secs(2)).await;
58     assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
59 
60     assert_pending!(chunk_stream.poll_next());
61     time::advance(Duration::from_secs(2)).await;
62     assert_eq!(chunk_stream.next().await, Some(vec![4]));
63 }
64 
65 #[tokio::test]
66 #[ignore]
real_time()67 async fn real_time() {
68     let iter = vec![1, 2, 3, 4].into_iter();
69     let stream0 = stream::iter(iter);
70 
71     let iter = vec![5].into_iter();
72     let stream1 =
73         stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
74 
75     let chunk_stream = stream0
76         .chain(stream1)
77         .chunks_timeout(3, Duration::from_secs(2));
78 
79     let mut chunk_stream = task::spawn(chunk_stream);
80 
81     assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
82     assert_eq!(chunk_stream.next().await, Some(vec![4]));
83     assert_eq!(chunk_stream.next().await, Some(vec![5]));
84 }
85