1 //! Future types for the [`Buffer`] middleware. 2 //! 3 //! [`Buffer`]: crate::buffer::Buffer 4 5 use super::{error::Closed, message}; 6 use futures_core::ready; 7 use pin_project_lite::pin_project; 8 use std::{ 9 future::Future, 10 pin::Pin, 11 task::{Context, Poll}, 12 }; 13 14 pin_project! { 15 /// Future that completes when the buffered service eventually services the submitted request. 16 #[derive(Debug)] 17 pub struct ResponseFuture<T> { 18 #[pin] 19 state: ResponseState<T>, 20 } 21 } 22 23 pin_project! { 24 #[project = ResponseStateProj] 25 #[derive(Debug)] 26 enum ResponseState<T> { 27 Failed { 28 error: Option<crate::BoxError>, 29 }, 30 Rx { 31 #[pin] 32 rx: message::Rx<T>, 33 }, 34 Poll { 35 #[pin] 36 fut: T, 37 }, 38 } 39 } 40 41 impl<T> ResponseFuture<T> { new(rx: message::Rx<T>) -> Self42 pub(crate) fn new(rx: message::Rx<T>) -> Self { 43 ResponseFuture { 44 state: ResponseState::Rx { rx }, 45 } 46 } 47 failed(err: crate::BoxError) -> Self48 pub(crate) fn failed(err: crate::BoxError) -> Self { 49 ResponseFuture { 50 state: ResponseState::Failed { error: Some(err) }, 51 } 52 } 53 } 54 55 impl<F, T, E> Future for ResponseFuture<F> 56 where 57 F: Future<Output = Result<T, E>>, 58 E: Into<crate::BoxError>, 59 { 60 type Output = Result<T, crate::BoxError>; 61 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>62 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 63 let mut this = self.project(); 64 65 loop { 66 match this.state.as_mut().project() { 67 ResponseStateProj::Failed { error } => { 68 return Poll::Ready(Err(error.take().expect("polled after error"))); 69 } 70 ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { 71 Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), 72 Ok(Err(e)) => return Poll::Ready(Err(e.into())), 73 Err(_) => return Poll::Ready(Err(Closed::new().into())), 74 }, 75 ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), 76 } 77 } 78 } 79 } 80