//! HTTP/1 Server Connections

use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncWrite};

use crate::body::{Body as IncomingBody, HttpBody as Body};
use crate::proto;
use crate::service::HttpService;

type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
    proto::h1::dispatch::Server<S, IncomingBody>,
    B,
    T,
    proto::ServerTransaction,
>;

pin_project_lite::pin_project! {
    /// A future binding an http1 connection with a Service.
    ///
    /// Polling this future will drive HTTP forward.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<T, S>
    where
        S: HttpService<IncomingBody>,
    {
        conn: Http1Dispatcher<T, S::ResBody, S>,
    }
}

/// A configuration builder for HTTP/1 server connections.
#[derive(Clone, Debug)]
pub struct Builder {
    h1_half_close: bool,
    h1_keep_alive: bool,
    h1_title_case_headers: bool,
    h1_preserve_header_case: bool,
    h1_header_read_timeout: Option<Duration>,
    h1_writev: Option<bool>,
    max_buf_size: Option<usize>,
    pipeline_flush: bool,
}

/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
#[derive(Debug)]
pub struct Parts<T, S> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// If the client sent additional bytes after its last request, and
    /// this connection "ended" with an upgrade, the read buffer will contain
    /// those bytes.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
    /// The `Service` used to serve this connection.
    pub service: S,
    _inner: (),
}

// ===== impl Connection =====

impl<I, S> fmt::Debug for Connection<I, S>
where
    S: HttpService<IncomingBody>,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Connection").finish()
    }
}

impl<I, B, S> Connection<I, S>
where
    S: HttpService<IncomingBody, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    /// Start a graceful shutdown process for this connection.
    ///
    /// This `Connection` should continue to be polled until shutdown
    /// can finish.
    ///
    /// # Note
    ///
    /// This should only be called while the `Connection` future is still
    /// pending. If called after `Connection::poll` has resolved, this does
    /// nothing.
    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
        self.conn.disable_keep_alive();
    }

    /// Return the inner IO object, and additional information.
    ///
    /// If the IO object has been "rewound", the returned io will not contain
    /// those rewound bytes.
    ///
    /// This should only be called after `poll_without_shutdown` signals
    /// that the connection is "done". Otherwise, it may not have finished
    /// flushing all necessary HTTP bytes.
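    ///
    /// # Example
    ///
    /// A rough sketch (not a compiling doctest) of reclaiming the IO once the
    /// connection has finished via `poll_without_shutdown`; `handle_leftover`
    /// is a hypothetical application callback, not part of hyper:
    ///
    /// ```ignore
    /// let parts = conn.into_parts();
    /// // Bytes already read from the socket but not yet parsed as HTTP sit
    /// // in `read_buf`; consume them before reading from `io` again.
    /// // (`handle_leftover` is a placeholder for application code.)
    /// if !parts.read_buf.is_empty() {
    ///     handle_leftover(&parts.read_buf);
    /// }
    /// let io = parts.io;
    /// ```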
    ///
    /// # Panics
    ///
    /// This method will panic if this connection is using an h2 protocol.
    pub fn into_parts(self) -> Parts<I, S> {
        let (io, read_buf, dispatch) = self.conn.into_inner();
        Parts {
            io,
            read_buf,
            service: dispatch.into_service(),
            _inner: (),
        }
    }

    /// Poll the connection for completion, but without calling `shutdown`
    /// on the underlying IO.
    ///
    /// This is useful to allow running a connection while doing an HTTP
    /// upgrade. Once the upgrade is completed, the connection would be "done",
    /// but it is not desired to actually shutdown the IO object. Instead you
    /// would take it back using `into_parts`.
    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        self.conn.poll_without_shutdown(cx)
    }

    /// Prevent shutdown of the underlying IO object at the end of servicing the
    /// request, instead resolving with the `Parts`. This is a convenience
    /// wrapper over `poll_without_shutdown`.
    ///
    /// # Error
    ///
    /// This errors if the underlying connection protocol is not HTTP/1.
    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
    where
        S: Unpin,
        S::Future: Unpin,
        B: Unpin,
    {
        let mut zelf = Some(self);
        futures_util::future::poll_fn(move |cx| {
            ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?;
            Poll::Ready(Ok(zelf.take().unwrap().into_parts()))
        })
    }

    /// Enable this connection to support higher-level HTTP upgrades.
    ///
    /// See [the `upgrade` module](crate::upgrade) for more.
    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<I, S>
    where
        I: Send,
    {
        upgrades::UpgradeableConnection { inner: Some(self) }
    }
}

impl<I, B, S> Future for Connection<I, S>
where
    S: HttpService<IncomingBody, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin + 'static,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Output = crate::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match ready!(Pin::new(&mut self.conn).poll(cx)) {
            Ok(done) => {
                match done {
                    proto::Dispatched::Shutdown => {}
                    proto::Dispatched::Upgrade(pending) => {
                        // With no `Send` bound on `I`, we can't try to do
                        // upgrades here. In case a user was trying to use
                        // `Body::on_upgrade` with this API, send a special
                        // error letting them know about that.
                        pending.manual();
                    }
                };
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

// ===== impl Builder =====

impl Builder {
    /// Create a new connection builder.
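    ///
    /// # Example
    ///
    /// A small sketch of configuring a builder before serving a connection;
    /// the values shown are illustrative, not recommendations:
    ///
    /// ```
    /// # use std::time::Duration;
    /// # use hyper::server::conn::http1::Builder;
    /// let mut http = Builder::new();
    /// http.half_close(false)
    ///     .keep_alive(true)
    ///     .header_read_timeout(Duration::from_secs(30))
    ///     .max_buf_size(64 * 1024);
    /// ```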
    pub fn new() -> Self {
        Self {
            h1_half_close: false,
            h1_keep_alive: true,
            h1_title_case_headers: false,
            h1_preserve_header_case: false,
            h1_header_read_timeout: None,
            h1_writev: None,
            max_buf_size: None,
            pipeline_flush: false,
        }
    }

    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shut down their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// Default is `false`.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.h1_half_close = val;
        self
    }

    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// Default is `true`.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.h1_keep_alive = val;
        self
    }

    /// Set whether HTTP/1 connections will write header names as title case at
    /// the socket level.
    ///
    /// Default is `false`.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.h1_title_case_headers = enabled;
        self
    }

    /// Set whether to support preserving original header cases.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Default is `false`.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.h1_preserve_header_case = enabled;
        self
    }

    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// Default is `None`.
    pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
        self.h1_header_read_timeout = Some(read_timeout);
        self
    }

    /// Set whether HTTP/1 connections should try to use vectored writes,
    /// or always flatten into a single buffer.
    ///
    /// Note that setting this to false may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to true will force hyper to use a queued strategy,
    /// which may eliminate unnecessary cloning on some TLS backends.
    ///
    /// Default is `auto`. In this mode hyper will try to guess which
    /// mode to use.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.h1_writev = Some(val);
        self
    }

    /// Set the maximum buffer size for the connection.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed
    /// `max` is less than the minimum.
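    ///
    /// # Example
    ///
    /// A quick sketch; 16 KiB is an arbitrary value chosen for illustration
    /// (anything below 8192 would panic):
    ///
    /// ```
    /// # use hyper::server::conn::http1::Builder;
    /// let mut http = Builder::new();
    /// http.max_buf_size(16 * 1024);
    /// ```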
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        assert!(
            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
        );
        self.max_buf_size = Some(max);
        self
    }

    /// Aggregates flushes to better support pipelined responses.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is `false`.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.pipeline_flush = enabled;
        self
    }

    // /// Set the timer used in background tasks.
    // pub fn timer<M>(&mut self, timer: M) -> &mut Self
    // where
    //     M: Timer + Send + Sync + 'static,
    // {
    //     self.timer = Time::Timer(Arc::new(timer));
    //     self
    // }

    /// Bind a connection together with a [`Service`](crate::service::Service).
    ///
    /// This returns a Future that must be polled in order for HTTP to be
    /// driven on the connection.
    ///
    /// # Example
    ///
    /// ```
    /// # use hyper::{Body as Incoming, Request, Response};
    /// # use hyper::service::Service;
    /// # use hyper::server::conn::http1::Builder;
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # async fn run<I, S>(some_io: I, some_service: S)
    /// # where
    /// #     I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    /// #     S: Service<hyper::Request<Incoming>, Response = hyper::Response<Incoming>> + Send + 'static,
    /// #     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    /// #     S::Future: Send,
    /// # {
    /// let http = Builder::new();
    /// let conn = http.serve_connection(some_io, some_service);
    ///
    /// if let Err(e) = conn.await {
    ///     eprintln!("server connection error: {}", e);
    /// }
    /// # }
    /// # fn main() {}
    /// ```
    pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
    where
        S: HttpService<IncomingBody>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::ResBody: 'static,
        <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
    {
        let mut conn = proto::Conn::new(io);
        if !self.h1_keep_alive {
            conn.disable_keep_alive();
        }
        if self.h1_half_close {
            conn.set_allow_half_close();
        }
        if self.h1_title_case_headers {
            conn.set_title_case_headers();
        }
        if self.h1_preserve_header_case {
            conn.set_preserve_header_case();
        }
        if let Some(header_read_timeout) = self.h1_header_read_timeout {
            conn.set_http1_header_read_timeout(header_read_timeout);
        }
        if let Some(writev) = self.h1_writev {
            if writev {
                conn.set_write_strategy_queue();
            } else {
                conn.set_write_strategy_flatten();
            }
        }
        conn.set_flush_pipeline(self.pipeline_flush);
        if let Some(max) = self.max_buf_size {
            conn.set_max_buf_size(max);
        }
        let sd = proto::h1::dispatch::Server::new(service);
        let proto = proto::h1::Dispatcher::new(sd, conn);
        Connection { conn: proto }
    }
}

mod upgrades {
    use crate::upgrade::Upgraded;

    use super::*;

    // A future binding a connection with a Service with Upgrade support.
    //
    // This type is unnameable outside the crate.
    #[must_use = "futures do nothing unless polled"]
    #[allow(missing_debug_implementations)]
    pub struct UpgradeableConnection<T, S>
    where
        S: HttpService<IncomingBody>,
    {
        pub(super) inner: Option<Connection<T, S>>,
    }

    impl<I, B, S> UpgradeableConnection<I, S>
    where
        S: HttpService<IncomingBody, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        /// Start a graceful shutdown process for this connection.
        ///
        /// This `Connection` should continue to be polled until shutdown
        /// can finish.
        pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
            Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown()
        }
    }

    impl<I, B, S> Future for UpgradeableConnection<I, S>
    where
        S: HttpService<IncomingBody, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        type Output = crate::Result<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) {
                Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
                Ok(proto::Dispatched::Upgrade(pending)) => {
                    let (io, buf, _) = self.inner.take().unwrap().conn.into_inner();
                    pending.fulfill(Upgraded::new(io, buf));
                    Poll::Ready(Ok(()))
                }
                Err(e) => Poll::Ready(Err(e)),
            }
        }
    }
}
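
#[cfg(test)]
mod tests {
    use super::*;

    // Not from upstream: a small sanity check that `Builder::new()` starts
    // with the defaults documented on each setter above.
    #[test]
    fn builder_defaults() {
        let builder = Builder::new();
        assert!(builder.h1_keep_alive);
        assert!(!builder.h1_half_close);
        assert!(!builder.h1_title_case_headers);
        assert!(!builder.h1_preserve_header_case);
        assert!(builder.h1_header_read_timeout.is_none());
        assert!(builder.h1_writev.is_none());
        assert!(builder.max_buf_size.is_none());
        assert!(!builder.pipeline_flush);
    }
}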