1 use crate::stream::Fuse;
2 use core::pin::Pin;
3 use futures_core::future::{FusedFuture, Future};
4 use futures_core::ready;
5 use futures_core::stream::Stream;
6 use futures_core::task::{Context, Poll};
7 use futures_sink::Sink;
8 use pin_project_lite::pin_project;
9 
10 pin_project! {
11     /// Future for the [`forward`](super::StreamExt::forward) method.
12     #[project = ForwardProj]
13     #[derive(Debug)]
14     #[must_use = "futures do nothing unless you `.await` or poll them"]
15     pub struct Forward<St, Si, Item> {
16         #[pin]
17         sink: Option<Si>,
18         #[pin]
19         stream: Fuse<St>,
20         buffered_item: Option<Item>,
21     }
22 }
23 
24 impl<St, Si, Item> Forward<St, Si, Item> {
new(stream: St, sink: Si) -> Self25     pub(crate) fn new(stream: St, sink: Si) -> Self {
26         Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None }
27     }
28 }
29 
30 impl<St, Si, Item, E> FusedFuture for Forward<St, Si, Item>
31 where
32     Si: Sink<Item, Error = E>,
33     St: Stream<Item = Result<Item, E>>,
34 {
is_terminated(&self) -> bool35     fn is_terminated(&self) -> bool {
36         self.sink.is_none()
37     }
38 }
39 
40 impl<St, Si, Item, E> Future for Forward<St, Si, Item>
41 where
42     Si: Sink<Item, Error = E>,
43     St: Stream<Item = Result<Item, E>>,
44 {
45     type Output = Result<(), E>;
46 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>47     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48         let ForwardProj { mut sink, mut stream, buffered_item } = self.project();
49         let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion");
50 
51         loop {
52             // If we've got an item buffered already, we need to write it to the
53             // sink before we can do anything else
54             if buffered_item.is_some() {
55                 ready!(si.as_mut().poll_ready(cx))?;
56                 si.as_mut().start_send(buffered_item.take().unwrap())?;
57             }
58 
59             match stream.as_mut().poll_next(cx)? {
60                 Poll::Ready(Some(item)) => {
61                     *buffered_item = Some(item);
62                 }
63                 Poll::Ready(None) => {
64                     ready!(si.poll_close(cx))?;
65                     sink.set(None);
66                     return Poll::Ready(Ok(()));
67                 }
68                 Poll::Pending => {
69                     ready!(si.poll_flush(cx))?;
70                     return Poll::Pending;
71                 }
72             }
73         }
74     }
75 }
76