1 //! Lower-level Server connection API. 2 //! 3 //! The types in this module are to provide a lower-level API based around a 4 //! single connection. Accepting a connection and binding it with a service 5 //! are not handled at this level. This module provides the building blocks to 6 //! customize those things externally. 7 //! 8 //! If you don't have need to manage connections yourself, consider using the 9 //! higher-level [Server](super) API. 10 //! 11 //! ## Example 12 //! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream 13 //! ```no_run 14 //! # #[cfg(all(feature = "http1", feature = "runtime"))] 15 //! # mod rt { 16 //! use http::{Request, Response, StatusCode}; 17 //! use hyper::{server::conn::Http, service::service_fn, Body}; 18 //! use std::{net::SocketAddr, convert::Infallible}; 19 //! use tokio::net::TcpListener; 20 //! 21 //! #[tokio::main] 22 //! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 23 //! let addr: SocketAddr = ([127, 0, 0, 1], 8080).into(); 24 //! 25 //! let mut tcp_listener = TcpListener::bind(addr).await?; 26 //! loop { 27 //! let (tcp_stream, _) = tcp_listener.accept().await?; 28 //! tokio::task::spawn(async move { 29 //! if let Err(http_err) = Http::new() 30 //! .http1_only(true) 31 //! .http1_keep_alive(true) 32 //! .serve_connection(tcp_stream, service_fn(hello)) 33 //! .await { 34 //! eprintln!("Error while serving HTTP connection: {}", http_err); 35 //! } 36 //! }); 37 //! } 38 //! } 39 //! 40 //! async fn hello(_req: Request<Body>) -> Result<Response<Body>, Infallible> { 41 //! Ok(Response::new(Body::from("Hello World!"))) 42 //! } 43 //! # } 44 //! 
``` 45 46 #[cfg(all( 47 any(feature = "http1", feature = "http2"), 48 not(all(feature = "http1", feature = "http2")) 49 ))] 50 use std::marker::PhantomData; 51 #[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))] 52 use std::time::Duration; 53 54 #[cfg(feature = "http2")] 55 use crate::common::io::Rewind; 56 #[cfg(all(feature = "http1", feature = "http2"))] 57 use crate::error::{Kind, Parse}; 58 #[cfg(feature = "http1")] 59 use crate::upgrade::Upgraded; 60 61 #[cfg(all(feature = "backports", feature = "http1"))] 62 pub mod http1; 63 #[cfg(all(feature = "backports", feature = "http2"))] 64 pub mod http2; 65 66 cfg_feature! { 67 #![any(feature = "http1", feature = "http2")] 68 69 use std::error::Error as StdError; 70 use std::fmt; 71 use std::task::{Context, Poll}; 72 use std::pin::Pin; 73 use std::future::Future; 74 use std::marker::Unpin; 75 #[cfg(not(all(feature = "http1", feature = "http2")))] 76 use std::convert::Infallible; 77 78 use bytes::Bytes; 79 use pin_project_lite::pin_project; 80 use tokio::io::{AsyncRead, AsyncWrite}; 81 use tracing::trace; 82 83 pub use super::server::Connecting; 84 use crate::body::{Body, HttpBody}; 85 use crate::common::exec::{ConnStreamExec, Exec}; 86 use crate::proto; 87 use crate::service::HttpService; 88 89 pub(super) use self::upgrades::UpgradeableConnection; 90 } 91 92 #[cfg(feature = "tcp")] 93 pub use super::tcp::{AddrIncoming, AddrStream}; 94 95 /// A lower-level configuration of the HTTP protocol. 96 /// 97 /// This structure is used to configure options for an HTTP server connection. 98 /// 99 /// If you don't have need to manage connections yourself, consider using the 100 /// higher-level [Server](super) API. 
101 #[derive(Clone, Debug)] 102 #[cfg(any(feature = "http1", feature = "http2"))] 103 #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] 104 #[cfg_attr( 105 feature = "deprecated", 106 deprecated( 107 note = "This struct will be replaced with `server::conn::http1::Builder` and `server::conn::http2::Builder` in 1.0, enable the \"backports\" feature to use them now." 108 ) 109 )] 110 pub struct Http<E = Exec> { 111 pub(crate) exec: E, 112 h1_half_close: bool, 113 h1_keep_alive: bool, 114 h1_title_case_headers: bool, 115 h1_preserve_header_case: bool, 116 #[cfg(all(feature = "http1", feature = "runtime"))] 117 h1_header_read_timeout: Option<Duration>, 118 h1_writev: Option<bool>, 119 #[cfg(feature = "http2")] 120 h2_builder: proto::h2::server::Config, 121 mode: ConnectionMode, 122 max_buf_size: Option<usize>, 123 pipeline_flush: bool, 124 } 125 126 /// The internal mode of HTTP protocol which indicates the behavior when a parse error occurs. 127 #[cfg(any(feature = "http1", feature = "http2"))] 128 #[derive(Clone, Debug, PartialEq)] 129 enum ConnectionMode { 130 /// Always use HTTP/1 and do not upgrade when a parse error occurs. 131 #[cfg(feature = "http1")] 132 H1Only, 133 /// Always use HTTP/2. 134 #[cfg(feature = "http2")] 135 H2Only, 136 /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs. 137 #[cfg(all(feature = "http1", feature = "http2"))] 138 Fallback, 139 } 140 141 #[cfg(any(feature = "http1", feature = "http2"))] 142 pin_project! { 143 /// A future binding a connection with a Service. 144 /// 145 /// Polling this future will drive HTTP forward. 
146 #[must_use = "futures do nothing unless polled"] 147 #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] 148 pub struct Connection<T, S, E = Exec> 149 where 150 S: HttpService<Body>, 151 { 152 pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>, 153 fallback: Fallback<E>, 154 } 155 } 156 157 #[cfg(feature = "http1")] 158 type Http1Dispatcher<T, B, S> = 159 proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>; 160 161 #[cfg(all(not(feature = "http1"), feature = "http2"))] 162 type Http1Dispatcher<T, B, S> = (Infallible, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>); 163 164 #[cfg(feature = "http2")] 165 type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>; 166 167 #[cfg(all(not(feature = "http2"), feature = "http1"))] 168 type Http2Server<T, B, S, E> = ( 169 Infallible, 170 PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>, 171 ); 172 173 #[cfg(any(feature = "http1", feature = "http2"))] 174 pin_project! { 175 #[project = ProtoServerProj] 176 pub(super) enum ProtoServer<T, B, S, E = Exec> 177 where 178 S: HttpService<Body>, 179 B: HttpBody, 180 { 181 H1 { 182 #[pin] 183 h1: Http1Dispatcher<T, B, S>, 184 }, 185 H2 { 186 #[pin] 187 h2: Http2Server<T, B, S, E>, 188 }, 189 } 190 } 191 192 #[cfg(all(feature = "http1", feature = "http2"))] 193 #[derive(Clone, Debug)] 194 enum Fallback<E> { 195 ToHttp2(proto::h2::server::Config, E), 196 Http1Only, 197 } 198 199 #[cfg(all( 200 any(feature = "http1", feature = "http2"), 201 not(all(feature = "http1", feature = "http2")) 202 ))] 203 type Fallback<E> = PhantomData<E>; 204 205 #[cfg(all(feature = "http1", feature = "http2"))] 206 impl<E> Fallback<E> { to_h2(&self) -> bool207 fn to_h2(&self) -> bool { 208 match *self { 209 Fallback::ToHttp2(..) 
=> true, 210 Fallback::Http1Only => false, 211 } 212 } 213 } 214 215 #[cfg(all(feature = "http1", feature = "http2"))] 216 impl<E> Unpin for Fallback<E> {} 217 218 /// Deconstructed parts of a `Connection`. 219 /// 220 /// This allows taking apart a `Connection` at a later time, in order to 221 /// reclaim the IO object, and additional related pieces. 222 #[derive(Debug)] 223 #[cfg(any(feature = "http1", feature = "http2"))] 224 #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] 225 #[cfg_attr( 226 feature = "deprecated", 227 deprecated( 228 note = "This struct will be replaced with `server::conn::http1::Parts` in 1.0, enable the \"backports\" feature to use them now." 229 ) 230 )] 231 pub struct Parts<T, S> { 232 /// The original IO object used in the handshake. 233 pub io: T, 234 /// A buffer of bytes that have been read but not processed as HTTP. 235 /// 236 /// If the client sent additional bytes after its last request, and 237 /// this connection "ended" with an upgrade, the read buffer will contain 238 /// those bytes. 239 /// 240 /// You will want to check for any existing bytes if you plan to continue 241 /// communicating on the IO object. 242 pub read_buf: Bytes, 243 /// The `Service` used to serve this connection. 244 pub service: S, 245 _inner: (), 246 } 247 248 // ===== impl Http ===== 249 250 #[cfg_attr(feature = "deprecated", allow(deprecated))] 251 #[cfg(any(feature = "http1", feature = "http2"))] 252 impl Http { 253 /// Creates a new instance of the HTTP protocol, ready to spawn a server or 254 /// start accepting connections. 
new() -> Http255 pub fn new() -> Http { 256 Http { 257 exec: Exec::Default, 258 h1_half_close: false, 259 h1_keep_alive: true, 260 h1_title_case_headers: false, 261 h1_preserve_header_case: false, 262 #[cfg(all(feature = "http1", feature = "runtime"))] 263 h1_header_read_timeout: None, 264 h1_writev: None, 265 #[cfg(feature = "http2")] 266 h2_builder: Default::default(), 267 mode: ConnectionMode::default(), 268 max_buf_size: None, 269 pipeline_flush: false, 270 } 271 } 272 } 273 274 #[cfg_attr(feature = "deprecated", allow(deprecated))] 275 #[cfg(any(feature = "http1", feature = "http2"))] 276 impl<E> Http<E> { 277 /// Sets whether HTTP1 is required. 278 /// 279 /// Default is false 280 #[cfg(feature = "http1")] 281 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] http1_only(&mut self, val: bool) -> &mut Self282 pub fn http1_only(&mut self, val: bool) -> &mut Self { 283 if val { 284 self.mode = ConnectionMode::H1Only; 285 } else { 286 #[cfg(feature = "http2")] 287 { 288 self.mode = ConnectionMode::Fallback; 289 } 290 } 291 self 292 } 293 294 /// Set whether HTTP/1 connections should support half-closures. 295 /// 296 /// Clients can chose to shutdown their write-side while waiting 297 /// for the server to respond. Setting this to `true` will 298 /// prevent closing the connection immediately if `read` 299 /// detects an EOF in the middle of a request. 300 /// 301 /// Default is `false`. 302 #[cfg(feature = "http1")] 303 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] http1_half_close(&mut self, val: bool) -> &mut Self304 pub fn http1_half_close(&mut self, val: bool) -> &mut Self { 305 self.h1_half_close = val; 306 self 307 } 308 309 /// Enables or disables HTTP/1 keep-alive. 310 /// 311 /// Default is true. 
312 #[cfg(feature = "http1")] 313 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] http1_keep_alive(&mut self, val: bool) -> &mut Self314 pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self { 315 self.h1_keep_alive = val; 316 self 317 } 318 319 /// Set whether HTTP/1 connections will write header names as title case at 320 /// the socket level. 321 /// 322 /// Note that this setting does not affect HTTP/2. 323 /// 324 /// Default is false. 325 #[cfg(feature = "http1")] 326 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] http1_title_case_headers(&mut self, enabled: bool) -> &mut Self327 pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self { 328 self.h1_title_case_headers = enabled; 329 self 330 } 331 332 /// Set whether to support preserving original header cases. 333 /// 334 /// Currently, this will record the original cases received, and store them 335 /// in a private extension on the `Request`. It will also look for and use 336 /// such an extension in any provided `Response`. 337 /// 338 /// Since the relevant extension is still private, there is no way to 339 /// interact with the original cases. The only effect this can have now is 340 /// to forward the cases in a proxy-like fashion. 341 /// 342 /// Note that this setting does not affect HTTP/2. 343 /// 344 /// Default is false. 345 #[cfg(feature = "http1")] 346 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self347 pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self { 348 self.h1_preserve_header_case = enabled; 349 self 350 } 351 352 /// Set a timeout for reading client request headers. If a client does not 353 /// transmit the entire header within this time, the connection is closed. 354 /// 355 /// Default is None. 
356 #[cfg(all(feature = "http1", feature = "runtime"))] 357 #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))] http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self358 pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self { 359 self.h1_header_read_timeout = Some(read_timeout); 360 self 361 } 362 363 /// Set whether HTTP/1 connections should try to use vectored writes, 364 /// or always flatten into a single buffer. 365 /// 366 /// Note that setting this to false may mean more copies of body data, 367 /// but may also improve performance when an IO transport doesn't 368 /// support vectored writes well, such as most TLS implementations. 369 /// 370 /// Setting this to true will force hyper to use queued strategy 371 /// which may eliminate unnecessary cloning on some TLS backends 372 /// 373 /// Default is `auto`. In this mode hyper will try to guess which 374 /// mode to use 375 #[inline] 376 #[cfg(feature = "http1")] 377 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] http1_writev(&mut self, val: bool) -> &mut Self378 pub fn http1_writev(&mut self, val: bool) -> &mut Self { 379 self.h1_writev = Some(val); 380 self 381 } 382 383 /// Sets whether HTTP2 is required. 384 /// 385 /// Default is false 386 #[cfg(feature = "http2")] 387 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_only(&mut self, val: bool) -> &mut Self388 pub fn http2_only(&mut self, val: bool) -> &mut Self { 389 if val { 390 self.mode = ConnectionMode::H2Only; 391 } else { 392 #[cfg(feature = "http1")] 393 { 394 self.mode = ConnectionMode::Fallback; 395 } 396 } 397 self 398 } 399 400 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent. 401 /// 402 /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2). 403 /// As of v0.3.17, it is 20. 404 /// 405 /// See <https://github.com/hyperium/hyper/issues/2877> for more information. 
406 #[cfg(feature = "http2")] 407 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_max_pending_accept_reset_streams( &mut self, max: impl Into<Option<usize>>, ) -> &mut Self408 pub fn http2_max_pending_accept_reset_streams( 409 &mut self, 410 max: impl Into<Option<usize>>, 411 ) -> &mut Self { 412 self.h2_builder.max_pending_accept_reset_streams = max.into(); 413 414 self 415 } 416 417 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 418 /// stream-level flow control. 419 /// 420 /// Passing `None` will do nothing. 421 /// 422 /// If not set, hyper will use a default. 423 /// 424 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE 425 #[cfg(feature = "http2")] 426 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self427 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 428 if let Some(sz) = sz.into() { 429 self.h2_builder.adaptive_window = false; 430 self.h2_builder.initial_stream_window_size = sz; 431 } 432 self 433 } 434 435 /// Sets the max connection-level flow control for HTTP2. 436 /// 437 /// Passing `None` will do nothing. 438 /// 439 /// If not set, hyper will use a default. 440 #[cfg(feature = "http2")] 441 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_initial_connection_window_size( &mut self, sz: impl Into<Option<u32>>, ) -> &mut Self442 pub fn http2_initial_connection_window_size( 443 &mut self, 444 sz: impl Into<Option<u32>>, 445 ) -> &mut Self { 446 if let Some(sz) = sz.into() { 447 self.h2_builder.adaptive_window = false; 448 self.h2_builder.initial_conn_window_size = sz; 449 } 450 self 451 } 452 453 /// Sets whether to use an adaptive flow control. 454 /// 455 /// Enabling this will override the limits set in 456 /// `http2_initial_stream_window_size` and 457 /// `http2_initial_connection_window_size`. 
458 #[cfg(feature = "http2")] 459 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_adaptive_window(&mut self, enabled: bool) -> &mut Self460 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { 461 use proto::h2::SPEC_WINDOW_SIZE; 462 463 self.h2_builder.adaptive_window = enabled; 464 if enabled { 465 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; 466 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; 467 } 468 self 469 } 470 471 /// Sets the maximum frame size to use for HTTP2. 472 /// 473 /// Passing `None` will do nothing. 474 /// 475 /// If not set, hyper will use a default. 476 #[cfg(feature = "http2")] 477 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self478 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { 479 if let Some(sz) = sz.into() { 480 self.h2_builder.max_frame_size = sz; 481 } 482 self 483 } 484 485 /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 486 /// connections. 487 /// 488 /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. 489 /// 490 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS 491 #[cfg(feature = "http2")] 492 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self493 pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self { 494 self.h2_builder.max_concurrent_streams = max.into(); 495 self 496 } 497 498 /// Sets an interval for HTTP2 Ping frames should be sent to keep a 499 /// connection alive. 500 /// 501 /// Pass `None` to disable HTTP2 keep-alive. 502 /// 503 /// Default is currently disabled. 504 /// 505 /// # Cargo Feature 506 /// 507 /// Requires the `runtime` cargo feature to be enabled. 
508 #[cfg(feature = "runtime")] 509 #[cfg(feature = "http2")] 510 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_keep_alive_interval( &mut self, interval: impl Into<Option<Duration>>, ) -> &mut Self511 pub fn http2_keep_alive_interval( 512 &mut self, 513 interval: impl Into<Option<Duration>>, 514 ) -> &mut Self { 515 self.h2_builder.keep_alive_interval = interval.into(); 516 self 517 } 518 519 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. 520 /// 521 /// If the ping is not acknowledged within the timeout, the connection will 522 /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. 523 /// 524 /// Default is 20 seconds. 525 /// 526 /// # Cargo Feature 527 /// 528 /// Requires the `runtime` cargo feature to be enabled. 529 #[cfg(feature = "runtime")] 530 #[cfg(feature = "http2")] 531 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self532 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { 533 self.h2_builder.keep_alive_timeout = timeout; 534 self 535 } 536 537 /// Set the maximum write buffer size for each HTTP/2 stream. 538 /// 539 /// Default is currently ~400KB, but may change. 540 /// 541 /// # Panics 542 /// 543 /// The value must be no larger than `u32::MAX`. 544 #[cfg(feature = "http2")] 545 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_max_send_buf_size(&mut self, max: usize) -> &mut Self546 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { 547 assert!(max <= std::u32::MAX as usize); 548 self.h2_builder.max_send_buffer_size = max; 549 self 550 } 551 552 /// Enables the [extended CONNECT protocol]. 
553 /// 554 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 555 #[cfg(feature = "http2")] http2_enable_connect_protocol(&mut self) -> &mut Self556 pub fn http2_enable_connect_protocol(&mut self) -> &mut Self { 557 self.h2_builder.enable_connect_protocol = true; 558 self 559 } 560 561 /// Sets the max size of received header frames. 562 /// 563 /// Default is currently ~16MB, but may change. 564 #[cfg(feature = "http2")] 565 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] http2_max_header_list_size(&mut self, max: u32) -> &mut Self566 pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self { 567 self.h2_builder.max_header_list_size = max; 568 self 569 } 570 571 /// Set the maximum buffer size for the connection. 572 /// 573 /// Default is ~400kb. 574 /// 575 /// # Panics 576 /// 577 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. 578 #[cfg(feature = "http1")] 579 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] max_buf_size(&mut self, max: usize) -> &mut Self580 pub fn max_buf_size(&mut self, max: usize) -> &mut Self { 581 assert!( 582 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, 583 "the max_buf_size cannot be smaller than the minimum that h1 specifies." 584 ); 585 self.max_buf_size = Some(max); 586 self 587 } 588 589 /// Aggregates flushes to better support pipelined responses. 590 /// 591 /// Experimental, may have bugs. 592 /// 593 /// Default is false. pipeline_flush(&mut self, enabled: bool) -> &mut Self594 pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { 595 self.pipeline_flush = enabled; 596 self 597 } 598 599 /// Set the executor used to spawn background tasks. 600 /// 601 /// Default uses implicit default (like `tokio::spawn`). 
with_executor<E2>(self, exec: E2) -> Http<E2>602 pub fn with_executor<E2>(self, exec: E2) -> Http<E2> { 603 Http { 604 exec, 605 h1_half_close: self.h1_half_close, 606 h1_keep_alive: self.h1_keep_alive, 607 h1_title_case_headers: self.h1_title_case_headers, 608 h1_preserve_header_case: self.h1_preserve_header_case, 609 #[cfg(all(feature = "http1", feature = "runtime"))] 610 h1_header_read_timeout: self.h1_header_read_timeout, 611 h1_writev: self.h1_writev, 612 #[cfg(feature = "http2")] 613 h2_builder: self.h2_builder, 614 mode: self.mode, 615 max_buf_size: self.max_buf_size, 616 pipeline_flush: self.pipeline_flush, 617 } 618 } 619 620 /// Bind a connection together with a [`Service`](crate::service::Service). 621 /// 622 /// This returns a Future that must be polled in order for HTTP to be 623 /// driven on the connection. 624 /// 625 /// # Example 626 /// 627 /// ``` 628 /// # use hyper::{Body, Request, Response}; 629 /// # use hyper::service::Service; 630 /// # use hyper::server::conn::Http; 631 /// # use tokio::io::{AsyncRead, AsyncWrite}; 632 /// # async fn run<I, S>(some_io: I, some_service: S) 633 /// # where 634 /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static, 635 /// # S: Service<hyper::Request<Body>, Response=hyper::Response<Body>> + Send + 'static, 636 /// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, 637 /// # S::Future: Send, 638 /// # { 639 /// let http = Http::new(); 640 /// let conn = http.serve_connection(some_io, some_service); 641 /// 642 /// if let Err(e) = conn.await { 643 /// eprintln!("server connection error: {}", e); 644 /// } 645 /// # } 646 /// # fn main() {} 647 /// ``` serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E> where S: HttpService<Body, ResBody = Bd>, S::Error: Into<Box<dyn StdError + Send + Sync>>, Bd: HttpBody + 'static, Bd::Error: Into<Box<dyn StdError + Send + Sync>>, I: AsyncRead + AsyncWrite + Unpin, E: ConnStreamExec<S::Future, Bd>,648 pub fn serve_connection<S, I, 
Bd>(&self, io: I, service: S) -> Connection<I, S, E> 649 where 650 S: HttpService<Body, ResBody = Bd>, 651 S::Error: Into<Box<dyn StdError + Send + Sync>>, 652 Bd: HttpBody + 'static, 653 Bd::Error: Into<Box<dyn StdError + Send + Sync>>, 654 I: AsyncRead + AsyncWrite + Unpin, 655 E: ConnStreamExec<S::Future, Bd>, 656 { 657 #[cfg(feature = "http1")] 658 macro_rules! h1 { 659 () => {{ 660 let mut conn = proto::Conn::new(io); 661 if !self.h1_keep_alive { 662 conn.disable_keep_alive(); 663 } 664 if self.h1_half_close { 665 conn.set_allow_half_close(); 666 } 667 if self.h1_title_case_headers { 668 conn.set_title_case_headers(); 669 } 670 if self.h1_preserve_header_case { 671 conn.set_preserve_header_case(); 672 } 673 #[cfg(all(feature = "http1", feature = "runtime"))] 674 if let Some(header_read_timeout) = self.h1_header_read_timeout { 675 conn.set_http1_header_read_timeout(header_read_timeout); 676 } 677 if let Some(writev) = self.h1_writev { 678 if writev { 679 conn.set_write_strategy_queue(); 680 } else { 681 conn.set_write_strategy_flatten(); 682 } 683 } 684 conn.set_flush_pipeline(self.pipeline_flush); 685 if let Some(max) = self.max_buf_size { 686 conn.set_max_buf_size(max); 687 } 688 let sd = proto::h1::dispatch::Server::new(service); 689 ProtoServer::H1 { 690 h1: proto::h1::Dispatcher::new(sd, conn), 691 } 692 }}; 693 } 694 695 let proto = match self.mode { 696 #[cfg(feature = "http1")] 697 #[cfg(not(feature = "http2"))] 698 ConnectionMode::H1Only => h1!(), 699 #[cfg(feature = "http2")] 700 #[cfg(feature = "http1")] 701 ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(), 702 #[cfg(feature = "http2")] 703 ConnectionMode::H2Only => { 704 let rewind_io = Rewind::new(io); 705 let h2 = 706 proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone()); 707 ProtoServer::H2 { h2 } 708 } 709 }; 710 711 Connection { 712 conn: Some(proto), 713 #[cfg(all(feature = "http1", feature = "http2"))] 714 fallback: if self.mode == ConnectionMode::Fallback 
{ 715 Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone()) 716 } else { 717 Fallback::Http1Only 718 }, 719 #[cfg(not(all(feature = "http1", feature = "http2")))] 720 fallback: PhantomData, 721 } 722 } 723 } 724 725 // ===== impl Connection ===== 726 727 #[cfg(any(feature = "http1", feature = "http2"))] 728 impl<I, B, S, E> Connection<I, S, E> 729 where 730 S: HttpService<Body, ResBody = B>, 731 S::Error: Into<Box<dyn StdError + Send + Sync>>, 732 I: AsyncRead + AsyncWrite + Unpin, 733 B: HttpBody + 'static, 734 B::Error: Into<Box<dyn StdError + Send + Sync>>, 735 E: ConnStreamExec<S::Future, B>, 736 { 737 /// Start a graceful shutdown process for this connection. 738 /// 739 /// This `Connection` should continue to be polled until shutdown 740 /// can finish. 741 /// 742 /// # Note 743 /// 744 /// This should only be called while the `Connection` future is still 745 /// pending. If called after `Connection::poll` has resolved, this does 746 /// nothing. graceful_shutdown(mut self: Pin<&mut Self>)747 pub fn graceful_shutdown(mut self: Pin<&mut Self>) { 748 match self.conn { 749 #[cfg(feature = "http1")] 750 Some(ProtoServer::H1 { ref mut h1, .. }) => { 751 h1.disable_keep_alive(); 752 } 753 #[cfg(feature = "http2")] 754 Some(ProtoServer::H2 { ref mut h2 }) => { 755 h2.graceful_shutdown(); 756 } 757 None => (), 758 759 #[cfg(not(feature = "http1"))] 760 Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {}, 761 #[cfg(not(feature = "http2"))] 762 Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {}, 763 } 764 } 765 766 /// Return the inner IO object, and additional information. 767 /// 768 /// If the IO object has been "rewound" the io will not contain those bytes rewound. 769 /// This should only be called after `poll_without_shutdown` signals 770 /// that the connection is "done". Otherwise, it may not have finished 771 /// flushing all necessary HTTP bytes. 
772 /// 773 /// # Panics 774 /// This method will panic if this connection is using an h2 protocol. 775 #[cfg_attr(feature = "deprecated", allow(deprecated))] into_parts(self) -> Parts<I, S>776 pub fn into_parts(self) -> Parts<I, S> { 777 self.try_into_parts() 778 .unwrap_or_else(|| panic!("h2 cannot into_inner")) 779 } 780 781 /// Return the inner IO object, and additional information, if available. 782 /// 783 /// This method will return a `None` if this connection is using an h2 protocol. 784 #[cfg_attr(feature = "deprecated", allow(deprecated))] try_into_parts(self) -> Option<Parts<I, S>>785 pub fn try_into_parts(self) -> Option<Parts<I, S>> { 786 match self.conn.unwrap() { 787 #[cfg(feature = "http1")] 788 ProtoServer::H1 { h1, .. } => { 789 let (io, read_buf, dispatch) = h1.into_inner(); 790 Some(Parts { 791 io, 792 read_buf, 793 service: dispatch.into_service(), 794 _inner: (), 795 }) 796 } 797 ProtoServer::H2 { .. } => None, 798 799 #[cfg(not(feature = "http1"))] 800 ProtoServer::H1 { h1, .. } => match h1.0 {}, 801 } 802 } 803 804 /// Poll the connection for completion, but without calling `shutdown` 805 /// on the underlying IO. 806 /// 807 /// This is useful to allow running a connection while doing an HTTP 808 /// upgrade. Once the upgrade is completed, the connection would be "done", 809 /// but it is not desired to actually shutdown the IO object. Instead you 810 /// would take it back using `into_parts`. poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>811 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 812 loop { 813 match *self.conn.as_mut().unwrap() { 814 #[cfg(feature = "http1")] 815 ProtoServer::H1 { ref mut h1, .. 
} => match ready!(h1.poll_without_shutdown(cx)) {
                    Ok(()) => return Poll::Ready(Ok(())),
                    Err(e) => {
                        // If the error is an HTTP/2 version parse error and
                        // fallback is enabled, switch protocols and poll again.
                        #[cfg(feature = "http2")]
                        match *e.kind() {
                            Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
                                self.upgrade_h2();
                                continue;
                            }
                            _ => (),
                        }

                        return Poll::Ready(Err(e));
                    }
                },
                #[cfg(feature = "http2")]
                ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),

                #[cfg(not(feature = "http1"))]
                ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},
                #[cfg(not(feature = "http2"))]
                ProtoServer::H2 { ref mut h2 } => match h2.0 {},
            };
        }
    }

    /// Prevent shutdown of the underlying IO object at the end of servicing
    /// the request, and return the connection's parts (as `into_parts` would)
    /// instead. This is a convenience wrapper over `poll_without_shutdown`.
    ///
    /// # Errors
    ///
    /// This errors if the underlying connection protocol is not HTTP/1.
    #[cfg_attr(feature = "deprecated", allow(deprecated))]
    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> {
        // The connection lives in an `Option` so it can be moved out of the
        // closure once polling has finished.
        let mut slot = Some(self);
        futures_util::future::poll_fn(move |cx| {
            let this = slot.as_mut().unwrap();
            ready!(this.poll_without_shutdown(cx))?;

            let parts = slot.take().unwrap().try_into_parts();
            Poll::Ready(parts.ok_or_else(crate::Error::new_without_shutdown_not_h1))
        })
    }

    /// Replace the HTTP/1 protocol state of this connection with an HTTP/2
    /// server built from the same IO object and service.
    ///
    /// Panics if the connection is not currently HTTP/1, or if the fallback
    /// configuration is `Fallback::Http1Only`.
    #[cfg(all(feature = "http1", feature = "http2"))]
    fn upgrade_h2(&mut self) {
        trace!("Trying to upgrade connection to h2");

        let (io, buffered, dispatch) = match self.conn.take().unwrap() {
            ProtoServer::H1 { h1, .. } => h1.into_inner(),
            ProtoServer::H2 { .. } => panic!("h2 cannot into_inner"),
        };

        // Hand the bytes the HTTP/1 side had already read back to the IO
        // wrapper (presumably so the HTTP/2 server reads them again).
        let mut io = Rewind::new(io);
        io.rewind(buffered);

        let (h2_builder, h2_exec) = match &self.fallback {
            Fallback::ToHttp2(builder, exec) => (builder, exec),
            Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
        };
        let h2 = proto::h2::Server::new(io, dispatch.into_service(), h2_builder, h2_exec.clone());

        debug_assert!(self.conn.is_none());
        self.conn = Some(ProtoServer::H2 { h2 });
    }

    /// Enable this connection to support higher-level HTTP upgrades.
    ///
    /// The connection is wrapped in an `UpgradeableConnection`.
    /// See [the `upgrade` module](crate::upgrade) for more.
    pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
    where
        I: Send,
    {
        UpgradeableConnection { inner: self }
    }
}

#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, B, S, E> Future for Connection<I, S, E>
where
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<()>;

    /// Polls the inner protocol server until it finishes, switching to
    /// HTTP/2 when the HTTP/1 side reports an h2 version error and fallback
    /// is enabled.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let outcome = ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx));
            let err = match outcome {
                Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                #[cfg(feature = "http1")]
                Ok(proto::Dispatched::Upgrade(pending)) => {
                    // `I` has no `Send` bound here, so upgrades cannot be
                    // attempted. If a user was trying to use
                    // `Body::on_upgrade` with this API, send a special
                    // error letting them know about that.
                    pending.manual();
                    return Poll::Ready(Ok(()));
                }
                Err(e) => e,
            };

            #[cfg(all(feature = "http1", feature = "http2"))]
            match *err.kind() {
                Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
                    self.upgrade_h2();
                    continue;
                }
                _ => (),
            }

            return Poll::Ready(Err(err));
        }
    }
}

#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, S> fmt::Debug for Connection<I, S>
where
    S: HttpService<Body>,
{
    /// Writes only the struct name; no fields are shown.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.debug_struct("Connection").finish()
    }
}

// ===== impl ConnectionMode =====

#[cfg(any(feature = "http1", feature = "http2"))]
impl Default for ConnectionMode {
    /// The default mode depends on the enabled protocol features: `Fallback`
    /// with both, otherwise the only protocol that is compiled in.
    fn default() -> ConnectionMode {
        // Exactly one of these bindings survives conditional compilation.
        #[cfg(all(feature = "http1", feature = "http2"))]
        let mode = ConnectionMode::Fallback;
        #[cfg(all(feature = "http1", not(feature = "http2")))]
        let mode = ConnectionMode::H1Only;
        #[cfg(all(not(feature = "http1"), feature = "http2"))]
        let mode = ConnectionMode::H2Only;

        mode
    }
}

// ===== impl ProtoServer =====

#[cfg(any(feature = "http1", feature = "http2"))]
impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
where
    T: AsyncRead + AsyncWrite + Unpin,
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<proto::Dispatched>;

    /// Forwards the poll to whichever protocol implementation is active.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            // HTTP/1: poll the real server, or, when the feature is off,
            // discharge the arm with an empty match on its payload.
            #[cfg(feature = "http1")]
            ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
            #[cfg(not(feature = "http1"))]
            ProtoServerProj::H1 { h1, .. } => match h1.0 {},

            // HTTP/2: same arrangement.
            #[cfg(feature = "http2")]
            ProtoServerProj::H2 { h2 } => h2.poll(cx),
            #[cfg(not(feature = "http2"))]
            ProtoServerProj::H2 { h2 } => match h2.0 {},
        }
    }
}

#[cfg(any(feature = "http1", feature = "http2"))]
mod upgrades {
    use super::*;

    // A future binding a connection with a Service with Upgrade support.
    //
    // This type is unnameable outside the crate, and so basically just an
    // `impl Future`, without requiring Rust 1.26.
    #[must_use = "futures do nothing unless polled"]
    #[allow(missing_debug_implementations)]
    pub struct UpgradeableConnection<T, S, E>
    where
        S: HttpService<Body>,
    {
        pub(super) inner: Connection<T, S, E>,
    }

    impl<I, B, S, E> UpgradeableConnection<I, S, E>
    where
        S: HttpService<Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
        B: HttpBody + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        E: ConnStreamExec<S::Future, B>,
    {
        /// Start a graceful shutdown process for this connection.
        ///
        /// This simply delegates to the wrapped `Connection`, which should
        /// continue to be polled until shutdown can finish.
        pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
            Pin::new(&mut self.inner).graceful_shutdown()
        }
    }

    impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
    where
        S: HttpService<Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: HttpBody + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        E: ConnStreamExec<S::Future, B>,
    {
        type Output = crate::Result<()>;

        /// Polls the wrapped connection; on an HTTP/1 upgrade the IO object
        /// and its read buffer are handed to the pending upgrade.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            loop {
                let outcome = ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx));
                let err = match outcome {
                    Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                    #[cfg(feature = "http1")]
                    Ok(proto::Dispatched::Upgrade(pending)) => {
                        // Only an HTTP/1 connection is expected to report an
                        // upgrade; take it apart to recover the IO object.
                        let h1 = match self.inner.conn.take() {
                            Some(ProtoServer::H1 { h1, .. }) => h1,
                            _ => {
                                drop(pending);
                                unreachable!("Upgrade expects h1")
                            }
                        };
                        let (io, read_buf, _) = h1.into_inner();
                        pending.fulfill(Upgraded::new(io, read_buf));
                        return Poll::Ready(Ok(()));
                    }
                    Err(e) => e,
                };

                #[cfg(all(feature = "http1", feature = "http2"))]
                match *err.kind() {
                    Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
                        self.inner.upgrade_h2();
                        continue;
                    }
                    _ => (),
                }

                return Poll::Ready(Err(err));
            }
        }
    }
}