1 use core::pin::Pin; 2 use futures_core::future::{FusedFuture, Future}; 3 use futures_core::ready; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 #[cfg(feature = "sink")] 7 use futures_sink::Sink; 8 use pin_project_lite::pin_project; 9 10 pin_project! { 11 #[project = FlattenProj] 12 #[derive(Debug)] 13 pub enum Flatten<Fut1, Fut2> { 14 First { #[pin] f: Fut1 }, 15 Second { #[pin] f: Fut2 }, 16 Empty, 17 } 18 } 19 20 impl<Fut1, Fut2> Flatten<Fut1, Fut2> { new(future: Fut1) -> Self21 pub(crate) fn new(future: Fut1) -> Self { 22 Self::First { f: future } 23 } 24 } 25 26 impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> 27 where 28 Fut: Future, 29 Fut::Output: Future, 30 { is_terminated(&self) -> bool31 fn is_terminated(&self) -> bool { 32 match self { 33 Self::Empty => true, 34 _ => false, 35 } 36 } 37 } 38 39 impl<Fut> Future for Flatten<Fut, Fut::Output> 40 where 41 Fut: Future, 42 Fut::Output: Future, 43 { 44 type Output = <Fut::Output as Future>::Output; 45 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>46 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 47 Poll::Ready(loop { 48 match self.as_mut().project() { 49 FlattenProj::First { f } => { 50 let f = ready!(f.poll(cx)); 51 self.set(Self::Second { f }); 52 } 53 FlattenProj::Second { f } => { 54 let output = ready!(f.poll(cx)); 55 self.set(Self::Empty); 56 break output; 57 } 58 FlattenProj::Empty => panic!("Flatten polled after completion"), 59 } 60 }) 61 } 62 } 63 64 impl<Fut> FusedStream for Flatten<Fut, Fut::Output> 65 where 66 Fut: Future, 67 Fut::Output: Stream, 68 { is_terminated(&self) -> bool69 fn is_terminated(&self) -> bool { 70 match self { 71 Self::Empty => true, 72 _ => false, 73 } 74 } 75 } 76 77 impl<Fut> Stream for Flatten<Fut, Fut::Output> 78 where 79 Fut: Future, 80 Fut::Output: Stream, 81 { 82 type Item = <Fut::Output as Stream>::Item; 83 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>84 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 85 Poll::Ready(loop { 86 match self.as_mut().project() { 87 FlattenProj::First { f } => { 88 let f = ready!(f.poll(cx)); 89 self.set(Self::Second { f }); 90 } 91 FlattenProj::Second { f } => { 92 let output = ready!(f.poll_next(cx)); 93 if output.is_none() { 94 self.set(Self::Empty); 95 } 96 break output; 97 } 98 FlattenProj::Empty => break None, 99 } 100 }) 101 } 102 } 103 104 #[cfg(feature = "sink")] 105 impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output> 106 where 107 Fut: Future, 108 Fut::Output: Sink<Item>, 109 { 110 type Error = <Fut::Output as Sink<Item>>::Error; 111 poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>112 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 113 Poll::Ready(loop { 114 match self.as_mut().project() { 115 FlattenProj::First { f } => { 116 let f = ready!(f.poll(cx)); 117 self.set(Self::Second { f }); 118 } 119 FlattenProj::Second { f } => { 120 break ready!(f.poll_ready(cx)); 121 } 122 FlattenProj::Empty => panic!("poll_ready called after eof"), 123 } 124 }) 125 } 126 start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>127 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 128 match self.project() { 129 FlattenProj::First { .. } => panic!("poll_ready not called first"), 130 FlattenProj::Second { f } => f.start_send(item), 131 FlattenProj::Empty => panic!("start_send called after eof"), 132 } 133 } 134 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>135 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 136 match self.project() { 137 FlattenProj::First { .. } => Poll::Ready(Ok(())), 138 FlattenProj::Second { f } => f.poll_flush(cx), 139 FlattenProj::Empty => panic!("poll_flush called after eof"), 140 } 141 } 142 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>143 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 144 let res = match self.as_mut().project() { 145 FlattenProj::Second { f } => f.poll_close(cx), 146 _ => Poll::Ready(Ok(())), 147 }; 148 if res.is_ready() { 149 self.set(Self::Empty); 150 } 151 res 152 } 153 } 154