//! HTTP/2 client connections use std::error::Error as StdError; use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::marker::Unpin; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; use http::{Request, Response}; use tokio::io::{AsyncRead, AsyncWrite}; use super::super::dispatch; use crate::body::{Body as IncomingBody, HttpBody as Body}; use crate::common::exec::{BoxSendFuture, Exec}; use crate::proto; use crate::rt::Executor; /// The sender side of an established connection. pub struct SendRequest { dispatch: dispatch::UnboundedSender, Response>, } impl Clone for SendRequest { fn clone(&self) -> SendRequest { SendRequest { dispatch: self.dispatch.clone(), } } } /// A future that processes all HTTP state for the IO object. /// /// In most cases, this should just be spawned into an executor, so that it /// can process incoming and outgoing messages, notice hangups, and the like. #[must_use = "futures do nothing unless polled"] pub struct Connection where T: AsyncRead + AsyncWrite + Send + 'static, B: Body + 'static, { inner: (PhantomData, proto::h2::ClientTask), } /// A builder to configure an HTTP connection. /// /// After setting options, the builder is used to create a handshake future. #[derive(Clone, Debug)] pub struct Builder { pub(super) exec: Exec, h2_builder: proto::h2::client::Config, } /// Returns a handshake future over some IO. /// /// This is a shortcut for `Builder::new().handshake(io)`. /// See [`client::conn`](crate::client::conn) for more. pub async fn handshake(exec: E, io: T) -> crate::Result<(SendRequest, Connection)> where E: Executor + Send + Sync + 'static, T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into>, { Builder::new(exec).handshake(io).await } // ===== impl SendRequest impl SendRequest { /// Polls to determine whether this sender can be used yet for a request. /// /// If the associated connection is closed, this returns an Error. pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { if self.is_closed() { Poll::Ready(Err(crate::Error::new_closed())) } else { Poll::Ready(Ok(())) } } /// Waits until the dispatcher is ready /// /// If the associated connection is closed, this returns an Error. pub async fn ready(&mut self) -> crate::Result<()> { futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await } /* pub(super) async fn when_ready(self) -> crate::Result { let mut me = Some(self); future::poll_fn(move |cx| { ready!(me.as_mut().unwrap().poll_ready(cx))?; Poll::Ready(Ok(me.take().unwrap())) }) .await } pub(super) fn is_ready(&self) -> bool { self.dispatch.is_ready() } */ pub(super) fn is_closed(&self) -> bool { self.dispatch.is_closed() } } impl SendRequest where B: Body + 'static, { /// Sends a `Request` on the associated connection. /// /// Returns a future that if successful, yields the `Response`. /// /// # Note /// /// There are some key differences in what automatic things the `Client` /// does for you that will not be done here: /// /// - `Client` requires absolute-form `Uri`s, since the scheme and /// authority are needed to connect. They aren't required here. /// - Since the `Client` requires absolute-form `Uri`s, it can add /// the `Host` header based on it. You must add a `Host` header yourself /// before calling this method. /// - Since absolute-form `Uri`s are not required, if received, they will /// be serialized as-is. pub fn send_request( &mut self, req: Request, ) -> impl Future>> { let sent = self.dispatch.send(req); async move { match sent { Ok(rx) => match rx.await { Ok(Ok(resp)) => Ok(resp), Ok(Err(err)) => Err(err), // this is definite bug if it happens, but it shouldn't happen! Err(_canceled) => panic!("dispatch dropped without returning error"), }, Err(_req) => { tracing::debug!("connection was not ready"); Err(crate::Error::new_canceled().with("connection was not ready")) } } } } /* pub(super) fn send_request_retryable( &mut self, req: Request, ) -> impl Future, (crate::Error, Option>)>> + Unpin where B: Send, { match self.dispatch.try_send(req) { Ok(rx) => { Either::Left(rx.then(move |res| { match res { Ok(Ok(res)) => future::ok(res), Ok(Err(err)) => future::err(err), // this is definite bug if it happens, but it shouldn't happen! Err(_) => panic!("dispatch dropped without returning error"), } })) } Err(req) => { tracing::debug!("connection was not ready"); let err = crate::Error::new_canceled().with("connection was not ready"); Either::Right(future::err((err, Some(req)))) } } } */ } impl fmt::Debug for SendRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SendRequest").finish() } } // ===== impl Connection impl Connection where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + Unpin + Send + 'static, B::Data: Send, B::Error: Into>, { /// Returns whether the [extended CONNECT protocol][1] is enabled or not. /// /// This setting is configured by the server peer by sending the /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. /// This method returns the currently acknowledged value received from the /// remote. /// /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 pub fn is_extended_connect_protocol_enabled(&self) -> bool { self.inner.1.is_extended_connect_protocol_enabled() } } impl fmt::Debug for Connection where T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, B: Body + 'static, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() } } impl Future for Connection where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + Send + 'static, B::Data: Send, B::Error: Into>, { type Output = crate::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(&mut self.inner.1).poll(cx))? { proto::Dispatched::Shutdown => Poll::Ready(Ok(())), #[cfg(feature = "http1")] proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"), } } } // ===== impl Builder impl Builder { /// Creates a new connection builder. #[inline] pub fn new(exec: E) -> Builder where E: Executor + Send + Sync + 'static, { use std::sync::Arc; Builder { exec: Exec::Executor(Arc::new(exec)), h2_builder: Default::default(), } } /// Provide an executor to execute background HTTP2 tasks. pub fn executor(&mut self, exec: E) -> &mut Builder where E: Executor + Send + Sync + 'static, { self.exec = Exec::Executor(Arc::new(exec)); self } /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 /// stream-level flow control. /// /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE pub fn initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { self.h2_builder.adaptive_window = false; self.h2_builder.initial_stream_window_size = sz; } self } /// Sets the max connection-level flow control for HTTP2 /// /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. pub fn initial_connection_window_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { self.h2_builder.adaptive_window = false; self.h2_builder.initial_conn_window_size = sz; } self } /// Sets whether to use an adaptive flow control. /// /// Enabling this will override the limits set in /// `initial_stream_window_size` and /// `initial_connection_window_size`. pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { use proto::h2::SPEC_WINDOW_SIZE; self.h2_builder.adaptive_window = enabled; if enabled { self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; } self } /// Sets the maximum frame size to use for HTTP2. /// /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. pub fn max_frame_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { self.h2_builder.max_frame_size = sz; } self } /// Sets an interval for HTTP2 Ping frames should be sent to keep a /// connection alive. /// /// Pass `None` to disable HTTP2 keep-alive. /// /// Default is currently disabled. #[cfg(feature = "runtime")] pub fn keep_alive_interval(&mut self, interval: impl Into>) -> &mut Self { self.h2_builder.keep_alive_interval = interval.into(); self } /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. /// /// If the ping is not acknowledged within the timeout, the connection will /// be closed. Does nothing if `keep_alive_interval` is disabled. /// /// Default is 20 seconds. #[cfg(feature = "runtime")] pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { self.h2_builder.keep_alive_timeout = timeout; self } /// Sets whether HTTP2 keep-alive should apply while the connection is idle. /// /// If disabled, keep-alive pings are only sent while there are open /// request/responses streams. If enabled, pings are also sent when no /// streams are active. Does nothing if `keep_alive_interval` is /// disabled. /// /// Default is `false`. #[cfg(feature = "runtime")] pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { self.h2_builder.keep_alive_while_idle = enabled; self } /// Sets the maximum number of HTTP2 concurrent locally reset streams. /// /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more /// details. /// /// The default value is determined by the `h2` crate. /// /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { self.h2_builder.max_concurrent_reset_streams = Some(max); self } /// Set the maximum write buffer size for each HTTP/2 stream. /// /// Default is currently 1MB, but may change. /// /// # Panics /// /// The value must be no larger than `u32::MAX`. pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { assert!(max <= std::u32::MAX as usize); self.h2_builder.max_send_buffer_size = max; self } /// Constructs a connection with the configured options and IO. /// See [`client::conn`](crate::client::conn) for more. /// /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will /// do nothing. pub fn handshake( &self, io: T, ) -> impl Future, Connection)>> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into>, { let opts = self.clone(); async move { tracing::trace!("client handshake HTTP/1"); let (tx, rx) = dispatch::channel(); let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec).await?; Ok(( SendRequest { dispatch: tx.unbound(), }, Connection { inner: (PhantomData, h2), }, )) } } }