1 use core::pin::Pin;
2 use futures_core::future::{FusedFuture, Future, TryFuture};
3 use futures_core::ready;
4 use futures_core::stream::{FusedStream, Stream, TryStream};
5 use futures_core::task::{Context, Poll};
6 #[cfg(feature = "sink")]
7 use futures_sink::Sink;
8 use pin_project_lite::pin_project;
9 
10 pin_project! {
11     #[project = TryFlattenProj]
12     #[derive(Debug)]
13     pub enum TryFlatten<Fut1, Fut2> {
14         First { #[pin] f: Fut1 },
15         Second { #[pin] f: Fut2 },
16         Empty,
17     }
18 }
19 
20 impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> {
new(future: Fut1) -> Self21     pub(crate) fn new(future: Fut1) -> Self {
22         Self::First { f: future }
23     }
24 }
25 
26 impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok>
27 where
28     Fut: TryFuture,
29     Fut::Ok: TryFuture<Error = Fut::Error>,
30 {
is_terminated(&self) -> bool31     fn is_terminated(&self) -> bool {
32         match self {
33             Self::Empty => true,
34             _ => false,
35         }
36     }
37 }
38 
39 impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
40 where
41     Fut: TryFuture,
42     Fut::Ok: TryFuture<Error = Fut::Error>,
43 {
44     type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>;
45 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>46     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47         Poll::Ready(loop {
48             match self.as_mut().project() {
49                 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
50                     Ok(f) => self.set(Self::Second { f }),
51                     Err(e) => {
52                         self.set(Self::Empty);
53                         break Err(e);
54                     }
55                 },
56                 TryFlattenProj::Second { f } => {
57                     let output = ready!(f.try_poll(cx));
58                     self.set(Self::Empty);
59                     break output;
60                 }
61                 TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
62             }
63         })
64     }
65 }
66 
67 impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok>
68 where
69     Fut: TryFuture,
70     Fut::Ok: TryStream<Error = Fut::Error>,
71 {
is_terminated(&self) -> bool72     fn is_terminated(&self) -> bool {
73         match self {
74             Self::Empty => true,
75             _ => false,
76         }
77     }
78 }
79 
80 impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
81 where
82     Fut: TryFuture,
83     Fut::Ok: TryStream<Error = Fut::Error>,
84 {
85     type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
86 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>87     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88         Poll::Ready(loop {
89             match self.as_mut().project() {
90                 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
91                     Ok(f) => self.set(Self::Second { f }),
92                     Err(e) => {
93                         self.set(Self::Empty);
94                         break Some(Err(e));
95                     }
96                 },
97                 TryFlattenProj::Second { f } => {
98                     let output = ready!(f.try_poll_next(cx));
99                     if output.is_none() {
100                         self.set(Self::Empty);
101                     }
102                     break output;
103                 }
104                 TryFlattenProj::Empty => break None,
105             }
106         })
107     }
108 }
109 
110 #[cfg(feature = "sink")]
111 impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok>
112 where
113     Fut: TryFuture,
114     Fut::Ok: Sink<Item, Error = Fut::Error>,
115 {
116     type Error = Fut::Error;
117 
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>118     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119         Poll::Ready(loop {
120             match self.as_mut().project() {
121                 TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
122                     Ok(f) => self.set(Self::Second { f }),
123                     Err(e) => {
124                         self.set(Self::Empty);
125                         break Err(e);
126                     }
127                 },
128                 TryFlattenProj::Second { f } => {
129                     break ready!(f.poll_ready(cx));
130                 }
131                 TryFlattenProj::Empty => panic!("poll_ready called after eof"),
132             }
133         })
134     }
135 
start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>136     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
137         match self.project() {
138             TryFlattenProj::First { .. } => panic!("poll_ready not called first"),
139             TryFlattenProj::Second { f } => f.start_send(item),
140             TryFlattenProj::Empty => panic!("start_send called after eof"),
141         }
142     }
143 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>144     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145         match self.project() {
146             TryFlattenProj::First { .. } => Poll::Ready(Ok(())),
147             TryFlattenProj::Second { f } => f.poll_flush(cx),
148             TryFlattenProj::Empty => panic!("poll_flush called after eof"),
149         }
150     }
151 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>152     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153         let res = match self.as_mut().project() {
154             TryFlattenProj::Second { f } => f.poll_close(cx),
155             _ => Poll::Ready(Ok(())),
156         };
157         if res.is_ready() {
158             self.set(Self::Empty);
159         }
160         res
161     }
162 }
163