1 //! HTTP/1 Server Connections
2 
3 use std::error::Error as StdError;
4 use std::fmt;
5 use std::future::Future;
6 use std::marker::Unpin;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 use std::time::Duration;
10 
11 use bytes::Bytes;
12 use tokio::io::{AsyncRead, AsyncWrite};
13 
14 use crate::body::{Body as IncomingBody, HttpBody as Body};
15 use crate::proto;
16 use crate::service::HttpService;
17 
18 type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
19     proto::h1::dispatch::Server<S, IncomingBody>,
20     B,
21     T,
22     proto::ServerTransaction,
23 >;
24 
25 pin_project_lite::pin_project! {
26     /// A future binding an http1 connection with a Service.
27     ///
28     /// Polling this future will drive HTTP forward.
29     #[must_use = "futures do nothing unless polled"]
30     pub struct Connection<T, S>
31     where
32         S: HttpService<IncomingBody>,
33     {
34         conn: Http1Dispatcher<T, S::ResBody, S>,
35     }
36 }
37 
38 /// A configuration builder for HTTP/1 server connections.
39 #[derive(Clone, Debug)]
40 pub struct Builder {
41     h1_half_close: bool,
42     h1_keep_alive: bool,
43     h1_title_case_headers: bool,
44     h1_preserve_header_case: bool,
45     h1_header_read_timeout: Option<Duration>,
46     h1_writev: Option<bool>,
47     max_buf_size: Option<usize>,
48     pipeline_flush: bool,
49 }
50 
51 /// Deconstructed parts of a `Connection`.
52 ///
53 /// This allows taking apart a `Connection` at a later time, in order to
54 /// reclaim the IO object, and additional related pieces.
55 #[derive(Debug)]
56 pub struct Parts<T, S> {
57     /// The original IO object used in the handshake.
58     pub io: T,
59     /// A buffer of bytes that have been read but not processed as HTTP.
60     ///
61     /// If the client sent additional bytes after its last request, and
62     /// this connection "ended" with an upgrade, the read buffer will contain
63     /// those bytes.
64     ///
65     /// You will want to check for any existing bytes if you plan to continue
66     /// communicating on the IO object.
67     pub read_buf: Bytes,
68     /// The `Service` used to serve this connection.
69     pub service: S,
70     _inner: (),
71 }
72 
73 // ===== impl Connection =====
74 
75 impl<I, S> fmt::Debug for Connection<I, S>
76 where
77     S: HttpService<IncomingBody>,
78 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result79     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80         f.debug_struct("Connection").finish()
81     }
82 }
83 
84 impl<I, B, S> Connection<I, S>
85 where
86     S: HttpService<IncomingBody, ResBody = B>,
87     S::Error: Into<Box<dyn StdError + Send + Sync>>,
88     I: AsyncRead + AsyncWrite + Unpin,
89     B: Body + 'static,
90     B::Error: Into<Box<dyn StdError + Send + Sync>>,
91 {
92     /// Start a graceful shutdown process for this connection.
93     ///
94     /// This `Connection` should continue to be polled until shutdown
95     /// can finish.
96     ///
97     /// # Note
98     ///
99     /// This should only be called while the `Connection` future is still
100     /// pending. If called after `Connection::poll` has resolved, this does
101     /// nothing.
graceful_shutdown(mut self: Pin<&mut Self>)102     pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
103         self.conn.disable_keep_alive();
104     }
105 
106     /// Return the inner IO object, and additional information.
107     ///
108     /// If the IO object has been "rewound" the io will not contain those bytes rewound.
109     /// This should only be called after `poll_without_shutdown` signals
110     /// that the connection is "done". Otherwise, it may not have finished
111     /// flushing all necessary HTTP bytes.
112     ///
113     /// # Panics
114     /// This method will panic if this connection is using an h2 protocol.
into_parts(self) -> Parts<I, S>115     pub fn into_parts(self) -> Parts<I, S> {
116         let (io, read_buf, dispatch) = self.conn.into_inner();
117         Parts {
118             io,
119             read_buf,
120             service: dispatch.into_service(),
121             _inner: (),
122         }
123     }
124 
125     /// Poll the connection for completion, but without calling `shutdown`
126     /// on the underlying IO.
127     ///
128     /// This is useful to allow running a connection while doing an HTTP
129     /// upgrade. Once the upgrade is completed, the connection would be "done",
130     /// but it is not desired to actually shutdown the IO object. Instead you
131     /// would take it back using `into_parts`.
poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> where S: Unpin, S::Future: Unpin, B: Unpin,132     pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
133     where
134         S: Unpin,
135         S::Future: Unpin,
136         B: Unpin,
137     {
138         self.conn.poll_without_shutdown(cx)
139     }
140 
141     /// Prevent shutdown of the underlying IO object at the end of service the request,
142     /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
143     ///
144     /// # Error
145     ///
146     /// This errors if the underlying connection protocol is not HTTP/1.
without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> where S: Unpin, S::Future: Unpin, B: Unpin,147     pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
148     where
149         S: Unpin,
150         S::Future: Unpin,
151         B: Unpin,
152     {
153         let mut zelf = Some(self);
154         futures_util::future::poll_fn(move |cx| {
155             ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
156             Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
157         })
158     }
159 
160     /// Enable this connection to support higher-level HTTP upgrades.
161     ///
162     /// See [the `upgrade` module](crate::upgrade) for more.
with_upgrades(self) -> upgrades::UpgradeableConnection<I, S> where I: Send,163     pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<I, S>
164     where
165         I: Send,
166     {
167         upgrades::UpgradeableConnection { inner: Some(self) }
168     }
169 }
170 
171 impl<I, B, S> Future for Connection<I, S>
172 where
173     S: HttpService<IncomingBody, ResBody = B>,
174     S::Error: Into<Box<dyn StdError + Send + Sync>>,
175     I: AsyncRead + AsyncWrite + Unpin + 'static,
176     B: Body + 'static,
177     B::Error: Into<Box<dyn StdError + Send + Sync>>,
178 {
179     type Output = crate::Result<()>;
180 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>181     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
182         match ready!(Pin::new(&mut self.conn).poll(cx)) {
183             Ok(done) => {
184                 match done {
185                     proto::Dispatched::Shutdown => {}
186                     proto::Dispatched::Upgrade(pending) => {
187                         // With no `Send` bound on `I`, we can't try to do
188                         // upgrades here. In case a user was trying to use
189                         // `Body::on_upgrade` with this API, send a special
190                         // error letting them know about that.
191                         pending.manual();
192                     }
193                 };
194                 return Poll::Ready(Ok(()));
195             }
196             Err(e) => Poll::Ready(Err(e)),
197         }
198     }
199 }
200 
201 // ===== impl Builder =====
202 
203 impl Builder {
204     /// Create a new connection builder.
new() -> Self205     pub fn new() -> Self {
206         Self {
207             h1_half_close: false,
208             h1_keep_alive: true,
209             h1_title_case_headers: false,
210             h1_preserve_header_case: false,
211             h1_header_read_timeout: None,
212             h1_writev: None,
213             max_buf_size: None,
214             pipeline_flush: false,
215         }
216     }
217     /// Set whether HTTP/1 connections should support half-closures.
218     ///
219     /// Clients can chose to shutdown their write-side while waiting
220     /// for the server to respond. Setting this to `true` will
221     /// prevent closing the connection immediately if `read`
222     /// detects an EOF in the middle of a request.
223     ///
224     /// Default is `false`.
half_close(&mut self, val: bool) -> &mut Self225     pub fn half_close(&mut self, val: bool) -> &mut Self {
226         self.h1_half_close = val;
227         self
228     }
229 
230     /// Enables or disables HTTP/1 keep-alive.
231     ///
232     /// Default is true.
keep_alive(&mut self, val: bool) -> &mut Self233     pub fn keep_alive(&mut self, val: bool) -> &mut Self {
234         self.h1_keep_alive = val;
235         self
236     }
237 
238     /// Set whether HTTP/1 connections will write header names as title case at
239     /// the socket level.
240     ///
241     /// Default is false.
title_case_headers(&mut self, enabled: bool) -> &mut Self242     pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
243         self.h1_title_case_headers = enabled;
244         self
245     }
246 
247     /// Set whether to support preserving original header cases.
248     ///
249     /// Currently, this will record the original cases received, and store them
250     /// in a private extension on the `Request`. It will also look for and use
251     /// such an extension in any provided `Response`.
252     ///
253     /// Since the relevant extension is still private, there is no way to
254     /// interact with the original cases. The only effect this can have now is
255     /// to forward the cases in a proxy-like fashion.
256     ///
257     /// Default is false.
preserve_header_case(&mut self, enabled: bool) -> &mut Self258     pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
259         self.h1_preserve_header_case = enabled;
260         self
261     }
262 
263     /// Set a timeout for reading client request headers. If a client does not
264     /// transmit the entire header within this time, the connection is closed.
265     ///
266     /// Default is None.
header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self267     pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
268         self.h1_header_read_timeout = Some(read_timeout);
269         self
270     }
271 
272     /// Set whether HTTP/1 connections should try to use vectored writes,
273     /// or always flatten into a single buffer.
274     ///
275     /// Note that setting this to false may mean more copies of body data,
276     /// but may also improve performance when an IO transport doesn't
277     /// support vectored writes well, such as most TLS implementations.
278     ///
279     /// Setting this to true will force hyper to use queued strategy
280     /// which may eliminate unnecessary cloning on some TLS backends
281     ///
282     /// Default is `auto`. In this mode hyper will try to guess which
283     /// mode to use
writev(&mut self, val: bool) -> &mut Self284     pub fn writev(&mut self, val: bool) -> &mut Self {
285         self.h1_writev = Some(val);
286         self
287     }
288 
289     /// Set the maximum buffer size for the connection.
290     ///
291     /// Default is ~400kb.
292     ///
293     /// # Panics
294     ///
295     /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
max_buf_size(&mut self, max: usize) -> &mut Self296     pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
297         assert!(
298             max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
299             "the max_buf_size cannot be smaller than the minimum that h1 specifies."
300         );
301         self.max_buf_size = Some(max);
302         self
303     }
304 
305     /// Aggregates flushes to better support pipelined responses.
306     ///
307     /// Experimental, may have bugs.
308     ///
309     /// Default is false.
pipeline_flush(&mut self, enabled: bool) -> &mut Self310     pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
311         self.pipeline_flush = enabled;
312         self
313     }
314 
315     // /// Set the timer used in background tasks.
316     // pub fn timer<M>(&mut self, timer: M) -> &mut Self
317     // where
318     //     M: Timer + Send + Sync + 'static,
319     // {
320     //     self.timer = Time::Timer(Arc::new(timer));
321     //     self
322     // }
323 
324     /// Bind a connection together with a [`Service`](crate::service::Service).
325     ///
326     /// This returns a Future that must be polled in order for HTTP to be
327     /// driven on the connection.
328     ///
329     /// # Example
330     ///
331     /// ```
332     /// # use hyper::{Body as Incoming, Request, Response};
333     /// # use hyper::service::Service;
334     /// # use hyper::server::conn::http1::Builder;
335     /// # use tokio::io::{AsyncRead, AsyncWrite};
336     /// # async fn run<I, S>(some_io: I, some_service: S)
337     /// # where
338     /// #     I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
339     /// #     S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
340     /// #     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
341     /// #     S::Future: Send,
342     /// # {
343     /// let http = Builder::new();
344     /// let conn = http.serve_connection(some_io, some_service);
345     ///
346     /// if let Err(e) = conn.await {
347     ///     eprintln!("server connection error: {}", e);
348     /// }
349     /// # }
350     /// # fn main() {}
351     /// ```
serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S> where S: HttpService<IncomingBody>, S::Error: Into<Box<dyn StdError + Send + Sync>>, S::ResBody: 'static, <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>, I: AsyncRead + AsyncWrite + Unpin,352     pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
353     where
354         S: HttpService<IncomingBody>,
355         S::Error: Into<Box<dyn StdError + Send + Sync>>,
356         S::ResBody: 'static,
357         <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
358         I: AsyncRead + AsyncWrite + Unpin,
359     {
360         let mut conn = proto::Conn::new(io);
361         if !self.h1_keep_alive {
362             conn.disable_keep_alive();
363         }
364         if self.h1_half_close {
365             conn.set_allow_half_close();
366         }
367         if self.h1_title_case_headers {
368             conn.set_title_case_headers();
369         }
370         if self.h1_preserve_header_case {
371             conn.set_preserve_header_case();
372         }
373         if let Some(header_read_timeout) = self.h1_header_read_timeout {
374             conn.set_http1_header_read_timeout(header_read_timeout);
375         }
376         if let Some(writev) = self.h1_writev {
377             if writev {
378                 conn.set_write_strategy_queue();
379             } else {
380                 conn.set_write_strategy_flatten();
381             }
382         }
383         conn.set_flush_pipeline(self.pipeline_flush);
384         if let Some(max) = self.max_buf_size {
385             conn.set_max_buf_size(max);
386         }
387         let sd = proto::h1::dispatch::Server::new(service);
388         let proto = proto::h1::Dispatcher::new(sd, conn);
389         Connection { conn: proto }
390     }
391 }
392 
393 mod upgrades {
394     use crate::upgrade::Upgraded;
395 
396     use super::*;
397 
398     // A future binding a connection with a Service with Upgrade support.
399     //
400     // This type is unnameable outside the crate.
401     #[must_use = "futures do nothing unless polled"]
402     #[allow(missing_debug_implementations)]
403     pub struct UpgradeableConnection<T, S>
404     where
405         S: HttpService<IncomingBody>,
406     {
407         pub(super) inner: Option<Connection<T, S>>,
408     }
409 
410     impl<I, B, S> UpgradeableConnection<I, S>
411     where
412         S: HttpService<IncomingBody, ResBody = B>,
413         S::Error: Into<Box<dyn StdError + Send + Sync>>,
414         I: AsyncRead + AsyncWrite + Unpin,
415         B: Body + 'static,
416         B::Error: Into<Box<dyn StdError + Send + Sync>>,
417     {
418         /// Start a graceful shutdown process for this connection.
419         ///
420         /// This `Connection` should continue to be polled until shutdown
421         /// can finish.
graceful_shutdown(mut self: Pin<&mut Self>)422         pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
423             Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
424         }
425     }
426 
427     impl<I, B, S> Future for UpgradeableConnection<I, S>
428     where
429         S: HttpService<IncomingBody, ResBody = B>,
430         S::Error: Into<Box<dyn StdError + Send + Sync>>,
431         I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
432         B: Body + 'static,
433         B::Error: Into<Box<dyn StdError + Send + Sync>>,
434     {
435         type Output = crate::Result<()>;
436 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>437         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
438             match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
439                 Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
440                 Ok(proto::Dispatched::Upgrade(pending)) => {
441                     let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
442                     pending.fulfill(Upgraded::new(io, buf));
443                     Poll::Ready(Ok(()))
444                 }
445                 Err(e) => Poll::Ready(Err(e)),
446             }
447         }
448     }
449 }
450