1 use crate::stream::StreamExt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::Stream; 6 use futures_core::task::{Context, Poll}; 7 8 /// Future for the [`into_future`](super::StreamExt::into_future) method. 9 #[derive(Debug)] 10 #[must_use = "futures do nothing unless you `.await` or poll them"] 11 pub struct StreamFuture<St> { 12 stream: Option<St>, 13 } 14 15 impl<St: Stream + Unpin> StreamFuture<St> { new(stream: St) -> Self16 pub(super) fn new(stream: St) -> Self { 17 Self { stream: Some(stream) } 18 } 19 20 /// Acquires a reference to the underlying stream that this combinator is 21 /// pulling from. 22 /// 23 /// This method returns an `Option` to account for the fact that `StreamFuture`'s 24 /// implementation of `Future::poll` consumes the underlying stream during polling 25 /// in order to return it to the caller of `Future::poll` if the stream yielded 26 /// an element. get_ref(&self) -> Option<&St>27 pub fn get_ref(&self) -> Option<&St> { 28 self.stream.as_ref() 29 } 30 31 /// Acquires a mutable reference to the underlying stream that this 32 /// combinator is pulling from. 33 /// 34 /// Note that care must be taken to avoid tampering with the state of the 35 /// stream which may otherwise confuse this combinator. 36 /// 37 /// This method returns an `Option` to account for the fact that `StreamFuture`'s 38 /// implementation of `Future::poll` consumes the underlying stream during polling 39 /// in order to return it to the caller of `Future::poll` if the stream yielded 40 /// an element. get_mut(&mut self) -> Option<&mut St>41 pub fn get_mut(&mut self) -> Option<&mut St> { 42 self.stream.as_mut() 43 } 44 45 /// Acquires a pinned mutable reference to the underlying stream that this 46 /// combinator is pulling from. 47 /// 48 /// Note that care must be taken to avoid tampering with the state of the 49 /// stream which may otherwise confuse this combinator. 50 /// 51 /// This method returns an `Option` to account for the fact that `StreamFuture`'s 52 /// implementation of `Future::poll` consumes the underlying stream during polling 53 /// in order to return it to the caller of `Future::poll` if the stream yielded 54 /// an element. get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>>55 pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> { 56 self.get_mut().stream.as_mut().map(Pin::new) 57 } 58 59 /// Consumes this combinator, returning the underlying stream. 60 /// 61 /// Note that this may discard intermediate state of this combinator, so 62 /// care should be taken to avoid losing resources when this is called. 63 /// 64 /// This method returns an `Option` to account for the fact that `StreamFuture`'s 65 /// implementation of `Future::poll` consumes the underlying stream during polling 66 /// in order to return it to the caller of `Future::poll` if the stream yielded 67 /// an element. into_inner(self) -> Option<St>68 pub fn into_inner(self) -> Option<St> { 69 self.stream 70 } 71 } 72 73 impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> { is_terminated(&self) -> bool74 fn is_terminated(&self) -> bool { 75 self.stream.is_none() 76 } 77 } 78 79 impl<St: Stream + Unpin> Future for StreamFuture<St> { 80 type Output = (Option<St::Item>, St); 81 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>82 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 83 let item = { 84 let s = self.stream.as_mut().expect("polling StreamFuture twice"); 85 ready!(s.poll_next_unpin(cx)) 86 }; 87 let stream = self.stream.take().unwrap(); 88 Poll::Ready((item, stream)) 89 } 90 } 91