1 #![feature(test)] 2 3 extern crate test; 4 use crate::test::Bencher; 5 6 use futures::channel::oneshot; 7 use futures::executor::block_on; 8 use futures::future; 9 use futures::stream::{self, StreamExt}; 10 use futures::task::Poll; 11 use futures_util::FutureExt; 12 use std::collections::VecDeque; 13 use std::thread; 14 15 #[bench] oneshot_streams(b: &mut Bencher)16fn oneshot_streams(b: &mut Bencher) { 17 const STREAM_COUNT: usize = 10_000; 18 const STREAM_ITEM_COUNT: usize = 1; 19 20 b.iter(|| { 21 let mut txs = VecDeque::with_capacity(STREAM_COUNT); 22 let mut rxs = Vec::new(); 23 24 for _ in 0..STREAM_COUNT { 25 let (tx, rx) = oneshot::channel(); 26 txs.push_back(tx); 27 rxs.push(rx); 28 } 29 30 thread::spawn(move || { 31 let mut last = 1; 32 while let Some(tx) = txs.pop_front() { 33 let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); 34 last += STREAM_ITEM_COUNT; 35 } 36 }); 37 38 let mut flatten = stream::iter(rxs) 39 .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten()) 40 .flatten_unordered(None); 41 42 block_on(future::poll_fn(move |cx| { 43 let mut count = 0; 44 loop { 45 match flatten.poll_next_unpin(cx) { 46 Poll::Ready(None) => break, 47 Poll::Ready(Some(_)) => { 48 count += 1; 49 } 50 _ => {} 51 } 52 } 53 assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); 54 55 Poll::Ready(()) 56 })) 57 }); 58 } 59