1 //! Future types for the [`Buffer`] middleware.
2 //!
3 //! [`Buffer`]: crate::buffer::Buffer
4 
5 use super::{error::Closed, message};
6 use futures_core::ready;
7 use pin_project_lite::pin_project;
8 use std::{
9     future::Future,
10     pin::Pin,
11     task::{Context, Poll},
12 };
13 
14 pin_project! {
15     /// Future that completes when the buffered service eventually services the submitted request.
16     #[derive(Debug)]
17     pub struct ResponseFuture<T> {
18         #[pin]
19         state: ResponseState<T>,
20     }
21 }
22 
23 pin_project! {
24     #[project = ResponseStateProj]
25     #[derive(Debug)]
26     enum ResponseState<T> {
27         Failed {
28             error: Option<crate::BoxError>,
29         },
30         Rx {
31             #[pin]
32             rx: message::Rx<T>,
33         },
34         Poll {
35             #[pin]
36             fut: T,
37         },
38     }
39 }
40 
41 impl<T> ResponseFuture<T> {
new(rx: message::Rx<T>) -> Self42     pub(crate) fn new(rx: message::Rx<T>) -> Self {
43         ResponseFuture {
44             state: ResponseState::Rx { rx },
45         }
46     }
47 
failed(err: crate::BoxError) -> Self48     pub(crate) fn failed(err: crate::BoxError) -> Self {
49         ResponseFuture {
50             state: ResponseState::Failed { error: Some(err) },
51         }
52     }
53 }
54 
55 impl<F, T, E> Future for ResponseFuture<F>
56 where
57     F: Future<Output = Result<T, E>>,
58     E: Into<crate::BoxError>,
59 {
60     type Output = Result<T, crate::BoxError>;
61 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>62     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63         let mut this = self.project();
64 
65         loop {
66             match this.state.as_mut().project() {
67                 ResponseStateProj::Failed { error } => {
68                     return Poll::Ready(Err(error.take().expect("polled after error")));
69                 }
70                 ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
71                     Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
72                     Ok(Err(e)) => return Poll::Ready(Err(e.into())),
73                     Err(_) => return Poll::Ready(Err(Closed::new().into())),
74                 },
75                 ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
76             }
77         }
78     }
79 }
80