1 use crate::stream::StreamExt;
2 use core::pin::Pin;
3 use futures_core::future::{FusedFuture, Future};
4 use futures_core::ready;
5 use futures_core::stream::Stream;
6 use futures_core::task::{Context, Poll};
7 
/// Future for the [`into_future`](super::StreamExt::into_future) method.
///
/// The wrapped stream is kept in an `Option` so that it can be moved out of
/// the future and handed back to the caller once polling completes; after
/// that point the slot is `None`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct StreamFuture<St> {
    /// The stream being driven; `None` once it has been taken out.
    stream: Option<St>,
}
14 
15 impl<St: Stream + Unpin> StreamFuture<St> {
new(stream: St) -> Self16     pub(super) fn new(stream: St) -> Self {
17         Self { stream: Some(stream) }
18     }
19 
20     /// Acquires a reference to the underlying stream that this combinator is
21     /// pulling from.
22     ///
23     /// This method returns an `Option` to account for the fact that `StreamFuture`'s
24     /// implementation of `Future::poll` consumes the underlying stream during polling
25     /// in order to return it to the caller of `Future::poll` if the stream yielded
26     /// an element.
get_ref(&self) -> Option<&St>27     pub fn get_ref(&self) -> Option<&St> {
28         self.stream.as_ref()
29     }
30 
31     /// Acquires a mutable reference to the underlying stream that this
32     /// combinator is pulling from.
33     ///
34     /// Note that care must be taken to avoid tampering with the state of the
35     /// stream which may otherwise confuse this combinator.
36     ///
37     /// This method returns an `Option` to account for the fact that `StreamFuture`'s
38     /// implementation of `Future::poll` consumes the underlying stream during polling
39     /// in order to return it to the caller of `Future::poll` if the stream yielded
40     /// an element.
get_mut(&mut self) -> Option<&mut St>41     pub fn get_mut(&mut self) -> Option<&mut St> {
42         self.stream.as_mut()
43     }
44 
45     /// Acquires a pinned mutable reference to the underlying stream that this
46     /// combinator is pulling from.
47     ///
48     /// Note that care must be taken to avoid tampering with the state of the
49     /// stream which may otherwise confuse this combinator.
50     ///
51     /// This method returns an `Option` to account for the fact that `StreamFuture`'s
52     /// implementation of `Future::poll` consumes the underlying stream during polling
53     /// in order to return it to the caller of `Future::poll` if the stream yielded
54     /// an element.
get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>>55     pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> {
56         self.get_mut().stream.as_mut().map(Pin::new)
57     }
58 
59     /// Consumes this combinator, returning the underlying stream.
60     ///
61     /// Note that this may discard intermediate state of this combinator, so
62     /// care should be taken to avoid losing resources when this is called.
63     ///
64     /// This method returns an `Option` to account for the fact that `StreamFuture`'s
65     /// implementation of `Future::poll` consumes the underlying stream during polling
66     /// in order to return it to the caller of `Future::poll` if the stream yielded
67     /// an element.
into_inner(self) -> Option<St>68     pub fn into_inner(self) -> Option<St> {
69         self.stream
70     }
71 }
72 
73 impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> {
is_terminated(&self) -> bool74     fn is_terminated(&self) -> bool {
75         self.stream.is_none()
76     }
77 }
78 
79 impl<St: Stream + Unpin> Future for StreamFuture<St> {
80     type Output = (Option<St::Item>, St);
81 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>82     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83         let item = {
84             let s = self.stream.as_mut().expect("polling StreamFuture twice");
85             ready!(s.poll_next_unpin(cx))
86         };
87         let stream = self.stream.take().unwrap();
88         Poll::Ready((item, stream))
89     }
90 }
91