1 use std::error::Error as StdError; 2 use std::future::Future; 3 use std::marker::Unpin; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 #[cfg(feature = "runtime")] 7 use std::time::Duration; 8 9 use bytes::Bytes; 10 use h2::server::{Connection, Handshake, SendResponse}; 11 use h2::{Reason, RecvStream}; 12 use http::{Method, Request}; 13 use pin_project_lite::pin_project; 14 use tokio::io::{AsyncRead, AsyncWrite}; 15 use tracing::{debug, trace, warn}; 16 17 use super::{ping, PipeToSendStream, SendBuf}; 18 use crate::body::HttpBody; 19 use crate::common::date; 20 use crate::common::exec::ConnStreamExec; 21 use crate::ext::Protocol; 22 use crate::headers; 23 use crate::proto::h2::ping::Recorder; 24 use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; 25 use crate::proto::Dispatched; 26 use crate::service::HttpService; 27 28 use crate::upgrade::{OnUpgrade, Pending, Upgraded}; 29 use crate::{Body, Response}; 30 31 // Our defaults are chosen for the "majority" case, which usually are not 32 // resource constrained, and so the spec default of 64kb can be too limiting 33 // for performance. 34 // 35 // At the same time, a server more often has multiple clients connected, and 36 // so is more likely to use more resources than a client would. 37 const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb 38 const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb 39 const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb 40 const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb 41 const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 << 20; // 16 MB "sane default" taken from golang http2 42 43 #[derive(Clone, Debug)] 44 pub(crate) struct Config { 45 pub(crate) adaptive_window: bool, 46 pub(crate) initial_conn_window_size: u32, 47 pub(crate) initial_stream_window_size: u32, 48 pub(crate) max_frame_size: u32, 49 pub(crate) enable_connect_protocol: bool, 50 pub(crate) max_concurrent_streams: Option<u32>, 51 pub(crate) max_pending_accept_reset_streams: Option<usize>, 52 #[cfg(feature = "runtime")] 53 pub(crate) keep_alive_interval: Option<Duration>, 54 #[cfg(feature = "runtime")] 55 pub(crate) keep_alive_timeout: Duration, 56 pub(crate) max_send_buffer_size: usize, 57 pub(crate) max_header_list_size: u32, 58 } 59 60 impl Default for Config { default() -> Config61 fn default() -> Config { 62 Config { 63 adaptive_window: false, 64 initial_conn_window_size: DEFAULT_CONN_WINDOW, 65 initial_stream_window_size: DEFAULT_STREAM_WINDOW, 66 max_frame_size: DEFAULT_MAX_FRAME_SIZE, 67 enable_connect_protocol: false, 68 max_concurrent_streams: None, 69 max_pending_accept_reset_streams: None, 70 #[cfg(feature = "runtime")] 71 keep_alive_interval: None, 72 #[cfg(feature = "runtime")] 73 keep_alive_timeout: Duration::from_secs(20), 74 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, 75 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, 76 } 77 } 78 } 79 80 pin_project! { 81 pub(crate) struct Server<T, S, B, E> 82 where 83 S: HttpService<Body>, 84 B: HttpBody, 85 { 86 exec: E, 87 service: S, 88 state: State<T, B>, 89 } 90 } 91 92 enum State<T, B> 93 where 94 B: HttpBody, 95 { 96 Handshaking { 97 ping_config: ping::Config, 98 hs: Handshake<T, SendBuf<B::Data>>, 99 }, 100 Serving(Serving<T, B>), 101 Closed, 102 } 103 104 struct Serving<T, B> 105 where 106 B: HttpBody, 107 { 108 ping: Option<(ping::Recorder, ping::Ponger)>, 109 conn: Connection<T, SendBuf<B::Data>>, 110 closing: Option<crate::Error>, 111 } 112 113 impl<T, S, B, E> Server<T, S, B, E> 114 where 115 T: AsyncRead + AsyncWrite + Unpin, 116 S: HttpService<Body, ResBody = B>, 117 S::Error: Into<Box<dyn StdError + Send + Sync>>, 118 B: HttpBody + 'static, 119 E: ConnStreamExec<S::Future, B>, 120 { new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E>121 pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> { 122 let mut builder = h2::server::Builder::default(); 123 builder 124 .initial_window_size(config.initial_stream_window_size) 125 .initial_connection_window_size(config.initial_conn_window_size) 126 .max_frame_size(config.max_frame_size) 127 .max_header_list_size(config.max_header_list_size) 128 .max_send_buffer_size(config.max_send_buffer_size); 129 if let Some(max) = config.max_concurrent_streams { 130 builder.max_concurrent_streams(max); 131 } 132 if let Some(max) = config.max_pending_accept_reset_streams { 133 builder.max_pending_accept_reset_streams(max); 134 } 135 if config.enable_connect_protocol { 136 builder.enable_connect_protocol(); 137 } 138 let handshake = builder.handshake(io); 139 140 let bdp = if config.adaptive_window { 141 Some(config.initial_stream_window_size) 142 } else { 143 None 144 }; 145 146 let ping_config = ping::Config { 147 bdp_initial_window: bdp, 148 #[cfg(feature = "runtime")] 149 keep_alive_interval: config.keep_alive_interval, 150 #[cfg(feature = "runtime")] 151 keep_alive_timeout: config.keep_alive_timeout, 152 // If keep-alive is enabled for servers, always enabled while 153 // idle, so it can more aggressively close dead connections. 154 #[cfg(feature = "runtime")] 155 keep_alive_while_idle: true, 156 }; 157 158 Server { 159 exec, 160 state: State::Handshaking { 161 ping_config, 162 hs: handshake, 163 }, 164 service, 165 } 166 } 167 graceful_shutdown(&mut self)168 pub(crate) fn graceful_shutdown(&mut self) { 169 trace!("graceful_shutdown"); 170 match self.state { 171 State::Handshaking { .. } => { 172 // fall-through, to replace state with Closed 173 } 174 State::Serving(ref mut srv) => { 175 if srv.closing.is_none() { 176 srv.conn.graceful_shutdown(); 177 } 178 return; 179 } 180 State::Closed => { 181 return; 182 } 183 } 184 self.state = State::Closed; 185 } 186 } 187 188 impl<T, S, B, E> Future for Server<T, S, B, E> 189 where 190 T: AsyncRead + AsyncWrite + Unpin, 191 S: HttpService<Body, ResBody = B>, 192 S::Error: Into<Box<dyn StdError + Send + Sync>>, 193 B: HttpBody + 'static, 194 E: ConnStreamExec<S::Future, B>, 195 { 196 type Output = crate::Result<Dispatched>; 197 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>198 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 199 let me = &mut *self; 200 loop { 201 let next = match me.state { 202 State::Handshaking { 203 ref mut hs, 204 ref ping_config, 205 } => { 206 let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?; 207 let ping = if ping_config.is_enabled() { 208 let pp = conn.ping_pong().expect("conn.ping_pong"); 209 Some(ping::channel(pp, ping_config.clone())) 210 } else { 211 None 212 }; 213 State::Serving(Serving { 214 ping, 215 conn, 216 closing: None, 217 }) 218 } 219 State::Serving(ref mut srv) => { 220 ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; 221 return Poll::Ready(Ok(Dispatched::Shutdown)); 222 } 223 State::Closed => { 224 // graceful_shutdown was called before handshaking finished, 225 // nothing to do here... 226 return Poll::Ready(Ok(Dispatched::Shutdown)); 227 } 228 }; 229 me.state = next; 230 } 231 } 232 } 233 234 impl<T, B> Serving<T, B> 235 where 236 T: AsyncRead + AsyncWrite + Unpin, 237 B: HttpBody + 'static, 238 { poll_server<S, E>( &mut self, cx: &mut Context<'_>, service: &mut S, exec: &mut E, ) -> Poll<crate::Result<()>> where S: HttpService<Body, ResBody = B>, S::Error: Into<Box<dyn StdError + Send + Sync>>, E: ConnStreamExec<S::Future, B>,239 fn poll_server<S, E>( 240 &mut self, 241 cx: &mut Context<'_>, 242 service: &mut S, 243 exec: &mut E, 244 ) -> Poll<crate::Result<()>> 245 where 246 S: HttpService<Body, ResBody = B>, 247 S::Error: Into<Box<dyn StdError + Send + Sync>>, 248 E: ConnStreamExec<S::Future, B>, 249 { 250 if self.closing.is_none() { 251 loop { 252 self.poll_ping(cx); 253 254 // Check that the service is ready to accept a new request. 255 // 256 // - If not, just drive the connection some. 257 // - If ready, try to accept a new request from the connection. 258 match service.poll_ready(cx) { 259 Poll::Ready(Ok(())) => (), 260 Poll::Pending => { 261 // use `poll_closed` instead of `poll_accept`, 262 // in order to avoid accepting a request. 263 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; 264 trace!("incoming connection complete"); 265 return Poll::Ready(Ok(())); 266 } 267 Poll::Ready(Err(err)) => { 268 let err = crate::Error::new_user_service(err); 269 debug!("service closed: {}", err); 270 271 let reason = err.h2_reason(); 272 if reason == Reason::NO_ERROR { 273 // NO_ERROR is only used for graceful shutdowns... 274 trace!("interpreting NO_ERROR user error as graceful_shutdown"); 275 self.conn.graceful_shutdown(); 276 } else { 277 trace!("abruptly shutting down with {:?}", reason); 278 self.conn.abrupt_shutdown(reason); 279 } 280 self.closing = Some(err); 281 break; 282 } 283 } 284 285 // When the service is ready, accepts an incoming request. 286 match ready!(self.conn.poll_accept(cx)) { 287 Some(Ok((req, mut respond))) => { 288 trace!("incoming request"); 289 let content_length = headers::content_length_parse_all(req.headers()); 290 let ping = self 291 .ping 292 .as_ref() 293 .map(|ping| ping.0.clone()) 294 .unwrap_or_else(ping::disabled); 295 296 // Record the headers received 297 ping.record_non_data(); 298 299 let is_connect = req.method() == Method::CONNECT; 300 let (mut parts, stream) = req.into_parts(); 301 let (mut req, connect_parts) = if !is_connect { 302 ( 303 Request::from_parts( 304 parts, 305 crate::Body::h2(stream, content_length.into(), ping), 306 ), 307 None, 308 ) 309 } else { 310 if content_length.map_or(false, |len| len != 0) { 311 warn!("h2 connect request with non-zero body not supported"); 312 respond.send_reset(h2::Reason::INTERNAL_ERROR); 313 return Poll::Ready(Ok(())); 314 } 315 let (pending, upgrade) = crate::upgrade::pending(); 316 debug_assert!(parts.extensions.get::<OnUpgrade>().is_none()); 317 parts.extensions.insert(upgrade); 318 ( 319 Request::from_parts(parts, crate::Body::empty()), 320 Some(ConnectParts { 321 pending, 322 ping, 323 recv_stream: stream, 324 }), 325 ) 326 }; 327 328 if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() { 329 req.extensions_mut().insert(Protocol::from_inner(protocol)); 330 } 331 332 let fut = H2Stream::new(service.call(req), connect_parts, respond); 333 exec.execute_h2stream(fut); 334 } 335 Some(Err(e)) => { 336 return Poll::Ready(Err(crate::Error::new_h2(e))); 337 } 338 None => { 339 // no more incoming streams... 340 if let Some((ref ping, _)) = self.ping { 341 ping.ensure_not_timed_out()?; 342 } 343 344 trace!("incoming connection complete"); 345 return Poll::Ready(Ok(())); 346 } 347 } 348 } 349 } 350 351 debug_assert!( 352 self.closing.is_some(), 353 "poll_server broke loop without closing" 354 ); 355 356 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; 357 358 Poll::Ready(Err(self.closing.take().expect("polled after error"))) 359 } 360 poll_ping(&mut self, cx: &mut Context<'_>)361 fn poll_ping(&mut self, cx: &mut Context<'_>) { 362 if let Some((_, ref mut estimator)) = self.ping { 363 match estimator.poll(cx) { 364 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { 365 self.conn.set_target_window_size(wnd); 366 let _ = self.conn.set_initial_window_size(wnd); 367 } 368 #[cfg(feature = "runtime")] 369 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { 370 debug!("keep-alive timed out, closing connection"); 371 self.conn.abrupt_shutdown(h2::Reason::NO_ERROR); 372 } 373 Poll::Pending => {} 374 } 375 } 376 } 377 } 378 379 pin_project! { 380 #[allow(missing_debug_implementations)] 381 pub struct H2Stream<F, B> 382 where 383 B: HttpBody, 384 { 385 reply: SendResponse<SendBuf<B::Data>>, 386 #[pin] 387 state: H2StreamState<F, B>, 388 } 389 } 390 391 pin_project! { 392 #[project = H2StreamStateProj] 393 enum H2StreamState<F, B> 394 where 395 B: HttpBody, 396 { 397 Service { 398 #[pin] 399 fut: F, 400 connect_parts: Option<ConnectParts>, 401 }, 402 Body { 403 #[pin] 404 pipe: PipeToSendStream<B>, 405 }, 406 } 407 } 408 409 struct ConnectParts { 410 pending: Pending, 411 ping: Recorder, 412 recv_stream: RecvStream, 413 } 414 415 impl<F, B> H2Stream<F, B> 416 where 417 B: HttpBody, 418 { new( fut: F, connect_parts: Option<ConnectParts>, respond: SendResponse<SendBuf<B::Data>>, ) -> H2Stream<F, B>419 fn new( 420 fut: F, 421 connect_parts: Option<ConnectParts>, 422 respond: SendResponse<SendBuf<B::Data>>, 423 ) -> H2Stream<F, B> { 424 H2Stream { 425 reply: respond, 426 state: H2StreamState::Service { fut, connect_parts }, 427 } 428 } 429 } 430 431 macro_rules! reply { 432 ($me:expr, $res:expr, $eos:expr) => {{ 433 match $me.reply.send_response($res, $eos) { 434 Ok(tx) => tx, 435 Err(e) => { 436 debug!("send response error: {}", e); 437 $me.reply.send_reset(Reason::INTERNAL_ERROR); 438 return Poll::Ready(Err(crate::Error::new_h2(e))); 439 } 440 } 441 }}; 442 } 443 444 impl<F, B, E> H2Stream<F, B> 445 where 446 F: Future<Output = Result<Response<B>, E>>, 447 B: HttpBody, 448 B::Data: 'static, 449 B::Error: Into<Box<dyn StdError + Send + Sync>>, 450 E: Into<Box<dyn StdError + Send + Sync>>, 451 { poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>>452 fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 453 let mut me = self.project(); 454 loop { 455 let next = match me.state.as_mut().project() { 456 H2StreamStateProj::Service { 457 fut: h, 458 connect_parts, 459 } => { 460 let res = match h.poll(cx) { 461 Poll::Ready(Ok(r)) => r, 462 Poll::Pending => { 463 // Response is not yet ready, so we want to check if the client has sent a 464 // RST_STREAM frame which would cancel the current request. 465 if let Poll::Ready(reason) = 466 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)? 467 { 468 debug!("stream received RST_STREAM: {:?}", reason); 469 return Poll::Ready(Err(crate::Error::new_h2(reason.into()))); 470 } 471 return Poll::Pending; 472 } 473 Poll::Ready(Err(e)) => { 474 let err = crate::Error::new_user_service(e); 475 warn!("http2 service errored: {}", err); 476 me.reply.send_reset(err.h2_reason()); 477 return Poll::Ready(Err(err)); 478 } 479 }; 480 481 let (head, body) = res.into_parts(); 482 let mut res = ::http::Response::from_parts(head, ()); 483 super::strip_connection_headers(res.headers_mut(), false); 484 485 // set Date header if it isn't already set... 486 res.headers_mut() 487 .entry(::http::header::DATE) 488 .or_insert_with(date::update_and_header_value); 489 490 if let Some(connect_parts) = connect_parts.take() { 491 if res.status().is_success() { 492 if headers::content_length_parse_all(res.headers()) 493 .map_or(false, |len| len != 0) 494 { 495 warn!("h2 successful response to CONNECT request with body not supported"); 496 me.reply.send_reset(h2::Reason::INTERNAL_ERROR); 497 return Poll::Ready(Err(crate::Error::new_user_header())); 498 } 499 let send_stream = reply!(me, res, false); 500 connect_parts.pending.fulfill(Upgraded::new( 501 H2Upgraded { 502 ping: connect_parts.ping, 503 recv_stream: connect_parts.recv_stream, 504 send_stream: unsafe { UpgradedSendStream::new(send_stream) }, 505 buf: Bytes::new(), 506 }, 507 Bytes::new(), 508 )); 509 return Poll::Ready(Ok(())); 510 } 511 } 512 513 if !body.is_end_stream() { 514 // automatically set Content-Length from body... 515 if let Some(len) = body.size_hint().exact() { 516 headers::set_content_length_if_missing(res.headers_mut(), len); 517 } 518 519 let body_tx = reply!(me, res, false); 520 H2StreamState::Body { 521 pipe: PipeToSendStream::new(body, body_tx), 522 } 523 } else { 524 reply!(me, res, true); 525 return Poll::Ready(Ok(())); 526 } 527 } 528 H2StreamStateProj::Body { pipe } => { 529 return pipe.poll(cx); 530 } 531 }; 532 me.state.set(next); 533 } 534 } 535 } 536 537 impl<F, B, E> Future for H2Stream<F, B> 538 where 539 F: Future<Output = Result<Response<B>, E>>, 540 B: HttpBody, 541 B::Data: 'static, 542 B::Error: Into<Box<dyn StdError + Send + Sync>>, 543 E: Into<Box<dyn StdError + Send + Sync>>, 544 { 545 type Output = (); 546 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>547 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 548 self.poll2(cx).map(|res| { 549 if let Err(e) = res { 550 debug!("stream error: {}", e); 551 } 552 }) 553 } 554 } 555