1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::Future; 4 use futures_core::ready; 5 use futures_core::stream::{FusedStream, Stream}; 6 use futures_core::task::{Context, Poll}; 7 #[cfg(feature = "sink")] 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 // FIXME: docs, tests 12 13 pin_project! { 14 /// Stream for the [`take_until`](super::StreamExt::take_until) method. 15 #[must_use = "streams do nothing unless polled"] 16 pub struct TakeUntil<St: Stream, Fut: Future> { 17 #[pin] 18 stream: St, 19 // Contains the inner Future on start and None once the inner Future is resolved 20 // or taken out by the user. 21 #[pin] 22 fut: Option<Fut>, 23 // Contains fut's return value once fut is resolved 24 fut_result: Option<Fut::Output>, 25 // Whether the future was taken out by the user. 26 free: bool, 27 } 28 } 29 30 impl<St, Fut> fmt::Debug for TakeUntil<St, Fut> 31 where 32 St: Stream + fmt::Debug, 33 St::Item: fmt::Debug, 34 Fut: Future + fmt::Debug, 35 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 37 f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish() 38 } 39 } 40 41 impl<St, Fut> TakeUntil<St, Fut> 42 where 43 St: Stream, 44 Fut: Future, 45 { new(stream: St, fut: Fut) -> Self46 pub(super) fn new(stream: St, fut: Fut) -> Self { 47 Self { stream, fut: Some(fut), fut_result: None, free: false } 48 } 49 50 delegate_access_inner!(stream, St, ()); 51 52 /// Extract the stopping future out of the combinator. 53 /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet. 54 /// Taking out the future means the combinator will be yielding 55 /// elements from the wrapped stream without ever stopping it. take_future(&mut self) -> Option<Fut>56 pub fn take_future(&mut self) -> Option<Fut> { 57 if self.fut.is_some() { 58 self.free = true; 59 } 60 61 self.fut.take() 62 } 63 64 /// Once the stopping future is resolved, this method can be used 65 /// to extract the value returned by the stopping future. 66 /// 67 /// This may be used to retrieve arbitrary data from the stopping 68 /// future, for example a reason why the stream was stopped. 69 /// 70 /// This method will return `None` if the future isn't resolved yet, 71 /// or if the result was already taken out. 72 /// 73 /// # Examples 74 /// 75 /// ``` 76 /// # futures::executor::block_on(async { 77 /// use futures::future; 78 /// use futures::stream::{self, StreamExt}; 79 /// use futures::task::Poll; 80 /// 81 /// let stream = stream::iter(1..=10); 82 /// 83 /// let mut i = 0; 84 /// let stop_fut = future::poll_fn(|_cx| { 85 /// i += 1; 86 /// if i <= 5 { 87 /// Poll::Pending 88 /// } else { 89 /// Poll::Ready("reason") 90 /// } 91 /// }); 92 /// 93 /// let mut stream = stream.take_until(stop_fut); 94 /// let _ = stream.by_ref().collect::<Vec<_>>().await; 95 /// 96 /// let result = stream.take_result().unwrap(); 97 /// assert_eq!(result, "reason"); 98 /// # }); 99 /// ``` take_result(&mut self) -> Option<Fut::Output>100 pub fn take_result(&mut self) -> Option<Fut::Output> { 101 self.fut_result.take() 102 } 103 104 /// Whether the stream was stopped yet by the stopping future 105 /// being resolved. is_stopped(&self) -> bool106 pub fn is_stopped(&self) -> bool { 107 !self.free && self.fut.is_none() 108 } 109 } 110 111 impl<St, Fut> Stream for TakeUntil<St, Fut> 112 where 113 St: Stream, 114 Fut: Future, 115 { 116 type Item = St::Item; 117 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>>118 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { 119 let mut this = self.project(); 120 121 if let Some(f) = this.fut.as_mut().as_pin_mut() { 122 if let Poll::Ready(result) = f.poll(cx) { 123 this.fut.set(None); 124 *this.fut_result = Some(result); 125 } 126 } 127 128 if !*this.free && this.fut.is_none() { 129 // Future resolved, inner stream stopped 130 Poll::Ready(None) 131 } else { 132 // Future either not resolved yet or taken out by the user 133 let item = ready!(this.stream.poll_next(cx)); 134 if item.is_none() { 135 this.fut.set(None); 136 } 137 Poll::Ready(item) 138 } 139 } 140 size_hint(&self) -> (usize, Option<usize>)141 fn size_hint(&self) -> (usize, Option<usize>) { 142 if self.is_stopped() { 143 return (0, Some(0)); 144 } 145 146 self.stream.size_hint() 147 } 148 } 149 150 impl<St, Fut> FusedStream for TakeUntil<St, Fut> 151 where 152 St: Stream, 153 Fut: Future, 154 { is_terminated(&self) -> bool155 fn is_terminated(&self) -> bool { 156 self.is_stopped() 157 } 158 } 159 160 // Forwarding impl of Sink from the underlying stream 161 #[cfg(feature = "sink")] 162 impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut> 163 where 164 S: Stream + Sink<Item>, 165 Fut: Future, 166 { 167 type Error = S::Error; 168 169 delegate_sink!(stream, Item); 170 } 171