1 use core::pin::Pin; 2 use futures_core::future::{FusedFuture, Future, TryFuture}; 3 use futures_core::ready; 4 use futures_core::stream::{FusedStream, Stream, TryStream}; 5 use futures_core::task::{Context, Poll}; 6 #[cfg(feature = "sink")] 7 use futures_sink::Sink; 8 use pin_project_lite::pin_project; 9 10 pin_project! { 11 #[project = TryFlattenProj] 12 #[derive(Debug)] 13 pub enum TryFlatten<Fut1, Fut2> { 14 First { #[pin] f: Fut1 }, 15 Second { #[pin] f: Fut2 }, 16 Empty, 17 } 18 } 19 20 impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> { new(future: Fut1) -> Self21 pub(crate) fn new(future: Fut1) -> Self { 22 Self::First { f: future } 23 } 24 } 25 26 impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> 27 where 28 Fut: TryFuture, 29 Fut::Ok: TryFuture<Error = Fut::Error>, 30 { is_terminated(&self) -> bool31 fn is_terminated(&self) -> bool { 32 match self { 33 Self::Empty => true, 34 _ => false, 35 } 36 } 37 } 38 39 impl<Fut> Future for TryFlatten<Fut, Fut::Ok> 40 where 41 Fut: TryFuture, 42 Fut::Ok: TryFuture<Error = Fut::Error>, 43 { 44 type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>; 45 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>46 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 47 Poll::Ready(loop { 48 match self.as_mut().project() { 49 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { 50 Ok(f) => self.set(Self::Second { f }), 51 Err(e) => { 52 self.set(Self::Empty); 53 break Err(e); 54 } 55 }, 56 TryFlattenProj::Second { f } => { 57 let output = ready!(f.try_poll(cx)); 58 self.set(Self::Empty); 59 break output; 60 } 61 TryFlattenProj::Empty => panic!("TryFlatten polled after completion"), 62 } 63 }) 64 } 65 } 66 67 impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> 68 where 69 Fut: TryFuture, 70 Fut::Ok: TryStream<Error = Fut::Error>, 71 { is_terminated(&self) -> bool72 fn is_terminated(&self) -> bool { 73 match self { 74 Self::Empty => true, 75 _ => false, 76 } 77 } 78 } 79 80 impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> 81 where 82 Fut: TryFuture, 83 Fut::Ok: TryStream<Error = Fut::Error>, 84 { 85 type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; 86 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>87 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 88 Poll::Ready(loop { 89 match self.as_mut().project() { 90 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { 91 Ok(f) => self.set(Self::Second { f }), 92 Err(e) => { 93 self.set(Self::Empty); 94 break Some(Err(e)); 95 } 96 }, 97 TryFlattenProj::Second { f } => { 98 let output = ready!(f.try_poll_next(cx)); 99 if output.is_none() { 100 self.set(Self::Empty); 101 } 102 break output; 103 } 104 TryFlattenProj::Empty => break None, 105 } 106 }) 107 } 108 } 109 110 #[cfg(feature = "sink")] 111 impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok> 112 where 113 Fut: TryFuture, 114 Fut::Ok: Sink<Item, Error = Fut::Error>, 115 { 116 type Error = Fut::Error; 117 poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>118 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 119 Poll::Ready(loop { 120 match self.as_mut().project() { 121 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { 122 Ok(f) => self.set(Self::Second { f }), 123 Err(e) => { 124 self.set(Self::Empty); 125 break Err(e); 126 } 127 }, 128 TryFlattenProj::Second { f } => { 129 break ready!(f.poll_ready(cx)); 130 } 131 TryFlattenProj::Empty => panic!("poll_ready called after eof"), 132 } 133 }) 134 } 135 start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>136 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 137 match self.project() { 138 TryFlattenProj::First { .. } => panic!("poll_ready not called first"), 139 TryFlattenProj::Second { f } => f.start_send(item), 140 TryFlattenProj::Empty => panic!("start_send called after eof"), 141 } 142 } 143 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>144 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 145 match self.project() { 146 TryFlattenProj::First { .. } => Poll::Ready(Ok(())), 147 TryFlattenProj::Second { f } => f.poll_flush(cx), 148 TryFlattenProj::Empty => panic!("poll_flush called after eof"), 149 } 150 } 151 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>152 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 153 let res = match self.as_mut().project() { 154 TryFlattenProj::Second { f } => f.poll_close(cx), 155 _ => Poll::Ready(Ok(())), 156 }; 157 if res.is_ready() { 158 self.set(Self::Empty); 159 } 160 res 161 } 162 } 163