1 use std::error::Error as StdError;
2 use std::future::Future;
3 use std::marker::Unpin;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 #[cfg(feature = "runtime")]
7 use std::time::Duration;
8 
9 use bytes::Bytes;
10 use h2::server::{Connection, Handshake, SendResponse};
11 use h2::{Reason, RecvStream};
12 use http::{Method, Request};
13 use pin_project_lite::pin_project;
14 use tokio::io::{AsyncRead, AsyncWrite};
15 use tracing::{debug, trace, warn};
16 
17 use super::{ping, PipeToSendStream, SendBuf};
18 use crate::body::HttpBody;
19 use crate::common::date;
20 use crate::common::exec::ConnStreamExec;
21 use crate::ext::Protocol;
22 use crate::headers;
23 use crate::proto::h2::ping::Recorder;
24 use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
25 use crate::proto::Dispatched;
26 use crate::service::HttpService;
27 
28 use crate::upgrade::{OnUpgrade, Pending, Upgraded};
29 use crate::{Body, Response};
30 
// Our defaults are chosen for the "majority" case, which usually is not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
//
// At the same time, a server more often has multiple clients connected, and
// so is more likely to use more resources than a client would.

// Default for `Config::initial_conn_window_size`.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb
// Default for `Config::initial_stream_window_size`.
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
// Default for `Config::max_frame_size`.
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
// Default for `Config::max_send_buffer_size`.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb
// Default for `Config::max_header_list_size`.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 << 20; // 16 MB "sane default" taken from golang http2
42 
// Settings for an HTTP/2 server connection. `Server::new` reads these to
// configure the h2 builder and the ping (BDP / keep-alive) machinery.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    // When true, `Server::new` passes `initial_stream_window_size` to the
    // ping config as `bdp_initial_window`; when false that field is `None`.
    pub(crate) adaptive_window: bool,
    // Forwarded to the h2 builder's `initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    // Forwarded to the h2 builder's `initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    // Forwarded to the h2 builder's `max_frame_size`.
    pub(crate) max_frame_size: u32,
    // When true, `Server::new` calls the h2 builder's `enable_connect_protocol`.
    pub(crate) enable_connect_protocol: bool,
    // If `Some`, forwarded to the h2 builder's `max_concurrent_streams`;
    // if `None`, the builder's own setting is left untouched.
    pub(crate) max_concurrent_streams: Option<u32>,
    // If `Some`, forwarded to the h2 builder's
    // `max_pending_accept_reset_streams`; if `None`, left untouched.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    // Copied into the ping config's `keep_alive_interval`.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_interval: Option<Duration>,
    // Copied into the ping config's `keep_alive_timeout`.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_timeout: Duration,
    // Forwarded to the h2 builder's `max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    // Forwarded to the h2 builder's `max_header_list_size`.
    pub(crate) max_header_list_size: u32,
}
59 
60 impl Default for Config {
default() -> Config61     fn default() -> Config {
62         Config {
63             adaptive_window: false,
64             initial_conn_window_size: DEFAULT_CONN_WINDOW,
65             initial_stream_window_size: DEFAULT_STREAM_WINDOW,
66             max_frame_size: DEFAULT_MAX_FRAME_SIZE,
67             enable_connect_protocol: false,
68             max_concurrent_streams: None,
69             max_pending_accept_reset_streams: None,
70             #[cfg(feature = "runtime")]
71             keep_alive_interval: None,
72             #[cfg(feature = "runtime")]
73             keep_alive_timeout: Duration::from_secs(20),
74             max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
75             max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
76         }
77     }
78 }
79 
pin_project! {
    // Future that drives one HTTP/2 server connection: it completes the h2
    // handshake, then accepts requests, hands each one to `service`, and
    // submits the resulting per-stream future to `exec`.
    pub(crate) struct Server<T, S, B, E>
    where
        S: HttpService<Body>,
        B: HttpBody,
    {
        // Executor that receives each `H2Stream` via `execute_h2stream`.
        exec: E,
        // Service polled for readiness and called once per accepted request.
        service: S,
        // Where the connection is in its lifecycle (see `State`).
        state: State<T, B>,
    }
}
91 
// Lifecycle of a `Server` connection.
enum State<T, B>
where
    B: HttpBody,
{
    // The h2 handshake has not resolved yet. `ping_config` is kept until the
    // connection exists so the ping channel can be created from it.
    Handshaking {
        ping_config: ping::Config,
        hs: Handshake<T, SendBuf<B::Data>>,
    },
    // Handshake finished; requests are being accepted and served.
    Serving(Serving<T, B>),
    // Entered when `graceful_shutdown` is called while still handshaking;
    // polling in this state resolves immediately with a shutdown result.
    Closed,
}
103 
// State of a connection whose handshake has completed.
struct Serving<T, B>
where
    B: HttpBody,
{
    // Present only when the ping config reported itself enabled at handshake
    // time. The recorder is cloned for each accepted request; the ponger is
    // polled by `poll_ping`.
    ping: Option<(ping::Recorder, ping::Ponger)>,
    // The established h2 connection that incoming streams are accepted from.
    conn: Connection<T, SendBuf<B::Data>>,
    // Set when the service's `poll_ready` fails: holds the error to return
    // once the connection has finished closing.
    closing: Option<crate::Error>,
}
112 
113 impl<T, S, B, E> Server<T, S, B, E>
114 where
115     T: AsyncRead + AsyncWrite + Unpin,
116     S: HttpService<Body, ResBody = B>,
117     S::Error: Into<Box<dyn StdError + Send + Sync>>,
118     B: HttpBody + 'static,
119     E: ConnStreamExec<S::Future, B>,
120 {
new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E>121     pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> {
122         let mut builder = h2::server::Builder::default();
123         builder
124             .initial_window_size(config.initial_stream_window_size)
125             .initial_connection_window_size(config.initial_conn_window_size)
126             .max_frame_size(config.max_frame_size)
127             .max_header_list_size(config.max_header_list_size)
128             .max_send_buffer_size(config.max_send_buffer_size);
129         if let Some(max) = config.max_concurrent_streams {
130             builder.max_concurrent_streams(max);
131         }
132         if let Some(max) = config.max_pending_accept_reset_streams {
133             builder.max_pending_accept_reset_streams(max);
134         }
135         if config.enable_connect_protocol {
136             builder.enable_connect_protocol();
137         }
138         let handshake = builder.handshake(io);
139 
140         let bdp = if config.adaptive_window {
141             Some(config.initial_stream_window_size)
142         } else {
143             None
144         };
145 
146         let ping_config = ping::Config {
147             bdp_initial_window: bdp,
148             #[cfg(feature = "runtime")]
149             keep_alive_interval: config.keep_alive_interval,
150             #[cfg(feature = "runtime")]
151             keep_alive_timeout: config.keep_alive_timeout,
152             // If keep-alive is enabled for servers, always enabled while
153             // idle, so it can more aggressively close dead connections.
154             #[cfg(feature = "runtime")]
155             keep_alive_while_idle: true,
156         };
157 
158         Server {
159             exec,
160             state: State::Handshaking {
161                 ping_config,
162                 hs: handshake,
163             },
164             service,
165         }
166     }
167 
graceful_shutdown(&mut self)168     pub(crate) fn graceful_shutdown(&mut self) {
169         trace!("graceful_shutdown");
170         match self.state {
171             State::Handshaking { .. } => {
172                 // fall-through, to replace state with Closed
173             }
174             State::Serving(ref mut srv) => {
175                 if srv.closing.is_none() {
176                     srv.conn.graceful_shutdown();
177                 }
178                 return;
179             }
180             State::Closed => {
181                 return;
182             }
183         }
184         self.state = State::Closed;
185     }
186 }
187 
188 impl<T, S, B, E> Future for Server<T, S, B, E>
189 where
190     T: AsyncRead + AsyncWrite + Unpin,
191     S: HttpService<Body, ResBody = B>,
192     S::Error: Into<Box<dyn StdError + Send + Sync>>,
193     B: HttpBody + 'static,
194     E: ConnStreamExec<S::Future, B>,
195 {
196     type Output = crate::Result<Dispatched>;
197 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>198     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199         let me = &mut *self;
200         loop {
201             let next = match me.state {
202                 State::Handshaking {
203                     ref mut hs,
204                     ref ping_config,
205                 } => {
206                     let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
207                     let ping = if ping_config.is_enabled() {
208                         let pp = conn.ping_pong().expect("conn.ping_pong");
209                         Some(ping::channel(pp, ping_config.clone()))
210                     } else {
211                         None
212                     };
213                     State::Serving(Serving {
214                         ping,
215                         conn,
216                         closing: None,
217                     })
218                 }
219                 State::Serving(ref mut srv) => {
220                     ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
221                     return Poll::Ready(Ok(Dispatched::Shutdown));
222                 }
223                 State::Closed => {
224                     // graceful_shutdown was called before handshaking finished,
225                     // nothing to do here...
226                     return Poll::Ready(Ok(Dispatched::Shutdown));
227                 }
228             };
229             me.state = next;
230         }
231     }
232 }
233 
234 impl<T, B> Serving<T, B>
235 where
236     T: AsyncRead + AsyncWrite + Unpin,
237     B: HttpBody + 'static,
238 {
poll_server<S, E>( &mut self, cx: &mut Context<'_>, service: &mut S, exec: &mut E, ) -> Poll<crate::Result<()>> where S: HttpService<Body, ResBody = B>, S::Error: Into<Box<dyn StdError + Send + Sync>>, E: ConnStreamExec<S::Future, B>,239     fn poll_server<S, E>(
240         &mut self,
241         cx: &mut Context<'_>,
242         service: &mut S,
243         exec: &mut E,
244     ) -> Poll<crate::Result<()>>
245     where
246         S: HttpService<Body, ResBody = B>,
247         S::Error: Into<Box<dyn StdError + Send + Sync>>,
248         E: ConnStreamExec<S::Future, B>,
249     {
250         if self.closing.is_none() {
251             loop {
252                 self.poll_ping(cx);
253 
254                 // Check that the service is ready to accept a new request.
255                 //
256                 // - If not, just drive the connection some.
257                 // - If ready, try to accept a new request from the connection.
258                 match service.poll_ready(cx) {
259                     Poll::Ready(Ok(())) => (),
260                     Poll::Pending => {
261                         // use `poll_closed` instead of `poll_accept`,
262                         // in order to avoid accepting a request.
263                         ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
264                         trace!("incoming connection complete");
265                         return Poll::Ready(Ok(()));
266                     }
267                     Poll::Ready(Err(err)) => {
268                         let err = crate::Error::new_user_service(err);
269                         debug!("service closed: {}", err);
270 
271                         let reason = err.h2_reason();
272                         if reason == Reason::NO_ERROR {
273                             // NO_ERROR is only used for graceful shutdowns...
274                             trace!("interpreting NO_ERROR user error as graceful_shutdown");
275                             self.conn.graceful_shutdown();
276                         } else {
277                             trace!("abruptly shutting down with {:?}", reason);
278                             self.conn.abrupt_shutdown(reason);
279                         }
280                         self.closing = Some(err);
281                         break;
282                     }
283                 }
284 
285                 // When the service is ready, accepts an incoming request.
286                 match ready!(self.conn.poll_accept(cx)) {
287                     Some(Ok((req, mut respond))) => {
288                         trace!("incoming request");
289                         let content_length = headers::content_length_parse_all(req.headers());
290                         let ping = self
291                             .ping
292                             .as_ref()
293                             .map(|ping| ping.0.clone())
294                             .unwrap_or_else(ping::disabled);
295 
296                         // Record the headers received
297                         ping.record_non_data();
298 
299                         let is_connect = req.method() == Method::CONNECT;
300                         let (mut parts, stream) = req.into_parts();
301                         let (mut req, connect_parts) = if !is_connect {
302                             (
303                                 Request::from_parts(
304                                     parts,
305                                     crate::Body::h2(stream, content_length.into(), ping),
306                                 ),
307                                 None,
308                             )
309                         } else {
310                             if content_length.map_or(false, |len| len != 0) {
311                                 warn!("h2 connect request with non-zero body not supported");
312                                 respond.send_reset(h2::Reason::INTERNAL_ERROR);
313                                 return Poll::Ready(Ok(()));
314                             }
315                             let (pending, upgrade) = crate::upgrade::pending();
316                             debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
317                             parts.extensions.insert(upgrade);
318                             (
319                                 Request::from_parts(parts, crate::Body::empty()),
320                                 Some(ConnectParts {
321                                     pending,
322                                     ping,
323                                     recv_stream: stream,
324                                 }),
325                             )
326                         };
327 
328                         if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
329                             req.extensions_mut().insert(Protocol::from_inner(protocol));
330                         }
331 
332                         let fut = H2Stream::new(service.call(req), connect_parts, respond);
333                         exec.execute_h2stream(fut);
334                     }
335                     Some(Err(e)) => {
336                         return Poll::Ready(Err(crate::Error::new_h2(e)));
337                     }
338                     None => {
339                         // no more incoming streams...
340                         if let Some((ref ping, _)) = self.ping {
341                             ping.ensure_not_timed_out()?;
342                         }
343 
344                         trace!("incoming connection complete");
345                         return Poll::Ready(Ok(()));
346                     }
347                 }
348             }
349         }
350 
351         debug_assert!(
352             self.closing.is_some(),
353             "poll_server broke loop without closing"
354         );
355 
356         ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
357 
358         Poll::Ready(Err(self.closing.take().expect("polled after error")))
359     }
360 
poll_ping(&mut self, cx: &mut Context<'_>)361     fn poll_ping(&mut self, cx: &mut Context<'_>) {
362         if let Some((_, ref mut estimator)) = self.ping {
363             match estimator.poll(cx) {
364                 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
365                     self.conn.set_target_window_size(wnd);
366                     let _ = self.conn.set_initial_window_size(wnd);
367                 }
368                 #[cfg(feature = "runtime")]
369                 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
370                     debug!("keep-alive timed out, closing connection");
371                     self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
372                 }
373                 Poll::Pending => {}
374             }
375         }
376     }
377 }
378 
pin_project! {
    // Future that serves a single accepted HTTP/2 stream: it waits for the
    // service's response, sends the response head, and then streams the body
    // (or completes an upgrade for CONNECT requests).
    #[allow(missing_debug_implementations)]
    pub struct H2Stream<F, B>
    where
        B: HttpBody,
    {
        // Handle used to send the response on this stream or to reset it.
        reply: SendResponse<SendBuf<B::Data>>,
        // Current phase of the stream (see `H2StreamState`).
        #[pin]
        state: H2StreamState<F, B>,
    }
}
390 
pin_project! {
    // Phases of an `H2Stream`.
    #[project = H2StreamStateProj]
    enum H2StreamState<F, B>
    where
        B: HttpBody,
    {
        // Waiting for the service future to produce a response.
        // `connect_parts` is `Some` only for CONNECT requests.
        Service {
            #[pin]
            fut: F,
            connect_parts: Option<ConnectParts>,
        },
        // The response head has been sent; the body is being piped to the
        // send stream.
        Body {
            #[pin]
            pipe: PipeToSendStream<B>,
        },
    }
}
408 
// Pieces kept aside for a CONNECT request until the service responds. On a
// successful response they are assembled into the upgraded I/O object.
struct ConnectParts {
    // Fulfilled with the `Upgraded` I/O once a successful response is sent.
    pending: Pending,
    // Ping recorder handed over to the upgraded stream.
    ping: Recorder,
    // Receive half of the request stream, handed over to the upgraded stream.
    recv_stream: RecvStream,
}
414 
415 impl<F, B> H2Stream<F, B>
416 where
417     B: HttpBody,
418 {
new( fut: F, connect_parts: Option<ConnectParts>, respond: SendResponse<SendBuf<B::Data>>, ) -> H2Stream<F, B>419     fn new(
420         fut: F,
421         connect_parts: Option<ConnectParts>,
422         respond: SendResponse<SendBuf<B::Data>>,
423     ) -> H2Stream<F, B> {
424         H2Stream {
425             reply: respond,
426             state: H2StreamState::Service { fut, connect_parts },
427         }
428     }
429 }
430 
// Sends the response head `$res` on `$me.reply`, with `$eos` as the
// end-of-stream flag, and evaluates to the value `send_response` returns.
//
// On failure it logs the error, resets the stream with `INTERNAL_ERROR`, and
// RETURNS from the enclosing function with `Poll::Ready(Err(..))`. It can
// therefore only be used inside a function returning
// `Poll<Result<_, crate::Error>>`.
macro_rules! reply {
    ($me:expr, $res:expr, $eos:expr) => {{
        match $me.reply.send_response($res, $eos) {
            Ok(tx) => tx,
            Err(e) => {
                debug!("send response error: {}", e);
                $me.reply.send_reset(Reason::INTERNAL_ERROR);
                return Poll::Ready(Err(crate::Error::new_h2(e)));
            }
        }
    }};
}
443 
444 impl<F, B, E> H2Stream<F, B>
445 where
446     F: Future<Output = Result<Response<B>, E>>,
447     B: HttpBody,
448     B::Data: 'static,
449     B::Error: Into<Box<dyn StdError + Send + Sync>>,
450     E: Into<Box<dyn StdError + Send + Sync>>,
451 {
poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>>452     fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
453         let mut me = self.project();
454         loop {
455             let next = match me.state.as_mut().project() {
456                 H2StreamStateProj::Service {
457                     fut: h,
458                     connect_parts,
459                 } => {
460                     let res = match h.poll(cx) {
461                         Poll::Ready(Ok(r)) => r,
462                         Poll::Pending => {
463                             // Response is not yet ready, so we want to check if the client has sent a
464                             // RST_STREAM frame which would cancel the current request.
465                             if let Poll::Ready(reason) =
466                                 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
467                             {
468                                 debug!("stream received RST_STREAM: {:?}", reason);
469                                 return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
470                             }
471                             return Poll::Pending;
472                         }
473                         Poll::Ready(Err(e)) => {
474                             let err = crate::Error::new_user_service(e);
475                             warn!("http2 service errored: {}", err);
476                             me.reply.send_reset(err.h2_reason());
477                             return Poll::Ready(Err(err));
478                         }
479                     };
480 
481                     let (head, body) = res.into_parts();
482                     let mut res = ::http::Response::from_parts(head, ());
483                     super::strip_connection_headers(res.headers_mut(), false);
484 
485                     // set Date header if it isn't already set...
486                     res.headers_mut()
487                         .entry(::http::header::DATE)
488                         .or_insert_with(date::update_and_header_value);
489 
490                     if let Some(connect_parts) = connect_parts.take() {
491                         if res.status().is_success() {
492                             if headers::content_length_parse_all(res.headers())
493                                 .map_or(false, |len| len != 0)
494                             {
495                                 warn!("h2 successful response to CONNECT request with body not supported");
496                                 me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
497                                 return Poll::Ready(Err(crate::Error::new_user_header()));
498                             }
499                             let send_stream = reply!(me, res, false);
500                             connect_parts.pending.fulfill(Upgraded::new(
501                                 H2Upgraded {
502                                     ping: connect_parts.ping,
503                                     recv_stream: connect_parts.recv_stream,
504                                     send_stream: unsafe { UpgradedSendStream::new(send_stream) },
505                                     buf: Bytes::new(),
506                                 },
507                                 Bytes::new(),
508                             ));
509                             return Poll::Ready(Ok(()));
510                         }
511                     }
512 
513                     if !body.is_end_stream() {
514                         // automatically set Content-Length from body...
515                         if let Some(len) = body.size_hint().exact() {
516                             headers::set_content_length_if_missing(res.headers_mut(), len);
517                         }
518 
519                         let body_tx = reply!(me, res, false);
520                         H2StreamState::Body {
521                             pipe: PipeToSendStream::new(body, body_tx),
522                         }
523                     } else {
524                         reply!(me, res, true);
525                         return Poll::Ready(Ok(()));
526                     }
527                 }
528                 H2StreamStateProj::Body { pipe } => {
529                     return pipe.poll(cx);
530                 }
531             };
532             me.state.set(next);
533         }
534     }
535 }
536 
537 impl<F, B, E> Future for H2Stream<F, B>
538 where
539     F: Future<Output = Result<Response<B>, E>>,
540     B: HttpBody,
541     B::Data: 'static,
542     B::Error: Into<Box<dyn StdError + Send + Sync>>,
543     E: Into<Box<dyn StdError + Send + Sync>>,
544 {
545     type Output = ();
546 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>547     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
548         self.poll2(cx).map(|res| {
549             if let Err(e) = res {
550                 debug!("stream error: {}", e);
551             }
552         })
553     }
554 }
555