1 use crate::stream::Fuse; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::ready; 5 use futures_core::stream::Stream; 6 use futures_core::task::{Context, Poll}; 7 use futures_sink::Sink; 8 use pin_project_lite::pin_project; 9 10 pin_project! { 11 /// Future for the [`forward`](super::StreamExt::forward) method. 12 #[project = ForwardProj] 13 #[derive(Debug)] 14 #[must_use = "futures do nothing unless you `.await` or poll them"] 15 pub struct Forward<St, Si, Item> { 16 #[pin] 17 sink: Option<Si>, 18 #[pin] 19 stream: Fuse<St>, 20 buffered_item: Option<Item>, 21 } 22 } 23 24 impl<St, Si, Item> Forward<St, Si, Item> { new(stream: St, sink: Si) -> Self25 pub(crate) fn new(stream: St, sink: Si) -> Self { 26 Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None } 27 } 28 } 29 30 impl<St, Si, Item, E> FusedFuture for Forward<St, Si, Item> 31 where 32 Si: Sink<Item, Error = E>, 33 St: Stream<Item = Result<Item, E>>, 34 { is_terminated(&self) -> bool35 fn is_terminated(&self) -> bool { 36 self.sink.is_none() 37 } 38 } 39 40 impl<St, Si, Item, E> Future for Forward<St, Si, Item> 41 where 42 Si: Sink<Item, Error = E>, 43 St: Stream<Item = Result<Item, E>>, 44 { 45 type Output = Result<(), E>; 46 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>47 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 48 let ForwardProj { mut sink, mut stream, buffered_item } = self.project(); 49 let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion"); 50 51 loop { 52 // If we've got an item buffered already, we need to write it to the 53 // sink before we can do anything else 54 if buffered_item.is_some() { 55 ready!(si.as_mut().poll_ready(cx))?; 56 si.as_mut().start_send(buffered_item.take().unwrap())?; 57 } 58 59 match stream.as_mut().poll_next(cx)? { 60 Poll::Ready(Some(item)) => { 61 *buffered_item = Some(item); 62 } 63 Poll::Ready(None) => { 64 ready!(si.poll_close(cx))?; 65 sink.set(None); 66 return Poll::Ready(Ok(())); 67 } 68 Poll::Pending => { 69 ready!(si.poll_flush(cx))?; 70 return Poll::Pending; 71 } 72 } 73 } 74 } 75 } 76