1 //! Server implementation of the HTTP/2 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2 server requires the caller to manage accepting the
6 //! connections as well as getting the connections to a state that is ready to
7 //! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpListener`] to accept
11 //! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12 //! upgrades.
13 //!
14 //! Once a connection is obtained, it is passed to [`handshake`],
15 //! which will begin the [HTTP/2 handshake]. This returns a future that
16 //! completes once the handshake process is performed and HTTP/2 streams may
17 //! be received.
18 //!
19 //! [`handshake`] uses default configuration values. There are a number of
20 //! settings that can be changed by using [`Builder`] instead.
21 //!
22 //! # Inbound streams
23 //!
24 //! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It
25 //! does this by implementing [`futures::Stream`]. When a new stream is
26 //! received, a call to [`Connection::accept`] will return `(request, response)`.
27 //! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28 //! HTTP request head as well as provides a way to receive the inbound data
29 //! stream and the trailers. The `response` handle (of type [`SendResponse`])
30 //! allows responding to the request, stream the response payload, send
31 //! trailers, and send push promises.
32 //!
33 //! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34 //! can be operated independently.
35 //!
36 //! # Managing the connection
37 //!
38 //! The [`Connection`] instance is used to manage connection state. The caller
39 //! is required to call either [`Connection::accept`] or
40 //! [`Connection::poll_close`] in order to advance the connection state. Simply
41 //! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42 //! connection state is advanced.
43 //!
44 //! It is not required to call **both** [`Connection::accept`] and
45 //! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46 //! then only [`Connection::accept`] should be called. When the caller **does
47 //! not** want to accept a new stream, [`Connection::poll_close`] should be
48 //! called.
49 //!
50 //! The [`Connection`] instance should only be dropped once
51 //! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52 //! returns `Ready(None)`, there will no longer be any more inbound streams. At
53 //! this point, only [`Connection::poll_close`] should be called.
54 //!
55 //! # Shutting down the server
56 //!
57 //! Graceful shutdown of the server is [not yet
58 //! implemented](https://github.com/hyperium/h2/issues/69).
59 //!
60 //! # Example
61 //!
62 //! A basic HTTP/2 server example that runs over TCP and assumes [prior
63 //! knowledge], i.e. both the client and the server assume that the TCP socket
64 //! will use the HTTP/2 protocol without prior negotiation.
65 //!
66 //! ```no_run
67 //! use h2::server;
68 //! use http::{Response, StatusCode};
69 //! use tokio::net::TcpListener;
70 //!
71 //! #[tokio::main]
72 //! pub async fn main() {
73 //!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74 //!
75 //!     // Accept all incoming TCP connections.
76 //!     loop {
77 //!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78 //!             // Spawn a new task to process each connection.
79 //!             tokio::spawn(async {
80 //!                 // Start the HTTP/2 connection handshake
81 //!                 let mut h2 = server::handshake(socket).await.unwrap();
82 //!                 // Accept all inbound HTTP/2 streams sent over the
83 //!                 // connection.
84 //!                 while let Some(request) = h2.accept().await {
85 //!                     let (request, mut respond) = request.unwrap();
86 //!                     println!("Received request: {:?}", request);
87 //!
88 //!                     // Build a response with no body
89 //!                     let response = Response::builder()
90 //!                         .status(StatusCode::OK)
91 //!                         .body(())
92 //!                         .unwrap();
93 //!
94 //!                     // Send the response back to the client
95 //!                     respond.send_response(response, true)
96 //!                         .unwrap();
97 //!                 }
98 //!
99 //!             });
100 //!         }
101 //!     }
102 //! }
103 //! ```
104 //!
105 //! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106 //! [`handshake`]: fn.handshake.html
107 //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108 //! [`Builder`]: struct.Builder.html
109 //! [`Connection`]: struct.Connection.html
110 //! [`Connection::poll`]: struct.Connection.html#method.poll
111 //! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112 //! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113 //! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114 //! [`RecvStream`]: ../struct.RecvStream.html
115 //! [`SendStream`]: ../struct.SendStream.html
116 //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
117 
118 use crate::codec::{Codec, UserError};
119 use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120 use crate::proto::{self, Config, Error, Prioritized};
121 use crate::{FlowControl, PingPong, RecvStream, SendStream};
122 
123 use bytes::{Buf, Bytes};
124 use http::{HeaderMap, Method, Request, Response};
125 use std::future::Future;
126 use std::pin::Pin;
127 use std::task::{Context, Poll};
128 use std::time::Duration;
129 use std::{fmt, io};
130 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131 use tracing::instrument::{Instrument, Instrumented};
132 
133 /// In progress HTTP/2 connection handshake future.
134 ///
135 /// This type implements `Future`, yielding a `Connection` instance once the
136 /// handshake has completed.
137 ///
138 /// The handshake is completed once the connection preface is fully received
139 /// from the client **and** the initial settings frame is sent to the client.
140 ///
141 /// The handshake future does not wait for the initial settings frame from the
142 /// client.
143 ///
144 /// See [module] level docs for more details.
145 ///
146 /// [module]: index.html
147 #[must_use = "futures do nothing unless polled"]
148 pub struct Handshake<T, B: Buf = Bytes> {
149     /// The config to pass to Connection::new after handshake succeeds.
150     builder: Builder,
151     /// The current state of the handshake.
152     state: Handshaking<T, B>,
153     /// Span tracking the handshake
154     span: tracing::Span,
155 }
156 
157 /// Accepts inbound HTTP/2 streams on a connection.
158 ///
159 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160 /// implements the HTTP/2 server logic for that connection. It is responsible
161 /// for receiving inbound streams initiated by the client as well as driving the
162 /// internal state forward.
163 ///
164 /// `Connection` values are created by calling [`handshake`]. Once a
165 /// `Connection` value is obtained, the caller must call [`poll`] or
166 /// [`poll_close`] in order to drive the internal connection state forward.
167 ///
168 /// See [module level] documentation for more details
169 ///
170 /// [module level]: index.html
171 /// [`handshake`]: struct.Connection.html#method.handshake
172 /// [`poll`]: struct.Connection.html#method.poll
173 /// [`poll_close`]: struct.Connection.html#method.poll_close
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// # use tokio::io::{AsyncRead, AsyncWrite};
179 /// # use h2::server;
180 /// # use h2::server::*;
181 /// #
182 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183 /// let mut server = server::handshake(my_io).await.unwrap();
184 /// while let Some(request) = server.accept().await {
185 ///     tokio::spawn(async move {
186 ///         let (request, respond) = request.unwrap();
187 ///         // Process the request and send the response back to the client
188 ///         // using `respond`.
189 ///     });
190 /// }
191 /// # }
192 /// #
193 /// # pub fn main() {}
194 /// ```
195 #[must_use = "streams do nothing unless polled"]
196 pub struct Connection<T, B: Buf> {
197     connection: proto::Connection<T, Peer, B>,
198 }
199 
200 /// Builds server connections with custom configuration values.
201 ///
202 /// Methods can be chained in order to set the configuration values.
203 ///
204 /// The server is constructed by calling [`handshake`] and passing the I/O
205 /// handle that will back the HTTP/2 server.
206 ///
207 /// New instances of `Builder` are obtained via [`Builder::new`].
208 ///
209 /// See function level documentation for details on the various server
210 /// configuration settings.
211 ///
212 /// [`Builder::new`]: struct.Builder.html#method.new
213 /// [`handshake`]: struct.Builder.html#method.handshake
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// # use tokio::io::{AsyncRead, AsyncWrite};
219 /// # use h2::server::*;
220 /// #
221 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222 /// # -> Handshake<T>
223 /// # {
224 /// // `server_fut` is a future representing the completion of the HTTP/2
225 /// // handshake.
226 /// let server_fut = Builder::new()
227 ///     .initial_window_size(1_000_000)
228 ///     .max_concurrent_streams(1000)
229 ///     .handshake(my_io);
230 /// # server_fut
231 /// # }
232 /// #
233 /// # pub fn main() {}
234 /// ```
235 #[derive(Clone, Debug)]
236 pub struct Builder {
237     /// Time to keep locally reset streams around before reaping.
238     reset_stream_duration: Duration,
239 
240     /// Maximum number of locally reset streams to keep at a time.
241     reset_stream_max: usize,
242 
243     /// Maximum number of remotely reset streams to allow in the pending
244     /// accept queue.
245     pending_accept_reset_stream_max: usize,
246 
247     /// Initial `Settings` frame to send as part of the handshake.
248     settings: Settings,
249 
250     /// Initial target window size for new connections.
251     initial_target_connection_window_size: Option<u32>,
252 
253     /// Maximum amount of bytes to "buffer" for writing per stream.
254     max_send_buffer_size: usize,
255 
256     /// Maximum number of locally reset streams due to protocol error across
257     /// the lifetime of the connection.
258     ///
259     /// When this gets exceeded, we issue GOAWAYs.
260     local_max_error_reset_streams: Option<usize>,
261 }
262 
263 /// Send a response back to the client
264 ///
265 /// A `SendResponse` instance is provided when receiving a request and is used
266 /// to send the associated response back to the client. It is also used to
267 /// explicitly reset the stream with a custom reason.
268 ///
269 /// It will also be used to initiate push promises linked with the associated
270 /// stream.
271 ///
272 /// If the `SendResponse` instance is dropped without sending a response, then
273 /// the HTTP/2 stream will be reset.
274 ///
275 /// See [module] level docs for more details.
276 ///
277 /// [module]: index.html
278 #[derive(Debug)]
279 pub struct SendResponse<B: Buf> {
280     inner: proto::StreamRef<B>,
281 }
282 
283 /// Send a response to a promised request
284 ///
285 /// A `SendPushedResponse` instance is provided when promising a request and is used
286 /// to send the associated response to the client. It is also used to
287 /// explicitly reset the stream with a custom reason.
288 ///
289 /// It can not be used to initiate push promises.
290 ///
291 /// If the `SendPushedResponse` instance is dropped without sending a response, then
292 /// the HTTP/2 stream will be reset.
293 ///
294 /// See [module] level docs for more details.
295 ///
296 /// [module]: index.html
297 pub struct SendPushedResponse<B: Buf> {
298     inner: SendResponse<B>,
299 }
300 
301 // Manual implementation necessary because of rust-lang/rust#26925
302 impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result303     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304         write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305     }
306 }
307 
308 /// Stages of an in-progress handshake.
309 enum Handshaking<T, B: Buf> {
310     /// State 1. Connection is flushing pending SETTINGS frame.
311     Flushing(Instrumented<Flush<T, Prioritized<B>>>),
312     /// State 2. Connection is waiting for the client preface.
313     ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
314     /// State 3. Handshake is done, polling again would panic.
315     Done,
316 }
317 
318 /// Flush a Sink
319 struct Flush<T, B> {
320     codec: Option<Codec<T, B>>,
321 }
322 
323 /// Read the client connection preface
324 struct ReadPreface<T, B> {
325     codec: Option<Codec<T, B>>,
326     pos: usize,
327 }
328 
329 #[derive(Debug)]
330 pub(crate) struct Peer;
331 
332 const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333 
334 /// Creates a new configured HTTP/2 server with default configuration
335 /// values backed by `io`.
336 ///
337 /// It is expected that `io` already be in an appropriate state to commence
338 /// the [HTTP/2 handshake]. See [Handshake] for more details.
339 ///
340 /// Returns a future which resolves to the [`Connection`] instance once the
341 /// HTTP/2 handshake has been completed. The returned [`Connection`]
342 /// instance will be using default configuration values. Use [`Builder`] to
343 /// customize the configuration values used by a [`Connection`] instance.
344 ///
345 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346 /// [Handshake]: ../index.html#handshake
347 /// [`Connection`]: struct.Connection.html
348 ///
349 /// # Examples
350 ///
351 /// ```
352 /// # use tokio::io::{AsyncRead, AsyncWrite};
353 /// # use h2::server;
354 /// # use h2::server::*;
355 /// #
356 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357 /// # {
358 /// let connection = server::handshake(my_io).await.unwrap();
359 /// // The HTTP/2 handshake has completed, now use `connection` to
360 /// // accept inbound HTTP/2 streams.
361 /// # }
362 /// #
363 /// # pub fn main() {}
364 /// ```
handshake<T>(io: T) -> Handshake<T, Bytes> where T: AsyncRead + AsyncWrite + Unpin,365 pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366 where
367     T: AsyncRead + AsyncWrite + Unpin,
368 {
369     Builder::new().handshake(io)
370 }
371 
372 // ===== impl Connection =====
373 
374 impl<T, B> Connection<T, B>
375 where
376     T: AsyncRead + AsyncWrite + Unpin,
377     B: Buf,
378 {
handshake2(io: T, builder: Builder) -> Handshake<T, B>379     fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380         let span = tracing::trace_span!("server_handshake");
381         let entered = span.enter();
382 
383         // Create the codec.
384         let mut codec = Codec::new(io);
385 
386         if let Some(max) = builder.settings.max_frame_size() {
387             codec.set_max_recv_frame_size(max as usize);
388         }
389 
390         if let Some(max) = builder.settings.max_header_list_size() {
391             codec.set_max_recv_header_list_size(max as usize);
392         }
393 
394         // Send initial settings frame.
395         codec
396             .buffer(builder.settings.clone().into())
397             .expect("invalid SETTINGS frame");
398 
399         // Create the handshake future.
400         let state =
401             Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402 
403         drop(entered);
404 
405         Handshake {
406             builder,
407             state,
408             span,
409         }
410     }
411 
412     /// Accept the next incoming request on this connection.
accept( &mut self, ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>413     pub async fn accept(
414         &mut self,
415     ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
416         futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
417     }
418 
419     #[doc(hidden)]
poll_accept( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>>420     pub fn poll_accept(
421         &mut self,
422         cx: &mut Context<'_>,
423     ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
424         // Always try to advance the internal state. Getting Pending also is
425         // needed to allow this function to return Pending.
426         if self.poll_closed(cx)?.is_ready() {
427             // If the socket is closed, don't return anything
428             // TODO: drop any pending streams
429             return Poll::Ready(None);
430         }
431 
432         if let Some(inner) = self.connection.next_incoming() {
433             tracing::trace!("received incoming");
434             let (head, _) = inner.take_request().into_parts();
435             let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436 
437             let request = Request::from_parts(head, body);
438             let respond = SendResponse { inner };
439 
440             return Poll::Ready(Some(Ok((request, respond))));
441         }
442 
443         Poll::Pending
444     }
445 
446     /// Sets the target window size for the whole connection.
447     ///
448     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
449     /// frame will be immediately sent to the remote, increasing the connection
450     /// level window by `size - current_value`.
451     ///
452     /// If `size` is less than the current value, nothing will happen
453     /// immediately. However, as window capacity is released by
454     /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
455     /// out until the number of "in flight" bytes drops below `size`.
456     ///
457     /// The default value is 65,535.
458     ///
459     /// See [`FlowControl`] documentation for more details.
460     ///
461     /// [`FlowControl`]: ../struct.FlowControl.html
462     /// [library level]: ../index.html#flow-control
set_target_window_size(&mut self, size: u32)463     pub fn set_target_window_size(&mut self, size: u32) {
464         assert!(size <= proto::MAX_WINDOW_SIZE);
465         self.connection.set_target_window_size(size);
466     }
467 
468     /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
469     /// flow control for received data.
470     ///
471     /// The `SETTINGS` will be sent to the remote, and only applied once the
472     /// remote acknowledges the change.
473     ///
474     /// This can be used to increase or decrease the window size for existing
475     /// streams.
476     ///
477     /// # Errors
478     ///
479     /// Returns an error if a previous call is still pending acknowledgement
480     /// from the remote endpoint.
set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error>481     pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
482         assert!(size <= proto::MAX_WINDOW_SIZE);
483         self.connection.set_initial_window_size(size)?;
484         Ok(())
485     }
486 
487     /// Enables the [extended CONNECT protocol].
488     ///
489     /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
490     ///
491     /// # Errors
492     ///
493     /// Returns an error if a previous call is still pending acknowledgement
494     /// from the remote endpoint.
enable_connect_protocol(&mut self) -> Result<(), crate::Error>495     pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
496         self.connection.set_enable_connect_protocol()?;
497         Ok(())
498     }
499 
500     /// Returns `Ready` when the underlying connection has closed.
501     ///
502     /// If any new inbound streams are received during a call to `poll_closed`,
503     /// they will be queued and returned on the next call to [`poll_accept`].
504     ///
505     /// This function will advance the internal connection state, driving
506     /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
507     ///
508     /// See [here](index.html#managing-the-connection) for more details.
509     ///
510     /// [`poll_accept`]: struct.Connection.html#method.poll_accept
511     /// [`RecvStream`]: ../struct.RecvStream.html
512     /// [`SendStream`]: ../struct.SendStream.html
poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>513     pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514         self.connection.poll(cx).map_err(Into::into)
515     }
516 
517     /// Sets the connection to a GOAWAY state.
518     ///
519     /// Does not terminate the connection. Must continue being polled to close
520     /// connection.
521     ///
522     /// After flushing the GOAWAY frame, the connection is closed. Any
523     /// outstanding streams do not prevent the connection from closing. This
524     /// should usually be reserved for shutting down when something bad
525     /// external to `h2` has happened, and open streams cannot be properly
526     /// handled.
527     ///
528     /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
abrupt_shutdown(&mut self, reason: Reason)529     pub fn abrupt_shutdown(&mut self, reason: Reason) {
530         self.connection.go_away_from_user(reason);
531     }
532 
533     /// Starts a [graceful shutdown][1] process.
534     ///
535     /// Must continue being polled to close connection.
536     ///
537     /// It's possible to receive more requests after calling this method, since
538     /// they might have been in-flight from the client already. After about
539     /// 1 RTT, no new requests should be accepted. Once all active streams
540     /// have completed, the connection is closed.
541     ///
542     /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
graceful_shutdown(&mut self)543     pub fn graceful_shutdown(&mut self) {
544         self.connection.go_away_gracefully();
545     }
546 
547     /// Takes a `PingPong` instance from the connection.
548     ///
549     /// # Note
550     ///
551     /// This may only be called once. Calling multiple times will return `None`.
ping_pong(&mut self) -> Option<PingPong>552     pub fn ping_pong(&mut self) -> Option<PingPong> {
553         self.connection.take_user_pings().map(PingPong::new)
554     }
555 
556     /// Returns the maximum number of concurrent streams that may be initiated
557     /// by the server on this connection.
558     ///
559     /// This limit is configured by the client peer by sending the
560     /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
561     /// This method returns the currently acknowledged value received from the
562     /// remote.
563     ///
564     /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
max_concurrent_send_streams(&self) -> usize565     pub fn max_concurrent_send_streams(&self) -> usize {
566         self.connection.max_send_streams()
567     }
568 
569     /// Returns the maximum number of concurrent streams that may be initiated
570     /// by the client on this connection.
571     ///
572     /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
573     /// parameter][1] sent in a `SETTINGS` frame that has been
574     /// acknowledged by the remote peer. The value to be sent is configured by
575     /// the [`Builder::max_concurrent_streams`][2] method before handshaking
576     /// with the remote peer.
577     ///
578     /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
579     /// [2]: ../struct.Builder.html#method.max_concurrent_streams
max_concurrent_recv_streams(&self) -> usize580     pub fn max_concurrent_recv_streams(&self) -> usize {
581         self.connection.max_recv_streams()
582     }
583 
584     // Could disappear at anytime.
585     #[doc(hidden)]
586     #[cfg(feature = "unstable")]
num_wired_streams(&self) -> usize587     pub fn num_wired_streams(&self) -> usize {
588         self.connection.num_wired_streams()
589     }
590 }
591 
592 #[cfg(feature = "stream")]
593 impl<T, B> futures_core::Stream for Connection<T, B>
594 where
595     T: AsyncRead + AsyncWrite + Unpin,
596     B: Buf,
597 {
598     type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
599 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>600     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
601         self.poll_accept(cx)
602     }
603 }
604 
605 impl<T, B> fmt::Debug for Connection<T, B>
606 where
607     T: fmt::Debug,
608     B: fmt::Debug + Buf,
609 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result610     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
611         fmt.debug_struct("Connection")
612             .field("connection", &self.connection)
613             .finish()
614     }
615 }
616 
617 // ===== impl Builder =====
618 
619 impl Builder {
620     /// Returns a new server builder instance initialized with default
621     /// configuration values.
622     ///
623     /// Configuration methods can be chained on the return value.
624     ///
625     /// # Examples
626     ///
627     /// ```
628     /// # use tokio::io::{AsyncRead, AsyncWrite};
629     /// # use h2::server::*;
630     /// #
631     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
632     /// # -> Handshake<T>
633     /// # {
634     /// // `server_fut` is a future representing the completion of the HTTP/2
635     /// // handshake.
636     /// let server_fut = Builder::new()
637     ///     .initial_window_size(1_000_000)
638     ///     .max_concurrent_streams(1000)
639     ///     .handshake(my_io);
640     /// # server_fut
641     /// # }
642     /// #
643     /// # pub fn main() {}
644     /// ```
new() -> Builder645     pub fn new() -> Builder {
646         Builder {
647             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
648             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
649             pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
650             settings: Settings::default(),
651             initial_target_connection_window_size: None,
652             max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
653 
654             local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
655         }
656     }
657 
658     /// Indicates the initial window size (in octets) for stream-level
659     /// flow control for received data.
660     ///
661     /// The initial window of a stream is used as part of flow control. For more
662     /// details, see [`FlowControl`].
663     ///
664     /// The default value is 65,535.
665     ///
666     /// [`FlowControl`]: ../struct.FlowControl.html
667     ///
668     /// # Examples
669     ///
670     /// ```
671     /// # use tokio::io::{AsyncRead, AsyncWrite};
672     /// # use h2::server::*;
673     /// #
674     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
675     /// # -> Handshake<T>
676     /// # {
677     /// // `server_fut` is a future representing the completion of the HTTP/2
678     /// // handshake.
679     /// let server_fut = Builder::new()
680     ///     .initial_window_size(1_000_000)
681     ///     .handshake(my_io);
682     /// # server_fut
683     /// # }
684     /// #
685     /// # pub fn main() {}
686     /// ```
initial_window_size(&mut self, size: u32) -> &mut Self687     pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
688         self.settings.set_initial_window_size(Some(size));
689         self
690     }
691 
692     /// Indicates the initial window size (in octets) for connection-level flow control
693     /// for received data.
694     ///
695     /// The initial window of a connection is used as part of flow control. For more details,
696     /// see [`FlowControl`].
697     ///
698     /// The default value is 65,535.
699     ///
700     /// [`FlowControl`]: ../struct.FlowControl.html
701     ///
702     /// # Examples
703     ///
704     /// ```
705     /// # use tokio::io::{AsyncRead, AsyncWrite};
706     /// # use h2::server::*;
707     /// #
708     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
709     /// # -> Handshake<T>
710     /// # {
711     /// // `server_fut` is a future representing the completion of the HTTP/2
712     /// // handshake.
713     /// let server_fut = Builder::new()
714     ///     .initial_connection_window_size(1_000_000)
715     ///     .handshake(my_io);
716     /// # server_fut
717     /// # }
718     /// #
719     /// # pub fn main() {}
720     /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self721     pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
722         self.initial_target_connection_window_size = Some(size);
723         self
724     }
725 
726     /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
727     /// configured server is able to accept.
728     ///
729     /// The sender may send data frames that are **smaller** than this value,
730     /// but any data larger than `max` will be broken up into multiple `DATA`
731     /// frames.
732     ///
733     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
734     ///
735     /// # Examples
736     ///
737     /// ```
738     /// # use tokio::io::{AsyncRead, AsyncWrite};
739     /// # use h2::server::*;
740     /// #
741     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
742     /// # -> Handshake<T>
743     /// # {
744     /// // `server_fut` is a future representing the completion of the HTTP/2
745     /// // handshake.
746     /// let server_fut = Builder::new()
747     ///     .max_frame_size(1_000_000)
748     ///     .handshake(my_io);
749     /// # server_fut
750     /// # }
751     /// #
752     /// # pub fn main() {}
753     /// ```
754     ///
755     /// # Panics
756     ///
757     /// This function panics if `max` is not within the legal range specified
758     /// above.
max_frame_size(&mut self, max: u32) -> &mut Self759     pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
760         self.settings.set_max_frame_size(Some(max));
761         self
762     }
763 
764     /// Sets the max size of received header frames.
765     ///
766     /// This advisory setting informs a peer of the maximum size of header list
767     /// that the sender is prepared to accept, in octets. The value is based on
768     /// the uncompressed size of header fields, including the length of the name
769     /// and value in octets plus an overhead of 32 octets for each header field.
770     ///
771     /// This setting is also used to limit the maximum amount of data that is
772     /// buffered to decode HEADERS frames.
773     ///
774     /// # Examples
775     ///
776     /// ```
777     /// # use tokio::io::{AsyncRead, AsyncWrite};
778     /// # use h2::server::*;
779     /// #
780     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
781     /// # -> Handshake<T>
782     /// # {
783     /// // `server_fut` is a future representing the completion of the HTTP/2
784     /// // handshake.
785     /// let server_fut = Builder::new()
786     ///     .max_header_list_size(16 * 1024)
787     ///     .handshake(my_io);
788     /// # server_fut
789     /// # }
790     /// #
791     /// # pub fn main() {}
792     /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self793     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
794         self.settings.set_max_header_list_size(Some(max));
795         self
796     }
797 
798     /// Sets the maximum number of concurrent streams.
799     ///
800     /// The maximum concurrent streams setting only controls the maximum number
801     /// of streams that can be initiated by the remote peer. In other words,
802     /// when this setting is set to 100, this does not limit the number of
803     /// concurrent streams that can be created by the caller.
804     ///
805     /// It is recommended that this value be no smaller than 100, so as to not
806     /// unnecessarily limit parallelism. However, any value is legal, including
807     /// 0. If `max` is set to 0, then the remote will not be permitted to
808     /// initiate streams.
809     ///
810     /// Note that streams in the reserved state, i.e., push promises that have
811     /// been reserved but the stream has not started, do not count against this
812     /// setting.
813     ///
814     /// Also note that if the remote *does* exceed the value set here, it is not
815     /// a protocol level error. Instead, the `h2` library will immediately reset
816     /// the stream.
817     ///
818     /// See [Section 5.1.2] in the HTTP/2 spec for more details.
819     ///
820     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
821     ///
822     /// # Examples
823     ///
824     /// ```
825     /// # use tokio::io::{AsyncRead, AsyncWrite};
826     /// # use h2::server::*;
827     /// #
828     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
829     /// # -> Handshake<T>
830     /// # {
831     /// // `server_fut` is a future representing the completion of the HTTP/2
832     /// // handshake.
833     /// let server_fut = Builder::new()
834     ///     .max_concurrent_streams(1000)
835     ///     .handshake(my_io);
836     /// # server_fut
837     /// # }
838     /// #
839     /// # pub fn main() {}
840     /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self841     pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
842         self.settings.set_max_concurrent_streams(Some(max));
843         self
844     }
845 
846     /// Sets the maximum number of concurrent locally reset streams.
847     ///
848     /// When a stream is explicitly reset by either calling
849     /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
850     /// before completing the stream, the HTTP/2 specification requires that
851     /// any further frames received for that stream must be ignored for "some
852     /// time".
853     ///
854     /// In order to satisfy the specification, internal state must be maintained
855     /// to implement the behavior. This state grows linearly with the number of
856     /// streams that are locally reset.
857     ///
858     /// The `max_concurrent_reset_streams` setting configures sets an upper
859     /// bound on the amount of state that is maintained. When this max value is
860     /// reached, the oldest reset stream is purged from memory.
861     ///
862     /// Once the stream has been fully purged from memory, any additional frames
863     /// received for that stream will result in a connection level protocol
864     /// error, forcing the connection to terminate.
865     ///
866     /// The default value is 10.
867     ///
868     /// # Examples
869     ///
870     /// ```
871     /// # use tokio::io::{AsyncRead, AsyncWrite};
872     /// # use h2::server::*;
873     /// #
874     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
875     /// # -> Handshake<T>
876     /// # {
877     /// // `server_fut` is a future representing the completion of the HTTP/2
878     /// // handshake.
879     /// let server_fut = Builder::new()
880     ///     .max_concurrent_reset_streams(1000)
881     ///     .handshake(my_io);
882     /// # server_fut
883     /// # }
884     /// #
885     /// # pub fn main() {}
886     /// ```
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self887     pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
888         self.reset_stream_max = max;
889         self
890     }
891 
892     /// Sets the maximum number of local resets due to protocol errors made by the remote end.
893     ///
894     /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
895     /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
896     /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
897     ///
898     /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
899     /// `ENHANCE_YOUR_CALM` to the client.
900     ///
901     /// If you really want to disable this, supply [`Option::None`] here.
902     /// Disabling this is not recommended and may expose you to DOS attacks.
903     ///
904     /// The default value is currently 1024, but could change.
max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self905     pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
906         self.local_max_error_reset_streams = max;
907         self
908     }
909 
910     /// Sets the maximum number of pending-accept remotely-reset streams.
911     ///
912     /// Streams that have been received by the peer, but not accepted by the
913     /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
914     /// could send a request and then shortly after, realize it is not needed,
915     /// sending a CANCEL.
916     ///
917     /// However, since those streams are now "closed", they don't count towards
918     /// the max concurrent streams. So, they will sit in the accept queue,
919     /// using memory.
920     ///
921     /// When the number of remotely-reset streams sitting in the pending-accept
922     /// queue reaches this maximum value, a connection error with the code of
923     /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
924     /// `Future`.
925     ///
926     /// The default value is currently 20, but could change.
927     ///
928     /// # Examples
929     ///
930     ///
931     /// ```
932     /// # use tokio::io::{AsyncRead, AsyncWrite};
933     /// # use h2::server::*;
934     /// #
935     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
936     /// # -> Handshake<T>
937     /// # {
938     /// // `server_fut` is a future representing the completion of the HTTP/2
939     /// // handshake.
940     /// let server_fut = Builder::new()
941     ///     .max_pending_accept_reset_streams(100)
942     ///     .handshake(my_io);
943     /// # server_fut
944     /// # }
945     /// #
946     /// # pub fn main() {}
947     /// ```
max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self948     pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
949         self.pending_accept_reset_stream_max = max;
950         self
951     }
952 
953     /// Sets the maximum send buffer size per stream.
954     ///
955     /// Once a stream has buffered up to (or over) the maximum, the stream's
956     /// flow control will not "poll" additional capacity. Once bytes for the
957     /// stream have been written to the connection, the send buffer capacity
958     /// will be freed up again.
959     ///
960     /// The default is currently ~400KB, but may change.
961     ///
962     /// # Panics
963     ///
964     /// This function panics if `max` is larger than `u32::MAX`.
max_send_buffer_size(&mut self, max: usize) -> &mut Self965     pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
966         assert!(max <= std::u32::MAX as usize);
967         self.max_send_buffer_size = max;
968         self
969     }
970 
971     /// Sets the maximum number of concurrent locally reset streams.
972     ///
973     /// When a stream is explicitly reset by either calling
974     /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
975     /// before completing the stream, the HTTP/2 specification requires that
976     /// any further frames received for that stream must be ignored for "some
977     /// time".
978     ///
979     /// In order to satisfy the specification, internal state must be maintained
980     /// to implement the behavior. This state grows linearly with the number of
981     /// streams that are locally reset.
982     ///
983     /// The `reset_stream_duration` setting configures the max amount of time
984     /// this state will be maintained in memory. Once the duration elapses, the
985     /// stream state is purged from memory.
986     ///
987     /// Once the stream has been fully purged from memory, any additional frames
988     /// received for that stream will result in a connection level protocol
989     /// error, forcing the connection to terminate.
990     ///
991     /// The default value is 30 seconds.
992     ///
993     /// # Examples
994     ///
995     /// ```
996     /// # use tokio::io::{AsyncRead, AsyncWrite};
997     /// # use h2::server::*;
998     /// # use std::time::Duration;
999     /// #
1000     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1001     /// # -> Handshake<T>
1002     /// # {
1003     /// // `server_fut` is a future representing the completion of the HTTP/2
1004     /// // handshake.
1005     /// let server_fut = Builder::new()
1006     ///     .reset_stream_duration(Duration::from_secs(10))
1007     ///     .handshake(my_io);
1008     /// # server_fut
1009     /// # }
1010     /// #
1011     /// # pub fn main() {}
1012     /// ```
reset_stream_duration(&mut self, dur: Duration) -> &mut Self1013     pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1014         self.reset_stream_duration = dur;
1015         self
1016     }
1017 
1018     /// Enables the [extended CONNECT protocol].
1019     ///
1020     /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
enable_connect_protocol(&mut self) -> &mut Self1021     pub fn enable_connect_protocol(&mut self) -> &mut Self {
1022         self.settings.set_enable_connect_protocol(Some(1));
1023         self
1024     }
1025 
1026     /// Creates a new configured HTTP/2 server backed by `io`.
1027     ///
1028     /// It is expected that `io` already be in an appropriate state to commence
1029     /// the [HTTP/2 handshake]. See [Handshake] for more details.
1030     ///
1031     /// Returns a future which resolves to the [`Connection`] instance once the
1032     /// HTTP/2 handshake has been completed.
1033     ///
1034     /// This function also allows the caller to configure the send payload data
1035     /// type. See [Outbound data type] for more details.
1036     ///
1037     /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1038     /// [Handshake]: ../index.html#handshake
1039     /// [`Connection`]: struct.Connection.html
1040     /// [Outbound data type]: ../index.html#outbound-data-type.
1041     ///
1042     /// # Examples
1043     ///
1044     /// Basic usage:
1045     ///
1046     /// ```
1047     /// # use tokio::io::{AsyncRead, AsyncWrite};
1048     /// # use h2::server::*;
1049     /// #
1050     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1051     /// # -> Handshake<T>
1052     /// # {
1053     /// // `server_fut` is a future representing the completion of the HTTP/2
1054     /// // handshake.
1055     /// let server_fut = Builder::new()
1056     ///     .handshake(my_io);
1057     /// # server_fut
1058     /// # }
1059     /// #
1060     /// # pub fn main() {}
1061     /// ```
1062     ///
1063     /// Configures the send-payload data type. In this case, the outbound data
1064     /// type will be `&'static [u8]`.
1065     ///
1066     /// ```
1067     /// # use tokio::io::{AsyncRead, AsyncWrite};
1068     /// # use h2::server::*;
1069     /// #
1070     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1071     /// # -> Handshake<T, &'static [u8]>
1072     /// # {
1073     /// // `server_fut` is a future representing the completion of the HTTP/2
1074     /// // handshake.
1075     /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1076     ///     .handshake(my_io);
1077     /// # server_fut
1078     /// # }
1079     /// #
1080     /// # pub fn main() {}
1081     /// ```
handshake<T, B>(&self, io: T) -> Handshake<T, B> where T: AsyncRead + AsyncWrite + Unpin, B: Buf,1082     pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1083     where
1084         T: AsyncRead + AsyncWrite + Unpin,
1085         B: Buf,
1086     {
1087         Connection::handshake2(io, self.clone())
1088     }
1089 }
1090 
1091 impl Default for Builder {
default() -> Builder1092     fn default() -> Builder {
1093         Builder::new()
1094     }
1095 }
1096 
1097 // ===== impl SendResponse =====
1098 
1099 impl<B: Buf> SendResponse<B> {
1100     /// Send a response to a client request.
1101     ///
1102     /// On success, a [`SendStream`] instance is returned. This instance can be
1103     /// used to stream the response body and send trailers.
1104     ///
1105     /// If a body or trailers will be sent on the returned [`SendStream`]
1106     /// instance, then `end_of_stream` must be set to `false` when calling this
1107     /// function.
1108     ///
1109     /// The [`SendResponse`] instance is already associated with a received
1110     /// request.  This function may only be called once per instance and only if
1111     /// [`send_reset`] has not been previously called.
1112     ///
1113     /// [`SendResponse`]: #
1114     /// [`SendStream`]: ../struct.SendStream.html
1115     /// [`send_reset`]: #method.send_reset
send_response( &mut self, response: Response<()>, end_of_stream: bool, ) -> Result<SendStream<B>, crate::Error>1116     pub fn send_response(
1117         &mut self,
1118         response: Response<()>,
1119         end_of_stream: bool,
1120     ) -> Result<SendStream<B>, crate::Error> {
1121         self.inner
1122             .send_response(response, end_of_stream)
1123             .map(|_| SendStream::new(self.inner.clone()))
1124             .map_err(Into::into)
1125     }
1126 
1127     /// Push a request and response to the client
1128     ///
1129     /// On success, a [`SendResponse`] instance is returned.
1130     ///
1131     /// [`SendResponse`]: #
push_request( &mut self, request: Request<()>, ) -> Result<SendPushedResponse<B>, crate::Error>1132     pub fn push_request(
1133         &mut self,
1134         request: Request<()>,
1135     ) -> Result<SendPushedResponse<B>, crate::Error> {
1136         self.inner
1137             .send_push_promise(request)
1138             .map(|inner| SendPushedResponse {
1139                 inner: SendResponse { inner },
1140             })
1141             .map_err(Into::into)
1142     }
1143 
1144     /// Send a stream reset to the peer.
1145     ///
1146     /// This essentially cancels the stream, including any inbound or outbound
1147     /// data streams.
1148     ///
1149     /// If this function is called before [`send_response`], a call to
1150     /// [`send_response`] will result in an error.
1151     ///
1152     /// If this function is called while a [`SendStream`] instance is active,
1153     /// any further use of the instance will result in an error.
1154     ///
1155     /// This function should only be called once.
1156     ///
1157     /// [`send_response`]: #method.send_response
1158     /// [`SendStream`]: ../struct.SendStream.html
send_reset(&mut self, reason: Reason)1159     pub fn send_reset(&mut self, reason: Reason) {
1160         self.inner.send_reset(reason)
1161     }
1162 
1163     /// Polls to be notified when the client resets this stream.
1164     ///
1165     /// If stream is still open, this returns `Poll::Pending`, and
1166     /// registers the task to be notified if a `RST_STREAM` is received.
1167     ///
1168     /// If a `RST_STREAM` frame is received for this stream, calling this
1169     /// method will yield the `Reason` for the reset.
1170     ///
1171     /// # Error
1172     ///
1173     /// Calling this method after having called `send_response` will return
1174     /// a user error.
poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>>1175     pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1176         self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1177     }
1178 
1179     /// Returns the stream ID of the response stream.
1180     ///
1181     /// # Panics
1182     ///
1183     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1184     pub fn stream_id(&self) -> crate::StreamId {
1185         crate::StreamId::from_internal(self.inner.stream_id())
1186     }
1187 }
1188 
1189 // ===== impl SendPushedResponse =====
1190 
1191 impl<B: Buf> SendPushedResponse<B> {
1192     /// Send a response to a promised request.
1193     ///
1194     /// On success, a [`SendStream`] instance is returned. This instance can be
1195     /// used to stream the response body and send trailers.
1196     ///
1197     /// If a body or trailers will be sent on the returned [`SendStream`]
1198     /// instance, then `end_of_stream` must be set to `false` when calling this
1199     /// function.
1200     ///
1201     /// The [`SendPushedResponse`] instance is associated with a promised
1202     /// request.  This function may only be called once per instance and only if
1203     /// [`send_reset`] has not been previously called.
1204     ///
1205     /// [`SendPushedResponse`]: #
1206     /// [`SendStream`]: ../struct.SendStream.html
1207     /// [`send_reset`]: #method.send_reset
send_response( &mut self, response: Response<()>, end_of_stream: bool, ) -> Result<SendStream<B>, crate::Error>1208     pub fn send_response(
1209         &mut self,
1210         response: Response<()>,
1211         end_of_stream: bool,
1212     ) -> Result<SendStream<B>, crate::Error> {
1213         self.inner.send_response(response, end_of_stream)
1214     }
1215 
1216     /// Send a stream reset to the peer.
1217     ///
1218     /// This essentially cancels the stream, including any inbound or outbound
1219     /// data streams.
1220     ///
1221     /// If this function is called before [`send_response`], a call to
1222     /// [`send_response`] will result in an error.
1223     ///
1224     /// If this function is called while a [`SendStream`] instance is active,
1225     /// any further use of the instance will result in an error.
1226     ///
1227     /// This function should only be called once.
1228     ///
1229     /// [`send_response`]: #method.send_response
1230     /// [`SendStream`]: ../struct.SendStream.html
send_reset(&mut self, reason: Reason)1231     pub fn send_reset(&mut self, reason: Reason) {
1232         self.inner.send_reset(reason)
1233     }
1234 
1235     /// Polls to be notified when the client resets this stream.
1236     ///
1237     /// If stream is still open, this returns `Poll::Pending`, and
1238     /// registers the task to be notified if a `RST_STREAM` is received.
1239     ///
1240     /// If a `RST_STREAM` frame is received for this stream, calling this
1241     /// method will yield the `Reason` for the reset.
1242     ///
1243     /// # Error
1244     ///
1245     /// Calling this method after having called `send_response` will return
1246     /// a user error.
poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>>1247     pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1248         self.inner.poll_reset(cx)
1249     }
1250 
1251     /// Returns the stream ID of the response stream.
1252     ///
1253     /// # Panics
1254     ///
1255     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1256     pub fn stream_id(&self) -> crate::StreamId {
1257         self.inner.stream_id()
1258     }
1259 }
1260 
1261 // ===== impl Flush =====
1262 
1263 impl<T, B: Buf> Flush<T, B> {
new(codec: Codec<T, B>) -> Self1264     fn new(codec: Codec<T, B>) -> Self {
1265         Flush { codec: Some(codec) }
1266     }
1267 }
1268 
1269 impl<T, B> Future for Flush<T, B>
1270 where
1271     T: AsyncWrite + Unpin,
1272     B: Buf,
1273 {
1274     type Output = Result<Codec<T, B>, crate::Error>;
1275 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1276     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1277         // Flush the codec
1278         ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1279 
1280         // Return the codec
1281         Poll::Ready(Ok(self.codec.take().unwrap()))
1282     }
1283 }
1284 
1285 impl<T, B: Buf> ReadPreface<T, B> {
new(codec: Codec<T, B>) -> Self1286     fn new(codec: Codec<T, B>) -> Self {
1287         ReadPreface {
1288             codec: Some(codec),
1289             pos: 0,
1290         }
1291     }
1292 
inner_mut(&mut self) -> &mut T1293     fn inner_mut(&mut self) -> &mut T {
1294         self.codec.as_mut().unwrap().get_mut()
1295     }
1296 }
1297 
1298 impl<T, B> Future for ReadPreface<T, B>
1299 where
1300     T: AsyncRead + Unpin,
1301     B: Buf,
1302 {
1303     type Output = Result<Codec<T, B>, crate::Error>;
1304 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1305     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1306         let mut buf = [0; 24];
1307         let mut rem = PREFACE.len() - self.pos;
1308 
1309         while rem > 0 {
1310             let mut buf = ReadBuf::new(&mut buf[..rem]);
1311             ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1312                 .map_err(crate::Error::from_io)?;
1313             let n = buf.filled().len();
1314             if n == 0 {
1315                 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1316                     io::ErrorKind::UnexpectedEof,
1317                     "connection closed before reading preface",
1318                 ))));
1319             }
1320 
1321             if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1322                 proto_err!(conn: "read_preface: invalid preface");
1323                 // TODO: Should this just write the GO_AWAY frame directly?
1324                 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1325             }
1326 
1327             self.pos += n;
1328             rem -= n; // TODO test
1329         }
1330 
1331         Poll::Ready(Ok(self.codec.take().unwrap()))
1332     }
1333 }
1334 
1335 // ===== impl Handshake =====
1336 
1337 impl<T, B: Buf> Future for Handshake<T, B>
1338 where
1339     T: AsyncRead + AsyncWrite + Unpin,
1340     B: Buf,
1341 {
1342     type Output = Result<Connection<T, B>, crate::Error>;
1343 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1344     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1345         let span = self.span.clone(); // XXX(eliza): T_T
1346         let _e = span.enter();
1347         tracing::trace!(state = ?self.state);
1348 
1349         loop {
1350             match &mut self.state {
1351                 Handshaking::Flushing(flush) => {
1352                     // We're currently flushing a pending SETTINGS frame. Poll the
1353                     // flush future, and, if it's completed, advance our state to wait
1354                     // for the client preface.
1355                     let codec = match Pin::new(flush).poll(cx)? {
1356                         Poll::Pending => {
1357                             tracing::trace!(flush.poll = %"Pending");
1358                             return Poll::Pending;
1359                         }
1360                         Poll::Ready(flushed) => {
1361                             tracing::trace!(flush.poll = %"Ready");
1362                             flushed
1363                         }
1364                     };
1365                     self.state = Handshaking::ReadingPreface(
1366                         ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1367                     );
1368                 }
1369                 Handshaking::ReadingPreface(read) => {
1370                     let codec = ready!(Pin::new(read).poll(cx)?);
1371 
1372                     self.state = Handshaking::Done;
1373 
1374                     let connection = proto::Connection::new(
1375                         codec,
1376                         Config {
1377                             next_stream_id: 2.into(),
1378                             // Server does not need to locally initiate any streams
1379                             initial_max_send_streams: 0,
1380                             max_send_buffer_size: self.builder.max_send_buffer_size,
1381                             reset_stream_duration: self.builder.reset_stream_duration,
1382                             reset_stream_max: self.builder.reset_stream_max,
1383                             remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1384                             local_error_reset_streams_max: self
1385                                 .builder
1386                                 .local_max_error_reset_streams,
1387                             settings: self.builder.settings.clone(),
1388                         },
1389                     );
1390 
1391                     tracing::trace!("connection established!");
1392                     let mut c = Connection { connection };
1393                     if let Some(sz) = self.builder.initial_target_connection_window_size {
1394                         c.set_target_window_size(sz);
1395                     }
1396 
1397                     return Poll::Ready(Ok(c));
1398                 }
1399                 Handshaking::Done => {
1400                     panic!("Handshaking::poll() called again after handshaking was complete")
1401                 }
1402             }
1403         }
1404     }
1405 }
1406 
1407 impl<T, B> fmt::Debug for Handshake<T, B>
1408 where
1409     T: AsyncRead + AsyncWrite + fmt::Debug,
1410     B: fmt::Debug + Buf,
1411 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1412     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1413         write!(fmt, "server::Handshake")
1414     }
1415 }
1416 
1417 impl Peer {
convert_send_message( id: StreamId, response: Response<()>, end_of_stream: bool, ) -> frame::Headers1418     pub fn convert_send_message(
1419         id: StreamId,
1420         response: Response<()>,
1421         end_of_stream: bool,
1422     ) -> frame::Headers {
1423         use http::response::Parts;
1424 
1425         // Extract the components of the HTTP request
1426         let (
1427             Parts {
1428                 status, headers, ..
1429             },
1430             _,
1431         ) = response.into_parts();
1432 
1433         // Build the set pseudo header set. All requests will include `method`
1434         // and `path`.
1435         let pseudo = Pseudo::response(status);
1436 
1437         // Create the HEADERS frame
1438         let mut frame = frame::Headers::new(id, pseudo, headers);
1439 
1440         if end_of_stream {
1441             frame.set_end_stream()
1442         }
1443 
1444         frame
1445     }
1446 
convert_push_message( stream_id: StreamId, promised_id: StreamId, request: Request<()>, ) -> Result<frame::PushPromise, UserError>1447     pub fn convert_push_message(
1448         stream_id: StreamId,
1449         promised_id: StreamId,
1450         request: Request<()>,
1451     ) -> Result<frame::PushPromise, UserError> {
1452         use http::request::Parts;
1453 
1454         if let Err(e) = frame::PushPromise::validate_request(&request) {
1455             use PushPromiseHeaderError::*;
1456             match e {
1457                 NotSafeAndCacheable => tracing::debug!(
1458                     ?promised_id,
1459                     "convert_push_message: method {} is not safe and cacheable",
1460                     request.method(),
1461                 ),
1462                 InvalidContentLength(e) => tracing::debug!(
1463                     ?promised_id,
1464                     "convert_push_message; promised request has invalid content-length {:?}",
1465                     e,
1466                 ),
1467             }
1468             return Err(UserError::MalformedHeaders);
1469         }
1470 
1471         // Extract the components of the HTTP request
1472         let (
1473             Parts {
1474                 method,
1475                 uri,
1476                 headers,
1477                 ..
1478             },
1479             _,
1480         ) = request.into_parts();
1481 
1482         let pseudo = Pseudo::request(method, uri, None);
1483 
1484         Ok(frame::PushPromise::new(
1485             stream_id,
1486             promised_id,
1487             pseudo,
1488             headers,
1489         ))
1490     }
1491 }
1492 
1493 impl proto::Peer for Peer {
1494     type Poll = Request<()>;
1495 
1496     const NAME: &'static str = "Server";
1497 
1498     /*
1499     fn is_server() -> bool {
1500         true
1501     }
1502     */
1503 
1504     fn r#dyn() -> proto::DynPeer {
1505         proto::DynPeer::Server
1506     }
1507 
convert_poll_message( pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, ) -> Result<Self::Poll, Error>1508     fn convert_poll_message(
1509         pseudo: Pseudo,
1510         fields: HeaderMap,
1511         stream_id: StreamId,
1512     ) -> Result<Self::Poll, Error> {
1513         use http::{uri, Version};
1514 
1515         let mut b = Request::builder();
1516 
1517         macro_rules! malformed {
1518             ($($arg:tt)*) => {{
1519                 tracing::debug!($($arg)*);
1520                 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1521             }}
1522         }
1523 
1524         b = b.version(Version::HTTP_2);
1525 
1526         let is_connect;
1527         if let Some(method) = pseudo.method {
1528             is_connect = method == Method::CONNECT;
1529             b = b.method(method);
1530         } else {
1531             malformed!("malformed headers: missing method");
1532         }
1533 
1534         let has_protocol = pseudo.protocol.is_some();
1535         if has_protocol {
1536             if is_connect {
1537                 // Assert that we have the right type.
1538                 b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1539             } else {
1540                 malformed!("malformed headers: :protocol on non-CONNECT request");
1541             }
1542         }
1543 
1544         if pseudo.status.is_some() {
1545             malformed!("malformed headers: :status field on request");
1546         }
1547 
1548         // Convert the URI
1549         let mut parts = uri::Parts::default();
1550 
1551         // A request translated from HTTP/1 must not include the :authority
1552         // header
1553         if let Some(authority) = pseudo.authority {
1554             let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1555             parts.authority = Some(maybe_authority.or_else(|why| {
1556                 malformed!(
1557                     "malformed headers: malformed authority ({:?}): {}",
1558                     authority,
1559                     why,
1560                 )
1561             })?);
1562         }
1563 
1564         // A :scheme is required, except CONNECT.
1565         if let Some(scheme) = pseudo.scheme {
1566             if is_connect && !has_protocol {
1567                 malformed!("malformed headers: :scheme in CONNECT");
1568             }
1569             let maybe_scheme = scheme.parse();
1570             let scheme = maybe_scheme.or_else(|why| {
1571                 malformed!(
1572                     "malformed headers: malformed scheme ({:?}): {}",
1573                     scheme,
1574                     why,
1575                 )
1576             })?;
1577 
1578             // It's not possible to build an `Uri` from a scheme and path. So,
1579             // after validating is was a valid scheme, we just have to drop it
1580             // if there isn't an :authority.
1581             if parts.authority.is_some() {
1582                 parts.scheme = Some(scheme);
1583             }
1584         } else if !is_connect || has_protocol {
1585             malformed!("malformed headers: missing scheme");
1586         }
1587 
1588         if let Some(path) = pseudo.path {
1589             if is_connect && !has_protocol {
1590                 malformed!("malformed headers: :path in CONNECT");
1591             }
1592 
1593             // This cannot be empty
1594             if path.is_empty() {
1595                 malformed!("malformed headers: missing path");
1596             }
1597 
1598             let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1599             parts.path_and_query = Some(maybe_path.or_else(|why| {
1600                 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1601             })?);
1602         } else if is_connect && has_protocol {
1603             malformed!("malformed headers: missing path in extended CONNECT");
1604         }
1605 
1606         b = b.uri(parts);
1607 
1608         let mut request = match b.body(()) {
1609             Ok(request) => request,
1610             Err(e) => {
1611                 // TODO: Should there be more specialized handling for different
1612                 // kinds of errors
1613                 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1614                 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1615             }
1616         };
1617 
1618         *request.headers_mut() = fields;
1619 
1620         Ok(request)
1621     }
1622 }
1623 
1624 // ===== impl Handshaking =====
1625 
1626 impl<T, B> fmt::Debug for Handshaking<T, B>
1627 where
1628     B: Buf,
1629 {
1630     #[inline]
fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error>1631     fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1632         match *self {
1633             Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1634             Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1635             Handshaking::Done => f.write_str("Done"),
1636         }
1637     }
1638 }
1639