1 use crate::stream::{Fuse, StreamExt, TryStreamExt};
2 use core::fmt;
3 use core::pin::Pin;
4 use futures_core::future::Future;
5 use futures_core::ready;
6 use futures_core::stream::{Stream, TryStream};
7 use futures_core::task::{Context, Poll};
8 use futures_sink::Sink;
9 
10 /// Future for the [`send_all`](super::SinkExt::send_all) method.
11 #[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993
12 #[must_use = "futures do nothing unless you `.await` or poll them"]
13 pub struct SendAll<'a, Si, St>
14 where
15     Si: ?Sized,
16     St: ?Sized + TryStream,
17 {
18     sink: &'a mut Si,
19     stream: Fuse<&'a mut St>,
20     buffered: Option<St::Ok>,
21 }
22 
23 impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
24 where
25     Si: fmt::Debug + ?Sized,
26     St: fmt::Debug + ?Sized + TryStream,
27     St::Ok: fmt::Debug,
28 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30         f.debug_struct("SendAll")
31             .field("sink", &self.sink)
32             .field("stream", &self.stream)
33             .field("buffered", &self.buffered)
34             .finish()
35     }
36 }
37 
38 // Pinning is never projected to any fields
39 impl<Si, St> Unpin for SendAll<'_, Si, St>
40 where
41     Si: Unpin + ?Sized,
42     St: TryStream + Unpin + ?Sized,
43 {
44 }
45 
46 impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St>
47 where
48     Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
49     St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized,
50 {
new(sink: &'a mut Si, stream: &'a mut St) -> Self51     pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self {
52         Self { sink, stream: stream.fuse(), buffered: None }
53     }
54 
try_start_send( &mut self, cx: &mut Context<'_>, item: St::Ok, ) -> Poll<Result<(), Si::Error>>55     fn try_start_send(
56         &mut self,
57         cx: &mut Context<'_>,
58         item: St::Ok,
59     ) -> Poll<Result<(), Si::Error>> {
60         debug_assert!(self.buffered.is_none());
61         match Pin::new(&mut self.sink).poll_ready(cx)? {
62             Poll::Ready(()) => Poll::Ready(Pin::new(&mut self.sink).start_send(item)),
63             Poll::Pending => {
64                 self.buffered = Some(item);
65                 Poll::Pending
66             }
67         }
68     }
69 }
70 
71 impl<Si, St, Ok, Error> Future for SendAll<'_, Si, St>
72 where
73     Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
74     St: Stream<Item = Result<Ok, Error>> + Unpin + ?Sized,
75 {
76     type Output = Result<(), Error>;
77 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>78     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79         let this = &mut *self;
80         // If we've got an item buffered already, we need to write it to the
81         // sink before we can do anything else
82         if let Some(item) = this.buffered.take() {
83             ready!(this.try_start_send(cx, item))?
84         }
85 
86         loop {
87             match this.stream.try_poll_next_unpin(cx)? {
88                 Poll::Ready(Some(item)) => ready!(this.try_start_send(cx, item))?,
89                 Poll::Ready(None) => {
90                     ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
91                     return Poll::Ready(Ok(()));
92                 }
93                 Poll::Pending => {
94                     ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
95                     return Poll::Pending;
96                 }
97             }
98         }
99     }
100 }
101