1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::Future;
4 use futures_core::ready;
5 use futures_core::stream::{FusedStream, Stream};
6 use futures_core::task::{Context, Poll};
7 #[cfg(feature = "sink")]
8 use futures_sink::Sink;
9 use pin_project_lite::pin_project;
10 
11 // FIXME: docs, tests
12 
pin_project! {
    /// Stream for the [`take_until`](super::StreamExt::take_until) method.
    ///
    /// Pairs a stream with a "stopping" future: items of the stream are
    /// forwarded until the future resolves, after which the combinator
    /// reports the end of the stream.
    #[must_use = "streams do nothing unless polled"]
    pub struct TakeUntil<St: Stream, Fut: Future> {
        // The wrapped stream whose items are forwarded.
        #[pin]
        stream: St,
        // Holds the stopping future from construction on. Becomes `None` once the
        // future has resolved, once the wrapped stream has ended, or once the user
        // has taken the future out via `take_future`.
        #[pin]
        fut: Option<Fut>,
        // The stopping future's output, stored when the future resolves and
        // handed out (leaving `None` behind) by `take_result`.
        fut_result: Option<Fut::Output>,
        // Set by `take_future` when it removes a still-pending future; used to tell
        // "future taken out by the user" apart from "future resolved" while `fut`
        // is `None`.
        free: bool,
    }
}
29 
30 impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
31 where
32     St: Stream + fmt::Debug,
33     St::Item: fmt::Debug,
34     Fut: Future + fmt::Debug,
35 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result36     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37         f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish()
38     }
39 }
40 
41 impl<St, Fut> TakeUntil<St, Fut>
42 where
43     St: Stream,
44     Fut: Future,
45 {
new(stream: St, fut: Fut) -> Self46     pub(super) fn new(stream: St, fut: Fut) -> Self {
47         Self { stream, fut: Some(fut), fut_result: None, free: false }
48     }
49 
50     delegate_access_inner!(stream, St, ());
51 
52     /// Extract the stopping future out of the combinator.
53     /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
54     /// Taking out the future means the combinator will be yielding
55     /// elements from the wrapped stream without ever stopping it.
take_future(&mut self) -> Option<Fut>56     pub fn take_future(&mut self) -> Option<Fut> {
57         if self.fut.is_some() {
58             self.free = true;
59         }
60 
61         self.fut.take()
62     }
63 
64     /// Once the stopping future is resolved, this method can be used
65     /// to extract the value returned by the stopping future.
66     ///
67     /// This may be used to retrieve arbitrary data from the stopping
68     /// future, for example a reason why the stream was stopped.
69     ///
70     /// This method will return `None` if the future isn't resolved yet,
71     /// or if the result was already taken out.
72     ///
73     /// # Examples
74     ///
75     /// ```
76     /// # futures::executor::block_on(async {
77     /// use futures::future;
78     /// use futures::stream::{self, StreamExt};
79     /// use futures::task::Poll;
80     ///
81     /// let stream = stream::iter(1..=10);
82     ///
83     /// let mut i = 0;
84     /// let stop_fut = future::poll_fn(|_cx| {
85     ///     i += 1;
86     ///     if i <= 5 {
87     ///         Poll::Pending
88     ///     } else {
89     ///         Poll::Ready("reason")
90     ///     }
91     /// });
92     ///
93     /// let mut stream = stream.take_until(stop_fut);
94     /// let _ = stream.by_ref().collect::<Vec<_>>().await;
95     ///
96     /// let result = stream.take_result().unwrap();
97     /// assert_eq!(result, "reason");
98     /// # });
99     /// ```
take_result(&mut self) -> Option<Fut::Output>100     pub fn take_result(&mut self) -> Option<Fut::Output> {
101         self.fut_result.take()
102     }
103 
104     /// Whether the stream was stopped yet by the stopping future
105     /// being resolved.
is_stopped(&self) -> bool106     pub fn is_stopped(&self) -> bool {
107         !self.free && self.fut.is_none()
108     }
109 }
110 
111 impl<St, Fut> Stream for TakeUntil<St, Fut>
112 where
113     St: Stream,
114     Fut: Future,
115 {
116     type Item = St::Item;
117 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>>118     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
119         let mut this = self.project();
120 
121         if let Some(f) = this.fut.as_mut().as_pin_mut() {
122             if let Poll::Ready(result) = f.poll(cx) {
123                 this.fut.set(None);
124                 *this.fut_result = Some(result);
125             }
126         }
127 
128         if !*this.free && this.fut.is_none() {
129             // Future resolved, inner stream stopped
130             Poll::Ready(None)
131         } else {
132             // Future either not resolved yet or taken out by the user
133             let item = ready!(this.stream.poll_next(cx));
134             if item.is_none() {
135                 this.fut.set(None);
136             }
137             Poll::Ready(item)
138         }
139     }
140 
size_hint(&self) -> (usize, Option<usize>)141     fn size_hint(&self) -> (usize, Option<usize>) {
142         if self.is_stopped() {
143             return (0, Some(0));
144         }
145 
146         self.stream.size_hint()
147     }
148 }
149 
150 impl<St, Fut> FusedStream for TakeUntil<St, Fut>
151 where
152     St: Stream,
153     Fut: Future,
154 {
is_terminated(&self) -> bool155     fn is_terminated(&self) -> bool {
156         self.is_stopped()
157     }
158 }
159 
// Forwarding impl of Sink from the underlying stream: when the wrapped stream
// is also a `Sink<Item>`, `TakeUntil` exposes that sink with the same error
// type. The method bodies are generated by `delegate_sink!` for the `stream`
// field. Only compiled when the "sink" feature is enabled.
#[cfg(feature = "sink")]
impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
where
    S: Stream + Sink<Item>,
    Fut: Future,
{
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
171