1 use core::cmp; 2 use core::pin::Pin; 3 use futures_core::ready; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 #[cfg(feature = "sink")] 7 use futures_sink::Sink; 8 use pin_project_lite::pin_project; 9 10 pin_project! { 11 /// Stream for the [`take`](super::StreamExt::take) method. 12 #[derive(Debug)] 13 #[must_use = "streams do nothing unless polled"] 14 pub struct Take<St> { 15 #[pin] 16 stream: St, 17 remaining: usize, 18 } 19 } 20 21 impl<St: Stream> Take<St> { new(stream: St, n: usize) -> Self22 pub(super) fn new(stream: St, n: usize) -> Self { 23 Self { stream, remaining: n } 24 } 25 26 delegate_access_inner!(stream, St, ()); 27 } 28 29 impl<St> Stream for Take<St> 30 where 31 St: Stream, 32 { 33 type Item = St::Item; 34 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>>35 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { 36 if self.remaining == 0 { 37 Poll::Ready(None) 38 } else { 39 let this = self.project(); 40 let next = ready!(this.stream.poll_next(cx)); 41 if next.is_some() { 42 *this.remaining -= 1; 43 } else { 44 *this.remaining = 0; 45 } 46 Poll::Ready(next) 47 } 48 } 49 size_hint(&self) -> (usize, Option<usize>)50 fn size_hint(&self) -> (usize, Option<usize>) { 51 if self.remaining == 0 { 52 return (0, Some(0)); 53 } 54 55 let (lower, upper) = self.stream.size_hint(); 56 57 let lower = cmp::min(lower, self.remaining); 58 59 let upper = match upper { 60 Some(x) if x < self.remaining => Some(x), 61 _ => Some(self.remaining), 62 }; 63 64 (lower, upper) 65 } 66 } 67 68 impl<St> FusedStream for Take<St> 69 where 70 St: FusedStream, 71 { is_terminated(&self) -> bool72 fn is_terminated(&self) -> bool { 73 self.remaining == 0 || self.stream.is_terminated() 74 } 75 } 76 77 // Forwarding impl of Sink from the underlying stream 78 #[cfg(feature = "sink")] 79 impl<S, Item> Sink<Item> for Take<S> 80 where 81 S: Stream + Sink<Item>, 82 { 83 type Error = S::Error; 84 85 delegate_sink!(stream, Item); 86 } 87