//! Future types use super::{Policy, Retry}; use futures_core::ready; use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use tower_service::Service; pin_project! { /// The [`Future`] returned by a [`Retry`] service. #[derive(Debug)] pub struct ResponseFuture where P: Policy, S: Service, { request: Option, #[pin] retry: Retry, #[pin] state: State, } } pin_project! { #[project = StateProj] #[derive(Debug)] enum State { // Polling the future from [`Service::call`] Called { #[pin] future: F }, // Polling the future from [`Policy::retry`] Checking { #[pin] checking: P }, // Polling [`Service::poll_ready`] after [`Checking`] was OK. Retrying, } } impl ResponseFuture where P: Policy, S: Service, { pub(crate) fn new( request: Option, retry: Retry, future: S::Future, ) -> ResponseFuture { ResponseFuture { request, retry, state: State::Called { future }, } } } impl Future for ResponseFuture where P: Policy + Clone, S: Service + Clone, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); loop { match this.state.as_mut().project() { StateProj::Called { future } => { let result = ready!(future.poll(cx)); if let Some(ref req) = this.request { match this.retry.policy.retry(req, result.as_ref()) { Some(checking) => { this.state.set(State::Checking { checking }); } None => return Poll::Ready(result), } } else { // request wasn't cloned, so no way to retry it return Poll::Ready(result); } } StateProj::Checking { checking } => { this.retry .as_mut() .project() .policy .set(ready!(checking.poll(cx))); this.state.set(State::Retrying); } StateProj::Retrying => { // NOTE: we assume here that // // this.retry.poll_ready() // // is equivalent to // // this.retry.service.poll_ready() // // we need to make that assumption to avoid adding an Unpin bound to the Policy // in Ready to make it Unpin so that we can get &mut Ready as needed to call // poll_ready on it. ready!(this.retry.as_mut().project().service.poll_ready(cx))?; let req = this .request .take() .expect("retrying requires cloned request"); *this.request = this.retry.policy.clone_request(&req); this.state.set(State::Called { future: this.retry.as_mut().project().service.call(req), }); } } } } }