1 #![feature(test)]
2 
3 extern crate test;
4 use crate::test::Bencher;
5 
6 use futures::channel::oneshot;
7 use futures::executor::block_on;
8 use futures::future;
9 use futures::stream::{self, StreamExt};
10 use futures::task::Poll;
11 use futures_util::FutureExt;
12 use std::collections::VecDeque;
13 use std::thread;
14 
15 #[bench]
oneshot_streams(b: &mut Bencher)16 fn oneshot_streams(b: &mut Bencher) {
17     const STREAM_COUNT: usize = 10_000;
18     const STREAM_ITEM_COUNT: usize = 1;
19 
20     b.iter(|| {
21         let mut txs = VecDeque::with_capacity(STREAM_COUNT);
22         let mut rxs = Vec::new();
23 
24         for _ in 0..STREAM_COUNT {
25             let (tx, rx) = oneshot::channel();
26             txs.push_back(tx);
27             rxs.push(rx);
28         }
29 
30         thread::spawn(move || {
31             let mut last = 1;
32             while let Some(tx) = txs.pop_front() {
33                 let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
34                 last += STREAM_ITEM_COUNT;
35             }
36         });
37 
38         let mut flatten = stream::iter(rxs)
39             .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten())
40             .flatten_unordered(None);
41 
42         block_on(future::poll_fn(move |cx| {
43             let mut count = 0;
44             loop {
45                 match flatten.poll_next_unpin(cx) {
46                     Poll::Ready(None) => break,
47                     Poll::Ready(Some(_)) => {
48                         count += 1;
49                     }
50                     _ => {}
51                 }
52             }
53             assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
54 
55             Poll::Ready(())
56         }))
57     });
58 }
59