1 use core::cmp;
2 use core::pin::Pin;
3 use futures_core::ready;
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 #[cfg(feature = "sink")]
7 use futures_sink::Sink;
8 use pin_project_lite::pin_project;
9 
10 pin_project! {
11     /// Stream for the [`take`](super::StreamExt::take) method.
12     #[derive(Debug)]
13     #[must_use = "streams do nothing unless polled"]
14     pub struct Take<St> {
15         #[pin]
16         stream: St,
17         remaining: usize,
18     }
19 }
20 
21 impl<St: Stream> Take<St> {
new(stream: St, n: usize) -> Self22     pub(super) fn new(stream: St, n: usize) -> Self {
23         Self { stream, remaining: n }
24     }
25 
26     delegate_access_inner!(stream, St, ());
27 }
28 
29 impl<St> Stream for Take<St>
30 where
31     St: Stream,
32 {
33     type Item = St::Item;
34 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>>35     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
36         if self.remaining == 0 {
37             Poll::Ready(None)
38         } else {
39             let this = self.project();
40             let next = ready!(this.stream.poll_next(cx));
41             if next.is_some() {
42                 *this.remaining -= 1;
43             } else {
44                 *this.remaining = 0;
45             }
46             Poll::Ready(next)
47         }
48     }
49 
size_hint(&self) -> (usize, Option<usize>)50     fn size_hint(&self) -> (usize, Option<usize>) {
51         if self.remaining == 0 {
52             return (0, Some(0));
53         }
54 
55         let (lower, upper) = self.stream.size_hint();
56 
57         let lower = cmp::min(lower, self.remaining);
58 
59         let upper = match upper {
60             Some(x) if x < self.remaining => Some(x),
61             _ => Some(self.remaining),
62         };
63 
64         (lower, upper)
65     }
66 }
67 
68 impl<St> FusedStream for Take<St>
69 where
70     St: FusedStream,
71 {
is_terminated(&self) -> bool72     fn is_terminated(&self) -> bool {
73         self.remaining == 0 || self.stream.is_terminated()
74     }
75 }
76 
// `Sink` pass-through: when the wrapped stream type is also a `Sink<Item>`,
// `Take` exposes that sink interface with the same error type.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Item> Sink<Item> for Take<S> {
    type Error = S::Error;

    // NOTE(review): `delegate_sink!` is defined elsewhere in the crate;
    // presumably it expands to the `Sink` methods forwarding to the `stream`
    // field — confirm against the macro definition.
    delegate_sink!(stream, Item);
}
87