1 //! HTTP/2 Server Connections
2 
3 use std::error::Error as StdError;
4 use std::fmt;
5 use std::future::Future;
6 use std::marker::Unpin;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 use std::time::Duration;
10 
11 use pin_project_lite::pin_project;
12 use tokio::io::{AsyncRead, AsyncWrite};
13 
14 use crate::body::{Body as IncomingBody, HttpBody as Body};
15 use crate::common::exec::ConnStreamExec;
16 use crate::proto;
17 use crate::service::HttpService;
18 
19 pin_project! {
20     /// A future binding an HTTP/2 connection with a Service.
21     ///
22     /// Polling this future will drive HTTP forward.
23     #[must_use = "futures do nothing unless polled"]
24     pub struct Connection<T, S, E>
25     where
26         S: HttpService<IncomingBody>,
27     {
28         conn: proto::h2::Server<T, S, S::ResBody, E>,
29     }
30 }
31 
32 /// A configuration builder for HTTP/2 server connections.
33 #[derive(Clone, Debug)]
34 pub struct Builder<E> {
35     exec: E,
36     h2_builder: proto::h2::server::Config,
37 }
38 
39 // ===== impl Connection =====
40 
41 impl<I, S, E> fmt::Debug for Connection<I, S, E>
42 where
43     S: HttpService<IncomingBody>,
44 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result45     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46         f.debug_struct("Connection").finish()
47     }
48 }
49 
50 impl<I, B, S, E> Connection<I, S, E>
51 where
52     S: HttpService<IncomingBody, ResBody = B>,
53     S::Error: Into<Box<dyn StdError + Send + Sync>>,
54     I: AsyncRead + AsyncWrite + Unpin,
55     B: Body + 'static,
56     B::Error: Into<Box<dyn StdError + Send + Sync>>,
57     E: ConnStreamExec<S::Future, B>,
58 {
59     /// Start a graceful shutdown process for this connection.
60     ///
61     /// This `Connection` should continue to be polled until shutdown
62     /// can finish.
63     ///
64     /// # Note
65     ///
66     /// This should only be called while the `Connection` future is still
67     /// pending. If called after `Connection::poll` has resolved, this does
68     /// nothing.
graceful_shutdown(mut self: Pin<&mut Self>)69     pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
70         self.conn.graceful_shutdown();
71     }
72 }
73 
74 impl<I, B, S, E> Future for Connection<I, S, E>
75 where
76     S: HttpService<IncomingBody, ResBody = B>,
77     S::Error: Into<Box<dyn StdError + Send + Sync>>,
78     I: AsyncRead + AsyncWrite + Unpin + 'static,
79     B: Body + 'static,
80     B::Error: Into<Box<dyn StdError + Send + Sync>>,
81     E: ConnStreamExec<S::Future, B>,
82 {
83     type Output = crate::Result<()>;
84 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>85     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86         match ready!(Pin::new(&mut self.conn).poll(cx)) {
87             Ok(_done) => {
88                 //TODO: the proto::h2::Server no longer needs to return
89                 //the Dispatched enum
90                 Poll::Ready(Ok(()))
91             }
92             Err(e) => Poll::Ready(Err(e)),
93         }
94     }
95 }
96 
97 // ===== impl Builder =====
98 
99 impl<E> Builder<E> {
100     /// Create a new connection builder.
101     ///
102     /// This starts with the default options, and an executor.
new(exec: E) -> Self103     pub fn new(exec: E) -> Self {
104         Self {
105             exec: exec,
106             h2_builder: Default::default(),
107         }
108     }
109 
110     /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
111     /// stream-level flow control.
112     ///
113     /// Passing `None` will do nothing.
114     ///
115     /// If not set, hyper will use a default.
116     ///
117     /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self118     pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
119         if let Some(sz) = sz.into() {
120             self.h2_builder.adaptive_window = false;
121             self.h2_builder.initial_stream_window_size = sz;
122         }
123         self
124     }
125 
126     /// Sets the max connection-level flow control for HTTP2.
127     ///
128     /// Passing `None` will do nothing.
129     ///
130     /// If not set, hyper will use a default.
initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self131     pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
132         if let Some(sz) = sz.into() {
133             self.h2_builder.adaptive_window = false;
134             self.h2_builder.initial_conn_window_size = sz;
135         }
136         self
137     }
138 
139     /// Sets whether to use an adaptive flow control.
140     ///
141     /// Enabling this will override the limits set in
142     /// `initial_stream_window_size` and
143     /// `initial_connection_window_size`.
adaptive_window(&mut self, enabled: bool) -> &mut Self144     pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
145         use proto::h2::SPEC_WINDOW_SIZE;
146 
147         self.h2_builder.adaptive_window = enabled;
148         if enabled {
149             self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
150             self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
151         }
152         self
153     }
154 
155     /// Sets the maximum frame size to use for HTTP2.
156     ///
157     /// Passing `None` will do nothing.
158     ///
159     /// If not set, hyper will use a default.
max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self160     pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
161         if let Some(sz) = sz.into() {
162             self.h2_builder.max_frame_size = sz;
163         }
164         self
165     }
166 
167     /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
168     /// connections.
169     ///
170     /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
171     ///
172     /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self173     pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
174         self.h2_builder.max_concurrent_streams = max.into();
175         self
176     }
177 
178     /// Sets an interval for HTTP2 Ping frames should be sent to keep a
179     /// connection alive.
180     ///
181     /// Pass `None` to disable HTTP2 keep-alive.
182     ///
183     /// Default is currently disabled.
184     ///
185     /// # Cargo Feature
186     ///
keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self187     pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
188         self.h2_builder.keep_alive_interval = interval.into();
189         self
190     }
191 
192     /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
193     ///
194     /// If the ping is not acknowledged within the timeout, the connection will
195     /// be closed. Does nothing if `keep_alive_interval` is disabled.
196     ///
197     /// Default is 20 seconds.
198     ///
199     /// # Cargo Feature
200     ///
keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self201     pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
202         self.h2_builder.keep_alive_timeout = timeout;
203         self
204     }
205 
206     /// Set the maximum write buffer size for each HTTP/2 stream.
207     ///
208     /// Default is currently ~400KB, but may change.
209     ///
210     /// # Panics
211     ///
212     /// The value must be no larger than `u32::MAX`.
max_send_buf_size(&mut self, max: usize) -> &mut Self213     pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
214         assert!(max <= std::u32::MAX as usize);
215         self.h2_builder.max_send_buffer_size = max;
216         self
217     }
218 
219     /// Enables the [extended CONNECT protocol].
220     ///
221     /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
enable_connect_protocol(&mut self) -> &mut Self222     pub fn enable_connect_protocol(&mut self) -> &mut Self {
223         self.h2_builder.enable_connect_protocol = true;
224         self
225     }
226 
227     /// Sets the max size of received header frames.
228     ///
229     /// Default is currently ~16MB, but may change.
max_header_list_size(&mut self, max: u32) -> &mut Self230     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
231         self.h2_builder.max_header_list_size = max;
232         self
233     }
234 
235     // /// Set the timer used in background tasks.
236     // pub fn timer<M>(&mut self, timer: M) -> &mut Self
237     // where
238     //     M: Timer + Send + Sync + 'static,
239     // {
240     //     self.timer = Time::Timer(Arc::new(timer));
241     //     self
242     // }
243 
244     /// Bind a connection together with a [`Service`](crate::service::Service).
245     ///
246     /// This returns a Future that must be polled in order for HTTP to be
247     /// driven on the connection.
serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E> where S: HttpService<IncomingBody, ResBody = Bd>, S::Error: Into<Box<dyn StdError + Send + Sync>>, Bd: Body + 'static, Bd::Error: Into<Box<dyn StdError + Send + Sync>>, I: AsyncRead + AsyncWrite + Unpin, E: ConnStreamExec<S::Future, Bd>,248     pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
249     where
250         S: HttpService<IncomingBody, ResBody = Bd>,
251         S::Error: Into<Box<dyn StdError + Send + Sync>>,
252         Bd: Body + 'static,
253         Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
254         I: AsyncRead + AsyncWrite + Unpin,
255         E: ConnStreamExec<S::Future, Bd>,
256     {
257         let proto = proto::h2::Server::new(io, service, &self.h2_builder, self.exec.clone());
258         Connection { conn: proto }
259     }
260 }
261