1 //! HTTP/2 Server Connections 2 3 use std::error::Error as StdError; 4 use std::fmt; 5 use std::future::Future; 6 use std::marker::Unpin; 7 use std::pin::Pin; 8 use std::task::{Context, Poll}; 9 use std::time::Duration; 10 11 use pin_project_lite::pin_project; 12 use tokio::io::{AsyncRead, AsyncWrite}; 13 14 use crate::body::{Body as IncomingBody, HttpBody as Body}; 15 use crate::common::exec::ConnStreamExec; 16 use crate::proto; 17 use crate::service::HttpService; 18 19 pin_project! { 20 /// A future binding an HTTP/2 connection with a Service. 21 /// 22 /// Polling this future will drive HTTP forward. 23 #[must_use = "futures do nothing unless polled"] 24 pub struct Connection<T, S, E> 25 where 26 S: HttpService<IncomingBody>, 27 { 28 conn: proto::h2::Server<T, S, S::ResBody, E>, 29 } 30 } 31 32 /// A configuration builder for HTTP/2 server connections. 33 #[derive(Clone, Debug)] 34 pub struct Builder<E> { 35 exec: E, 36 h2_builder: proto::h2::server::Config, 37 } 38 39 // ===== impl Connection ===== 40 41 impl<I, S, E> fmt::Debug for Connection<I, S, E> 42 where 43 S: HttpService<IncomingBody>, 44 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 46 f.debug_struct("Connection").finish() 47 } 48 } 49 50 impl<I, B, S, E> Connection<I, S, E> 51 where 52 S: HttpService<IncomingBody, ResBody = B>, 53 S::Error: Into<Box<dyn StdError + Send + Sync>>, 54 I: AsyncRead + AsyncWrite + Unpin, 55 B: Body + 'static, 56 B::Error: Into<Box<dyn StdError + Send + Sync>>, 57 E: ConnStreamExec<S::Future, B>, 58 { 59 /// Start a graceful shutdown process for this connection. 60 /// 61 /// This `Connection` should continue to be polled until shutdown 62 /// can finish. 63 /// 64 /// # Note 65 /// 66 /// This should only be called while the `Connection` future is still 67 /// pending. If called after `Connection::poll` has resolved, this does 68 /// nothing. graceful_shutdown(mut self: Pin<&mut Self>)69 pub fn graceful_shutdown(mut self: Pin<&mut Self>) { 70 self.conn.graceful_shutdown(); 71 } 72 } 73 74 impl<I, B, S, E> Future for Connection<I, S, E> 75 where 76 S: HttpService<IncomingBody, ResBody = B>, 77 S::Error: Into<Box<dyn StdError + Send + Sync>>, 78 I: AsyncRead + AsyncWrite + Unpin + 'static, 79 B: Body + 'static, 80 B::Error: Into<Box<dyn StdError + Send + Sync>>, 81 E: ConnStreamExec<S::Future, B>, 82 { 83 type Output = crate::Result<()>; 84 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>85 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 86 match ready!(Pin::new(&mut self.conn).poll(cx)) { 87 Ok(_done) => { 88 //TODO: the proto::h2::Server no longer needs to return 89 //the Dispatched enum 90 Poll::Ready(Ok(())) 91 } 92 Err(e) => Poll::Ready(Err(e)), 93 } 94 } 95 } 96 97 // ===== impl Builder ===== 98 99 impl<E> Builder<E> { 100 /// Create a new connection builder. 101 /// 102 /// This starts with the default options, and an executor. new(exec: E) -> Self103 pub fn new(exec: E) -> Self { 104 Self { 105 exec: exec, 106 h2_builder: Default::default(), 107 } 108 } 109 110 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 111 /// stream-level flow control. 112 /// 113 /// Passing `None` will do nothing. 114 /// 115 /// If not set, hyper will use a default. 116 /// 117 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self118 pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 119 if let Some(sz) = sz.into() { 120 self.h2_builder.adaptive_window = false; 121 self.h2_builder.initial_stream_window_size = sz; 122 } 123 self 124 } 125 126 /// Sets the max connection-level flow control for HTTP2. 127 /// 128 /// Passing `None` will do nothing. 129 /// 130 /// If not set, hyper will use a default. initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self131 pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 132 if let Some(sz) = sz.into() { 133 self.h2_builder.adaptive_window = false; 134 self.h2_builder.initial_conn_window_size = sz; 135 } 136 self 137 } 138 139 /// Sets whether to use an adaptive flow control. 140 /// 141 /// Enabling this will override the limits set in 142 /// `initial_stream_window_size` and 143 /// `initial_connection_window_size`. adaptive_window(&mut self, enabled: bool) -> &mut Self144 pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { 145 use proto::h2::SPEC_WINDOW_SIZE; 146 147 self.h2_builder.adaptive_window = enabled; 148 if enabled { 149 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; 150 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; 151 } 152 self 153 } 154 155 /// Sets the maximum frame size to use for HTTP2. 156 /// 157 /// Passing `None` will do nothing. 158 /// 159 /// If not set, hyper will use a default. max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self160 pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 161 if let Some(sz) = sz.into() { 162 self.h2_builder.max_frame_size = sz; 163 } 164 self 165 } 166 167 /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 168 /// connections. 169 /// 170 /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. 171 /// 172 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self173 pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self { 174 self.h2_builder.max_concurrent_streams = max.into(); 175 self 176 } 177 178 /// Sets an interval for HTTP2 Ping frames should be sent to keep a 179 /// connection alive. 180 /// 181 /// Pass `None` to disable HTTP2 keep-alive. 182 /// 183 /// Default is currently disabled. 184 /// 185 /// # Cargo Feature 186 /// keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self187 pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self { 188 self.h2_builder.keep_alive_interval = interval.into(); 189 self 190 } 191 192 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. 193 /// 194 /// If the ping is not acknowledged within the timeout, the connection will 195 /// be closed. Does nothing if `keep_alive_interval` is disabled. 196 /// 197 /// Default is 20 seconds. 198 /// 199 /// # Cargo Feature 200 /// keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self201 pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { 202 self.h2_builder.keep_alive_timeout = timeout; 203 self 204 } 205 206 /// Set the maximum write buffer size for each HTTP/2 stream. 207 /// 208 /// Default is currently ~400KB, but may change. 209 /// 210 /// # Panics 211 /// 212 /// The value must be no larger than `u32::MAX`. max_send_buf_size(&mut self, max: usize) -> &mut Self213 pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { 214 assert!(max <= std::u32::MAX as usize); 215 self.h2_builder.max_send_buffer_size = max; 216 self 217 } 218 219 /// Enables the [extended CONNECT protocol]. 220 /// 221 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 enable_connect_protocol(&mut self) -> &mut Self222 pub fn enable_connect_protocol(&mut self) -> &mut Self { 223 self.h2_builder.enable_connect_protocol = true; 224 self 225 } 226 227 /// Sets the max size of received header frames. 228 /// 229 /// Default is currently ~16MB, but may change. max_header_list_size(&mut self, max: u32) -> &mut Self230 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { 231 self.h2_builder.max_header_list_size = max; 232 self 233 } 234 235 // /// Set the timer used in background tasks. 236 // pub fn timer<M>(&mut self, timer: M) -> &mut Self 237 // where 238 // M: Timer + Send + Sync + 'static, 239 // { 240 // self.timer = Time::Timer(Arc::new(timer)); 241 // self 242 // } 243 244 /// Bind a connection together with a [`Service`](crate::service::Service). 245 /// 246 /// This returns a Future that must be polled in order for HTTP to be 247 /// driven on the connection. serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E> where S: HttpService<IncomingBody, ResBody = Bd>, S::Error: Into<Box<dyn StdError + Send + Sync>>, Bd: Body + 'static, Bd::Error: Into<Box<dyn StdError + Send + Sync>>, I: AsyncRead + AsyncWrite + Unpin, E: ConnStreamExec<S::Future, Bd>,248 pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E> 249 where 250 S: HttpService<IncomingBody, ResBody = Bd>, 251 S::Error: Into<Box<dyn StdError + Send + Sync>>, 252 Bd: Body + 'static, 253 Bd::Error: Into<Box<dyn StdError + Send + Sync>>, 254 I: AsyncRead + AsyncWrite + Unpin, 255 E: ConnStreamExec<S::Future, Bd>, 256 { 257 let proto = proto::h2::Server::new(io, service, &self.h2_builder, self.exec.clone()); 258 Connection { conn: proto } 259 } 260 } 261