1 //! Server implementation of the HTTP/2 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2 server requires the caller to manage accepting the
6 //! connections as well as getting the connections to a state that is ready to
7 //! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpListener`] to accept
11 //! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12 //! upgrades.
13 //!
14 //! Once a connection is obtained, it is passed to [`handshake`],
15 //! which will begin the [HTTP/2 handshake]. This returns a future that
//! completes once the handshake has finished, at which point HTTP/2 streams
//! may be received.
18 //!
19 //! [`handshake`] uses default configuration values. There are a number of
20 //! settings that can be changed by using [`Builder`] instead.
21 //!
22 //! # Inbound streams
23 //!
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. When
//! a new stream is received, a call to [`Connection::accept`] will return
//! `(request, response)` (with the `stream` feature enabled, [`Connection`]
//! also implements [`futures::Stream`]). The `request` handle (of type
//! [`http::Request<RecvStream>`]) contains the HTTP request head and provides
//! a way to receive the inbound data stream and the trailers. The `response`
//! handle (of type [`SendResponse`]) allows responding to the request,
//! streaming the response payload, sending trailers, and sending push
//! promises.
32 //!
33 //! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34 //! can be operated independently.
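//!
//! For example, a minimal sketch of handling one accepted stream, reading the
//! request body and then sending a short response body (the `request` and
//! `respond` values are assumed to have come from [`Connection::accept`], and
//! the payload is illustrative):
//!
//! ```no_run
//! # use bytes::Bytes;
//! # use h2::RecvStream;
//! # use h2::server::SendResponse;
//! # use http::Response;
//! # async fn handle(request: http::Request<RecvStream>, mut respond: SendResponse<Bytes>)
//! #     -> Result<(), h2::Error> {
//! let (_head, mut body) = request.into_parts();
//!
//! // Receive the inbound data stream, releasing flow control capacity as
//! // chunks arrive so the client may continue sending.
//! while let Some(chunk) = body.data().await {
//!     let chunk = chunk?;
//!     let _ = body.flow_control().release_capacity(chunk.len());
//! }
//!
//! // Respond with a small payload. `end_of_stream` is `false` because the
//! // body is sent on the returned `SendStream`.
//! let mut send = respond.send_response(Response::new(()), false)?;
//! send.send_data(Bytes::from_static(b"hello"), true)?;
//! # Ok(())
//! # }
//! ```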
35 //!
36 //! # Managing the connection
37 //!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either [`Connection::accept`] or
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
//! connection state is advanced.
//!
//! It is not required to call **both** [`Connection::accept`] and
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
//! then only [`Connection::accept`] should be called. When the caller **does
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
//! called.
//!
//! The [`Connection`] instance should only be dropped once
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
//! returns `None`, no more inbound streams will be received. At this point,
//! only [`Connection::poll_closed`] should be called.
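//!
//! For example, a minimal sketch (using `futures_util`, which `h2` already
//! depends on) of accepting streams until the peer is done and then driving
//! the connection until it has fully closed:
//!
//! ```no_run
//! # use h2::server;
//! # use tokio::io::{AsyncRead, AsyncWrite};
//! # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(io: T) -> Result<(), h2::Error> {
//! let mut connection = server::handshake(io).await?;
//!
//! while let Some(result) = connection.accept().await {
//!     let (_request, _respond) = result?;
//!     // Hand the stream off to another task here ...
//! }
//!
//! // `accept` returned `None`: no more inbound streams will arrive. Keep
//! // driving the connection until it has fully closed before dropping it.
//! futures_util::future::poll_fn(|cx| connection.poll_closed(cx)).await?;
//! # Ok(())
//! # }
//! ```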
54 //!
55 //! # Shutting down the server
56 //!
57 //! Graceful shutdown of the server is [not yet
58 //! implemented](https://github.com/hyperium/h2/issues/69).
59 //!
60 //! # Example
61 //!
62 //! A basic HTTP/2 server example that runs over TCP and assumes [prior
63 //! knowledge], i.e. both the client and the server assume that the TCP socket
64 //! will use the HTTP/2 protocol without prior negotiation.
65 //!
//! ```no_run
//! use h2::server;
//! use http::{Response, StatusCode};
//! use tokio::net::TcpListener;
//!
//! #[tokio::main]
//! pub async fn main() {
//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
//!
//!     // Accept all incoming TCP connections.
//!     loop {
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
//!             // Spawn a new task to process each connection.
//!             tokio::spawn(async {
//!                 // Start the HTTP/2 connection handshake
//!                 let mut h2 = server::handshake(socket).await.unwrap();
//!                 // Accept all inbound HTTP/2 streams sent over the
//!                 // connection.
//!                 while let Some(request) = h2.accept().await {
//!                     let (request, mut respond) = request.unwrap();
//!                     println!("Received request: {:?}", request);
//!
//!                     // Build a response with no body
//!                     let response = Response::builder()
//!                         .status(StatusCode::OK)
//!                         .body(())
//!                         .unwrap();
//!
//!                     // Send the response back to the client
//!                     respond.send_response(response, true)
//!                         .unwrap();
//!                 }
//!
//!             });
//!         }
//!     }
//! }
//! ```
104 //!
105 //! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106 //! [`handshake`]: fn.handshake.html
107 //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108 //! [`Builder`]: struct.Builder.html
109 //! [`Connection`]: struct.Connection.html
//! [`Connection::accept`]: struct.Connection.html#method.accept
//! [`Connection::poll_closed`]: struct.Connection.html#method.poll_closed
//! [`futures::Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
113 //! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114 //! [`RecvStream`]: ../struct.RecvStream.html
115 //! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/1/tokio/net/struct.TcpListener.html
117
118 use crate::codec::{Codec, UserError};
119 use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120 use crate::proto::{self, Config, Error, Prioritized};
121 use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123 use bytes::{Buf, Bytes};
124 use http::{HeaderMap, Method, Request, Response};
125 use std::future::Future;
126 use std::pin::Pin;
127 use std::task::{Context, Poll};
128 use std::time::Duration;
129 use std::{fmt, io};
130 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131 use tracing::instrument::{Instrument, Instrumented};
132
133 /// In progress HTTP/2 connection handshake future.
134 ///
135 /// This type implements `Future`, yielding a `Connection` instance once the
136 /// handshake has completed.
137 ///
138 /// The handshake is completed once the connection preface is fully received
139 /// from the client **and** the initial settings frame is sent to the client.
140 ///
141 /// The handshake future does not wait for the initial settings frame from the
142 /// client.
143 ///
144 /// See [module] level docs for more details.
145 ///
146 /// [module]: index.html
147 #[must_use = "futures do nothing unless polled"]
148 pub struct Handshake<T, B: Buf = Bytes> {
149 /// The config to pass to Connection::new after handshake succeeds.
150 builder: Builder,
151 /// The current state of the handshake.
152 state: Handshaking<T, B>,
153 /// Span tracking the handshake
154 span: tracing::Span,
155 }
156
157 /// Accepts inbound HTTP/2 streams on a connection.
158 ///
159 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160 /// implements the HTTP/2 server logic for that connection. It is responsible
161 /// for receiving inbound streams initiated by the client as well as driving the
162 /// internal state forward.
163 ///
164 /// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
167 ///
168 /// See [module level] documentation for more details
169 ///
170 /// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// # use tokio::io::{AsyncRead, AsyncWrite};
179 /// # use h2::server;
180 /// # use h2::server::*;
181 /// #
182 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
/// let mut server = server::handshake(my_io).await.unwrap();
/// while let Some(request) = server.accept().await {
///     tokio::spawn(async move {
///         let (request, respond) = request.unwrap();
///         // Process the request and send the response back to the client
///         // using `respond`.
///     });
/// }
191 /// # }
192 /// #
193 /// # pub fn main() {}
194 /// ```
195 #[must_use = "streams do nothing unless polled"]
196 pub struct Connection<T, B: Buf> {
197 connection: proto::Connection<T, Peer, B>,
198 }
199
200 /// Builds server connections with custom configuration values.
201 ///
202 /// Methods can be chained in order to set the configuration values.
203 ///
204 /// The server is constructed by calling [`handshake`] and passing the I/O
205 /// handle that will back the HTTP/2 server.
206 ///
207 /// New instances of `Builder` are obtained via [`Builder::new`].
208 ///
209 /// See function level documentation for details on the various server
210 /// configuration settings.
211 ///
212 /// [`Builder::new`]: struct.Builder.html#method.new
213 /// [`handshake`]: struct.Builder.html#method.handshake
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// # use tokio::io::{AsyncRead, AsyncWrite};
219 /// # use h2::server::*;
220 /// #
221 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222 /// # -> Handshake<T>
223 /// # {
224 /// // `server_fut` is a future representing the completion of the HTTP/2
225 /// // handshake.
226 /// let server_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
230 /// # server_fut
231 /// # }
232 /// #
233 /// # pub fn main() {}
234 /// ```
235 #[derive(Clone, Debug)]
236 pub struct Builder {
237 /// Time to keep locally reset streams around before reaping.
238 reset_stream_duration: Duration,
239
240 /// Maximum number of locally reset streams to keep at a time.
241 reset_stream_max: usize,
242
243 /// Maximum number of remotely reset streams to allow in the pending
244 /// accept queue.
245 pending_accept_reset_stream_max: usize,
246
247 /// Initial `Settings` frame to send as part of the handshake.
248 settings: Settings,
249
250 /// Initial target window size for new connections.
251 initial_target_connection_window_size: Option<u32>,
252
253 /// Maximum amount of bytes to "buffer" for writing per stream.
254 max_send_buffer_size: usize,
255
256 /// Maximum number of locally reset streams due to protocol error across
257 /// the lifetime of the connection.
258 ///
259 /// When this gets exceeded, we issue GOAWAYs.
260 local_max_error_reset_streams: Option<usize>,
261 }
262
263 /// Send a response back to the client
264 ///
265 /// A `SendResponse` instance is provided when receiving a request and is used
266 /// to send the associated response back to the client. It is also used to
267 /// explicitly reset the stream with a custom reason.
268 ///
269 /// It will also be used to initiate push promises linked with the associated
270 /// stream.
271 ///
272 /// If the `SendResponse` instance is dropped without sending a response, then
273 /// the HTTP/2 stream will be reset.
274 ///
275 /// See [module] level docs for more details.
276 ///
277 /// [module]: index.html
278 #[derive(Debug)]
279 pub struct SendResponse<B: Buf> {
280 inner: proto::StreamRef<B>,
281 }
282
283 /// Send a response to a promised request
284 ///
285 /// A `SendPushedResponse` instance is provided when promising a request and is used
286 /// to send the associated response to the client. It is also used to
287 /// explicitly reset the stream with a custom reason.
288 ///
/// It cannot be used to initiate push promises.
290 ///
291 /// If the `SendPushedResponse` instance is dropped without sending a response, then
292 /// the HTTP/2 stream will be reset.
293 ///
294 /// See [module] level docs for more details.
295 ///
296 /// [module]: index.html
297 pub struct SendPushedResponse<B: Buf> {
298 inner: SendResponse<B>,
299 }
300
301 // Manual implementation necessary because of rust-lang/rust#26925
302 impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305 }
306 }
307
308 /// Stages of an in-progress handshake.
309 enum Handshaking<T, B: Buf> {
310 /// State 1. Connection is flushing pending SETTINGS frame.
311 Flushing(Instrumented<Flush<T, Prioritized<B>>>),
312 /// State 2. Connection is waiting for the client preface.
313 ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
314 /// State 3. Handshake is done, polling again would panic.
315 Done,
316 }
317
318 /// Flush a Sink
319 struct Flush<T, B> {
320 codec: Option<Codec<T, B>>,
321 }
322
323 /// Read the client connection preface
324 struct ReadPreface<T, B> {
325 codec: Option<Codec<T, B>>,
326 pos: usize,
327 }
328
329 #[derive(Debug)]
330 pub(crate) struct Peer;
331
332 const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333
/// Creates a new HTTP/2 server with default configuration values, backed
/// by `io`.
336 ///
337 /// It is expected that `io` already be in an appropriate state to commence
338 /// the [HTTP/2 handshake]. See [Handshake] for more details.
339 ///
340 /// Returns a future which resolves to the [`Connection`] instance once the
341 /// HTTP/2 handshake has been completed. The returned [`Connection`]
342 /// instance will be using default configuration values. Use [`Builder`] to
343 /// customize the configuration values used by a [`Connection`] instance.
344 ///
345 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346 /// [Handshake]: ../index.html#handshake
347 /// [`Connection`]: struct.Connection.html
348 ///
349 /// # Examples
350 ///
351 /// ```
352 /// # use tokio::io::{AsyncRead, AsyncWrite};
353 /// # use h2::server;
354 /// # use h2::server::*;
355 /// #
356 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357 /// # {
358 /// let connection = server::handshake(my_io).await.unwrap();
359 /// // The HTTP/2 handshake has completed, now use `connection` to
360 /// // accept inbound HTTP/2 streams.
361 /// # }
362 /// #
363 /// # pub fn main() {}
364 /// ```
pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366 where
367 T: AsyncRead + AsyncWrite + Unpin,
368 {
369 Builder::new().handshake(io)
370 }
371
372 // ===== impl Connection =====
373
374 impl<T, B> Connection<T, B>
375 where
376 T: AsyncRead + AsyncWrite + Unpin,
377 B: Buf,
378 {
fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380 let span = tracing::trace_span!("server_handshake");
381 let entered = span.enter();
382
383 // Create the codec.
384 let mut codec = Codec::new(io);
385
386 if let Some(max) = builder.settings.max_frame_size() {
387 codec.set_max_recv_frame_size(max as usize);
388 }
389
390 if let Some(max) = builder.settings.max_header_list_size() {
391 codec.set_max_recv_header_list_size(max as usize);
392 }
393
394 // Send initial settings frame.
395 codec
396 .buffer(builder.settings.clone().into())
397 .expect("invalid SETTINGS frame");
398
399 // Create the handshake future.
400 let state =
401 Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402
403 drop(entered);
404
405 Handshake {
406 builder,
407 state,
408 span,
409 }
410 }
411
412 /// Accept the next incoming request on this connection.
pub async fn accept(
414 &mut self,
415 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
416 futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
417 }
418
419 #[doc(hidden)]
pub fn poll_accept(
421 &mut self,
422 cx: &mut Context<'_>,
423 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
// Always try to advance the internal state. Getting `Pending` from it is
// also needed to allow this function to return `Pending`.
426 if self.poll_closed(cx)?.is_ready() {
427 // If the socket is closed, don't return anything
428 // TODO: drop any pending streams
429 return Poll::Ready(None);
430 }
431
432 if let Some(inner) = self.connection.next_incoming() {
433 tracing::trace!("received incoming");
434 let (head, _) = inner.take_request().into_parts();
435 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436
437 let request = Request::from_parts(head, body);
438 let respond = SendResponse { inner };
439
440 return Poll::Ready(Some(Ok((request, respond))));
441 }
442
443 Poll::Pending
444 }
445
446 /// Sets the target window size for the whole connection.
447 ///
448 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
449 /// frame will be immediately sent to the remote, increasing the connection
450 /// level window by `size - current_value`.
451 ///
452 /// If `size` is less than the current value, nothing will happen
453 /// immediately. However, as window capacity is released by
454 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
455 /// out until the number of "in flight" bytes drops below `size`.
456 ///
457 /// The default value is 65,535.
458 ///
459 /// See [`FlowControl`] documentation for more details.
460 ///
461 /// [`FlowControl`]: ../struct.FlowControl.html
462 /// [library level]: ../index.html#flow-control
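///
/// # Examples
///
/// A sketch with an illustrative size:
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::Connection;
/// # use bytes::Bytes;
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut conn: Connection<T, Bytes>) {
/// // Allow up to 1,000,000 bytes of connection-level "in flight" data.
/// conn.set_target_window_size(1_000_000);
/// # }
/// ```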
pub fn set_target_window_size(&mut self, size: u32) {
464 assert!(size <= proto::MAX_WINDOW_SIZE);
465 self.connection.set_target_window_size(size);
466 }
467
468 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
469 /// flow control for received data.
470 ///
471 /// The `SETTINGS` will be sent to the remote, and only applied once the
472 /// remote acknowledges the change.
473 ///
474 /// This can be used to increase or decrease the window size for existing
475 /// streams.
476 ///
477 /// # Errors
478 ///
479 /// Returns an error if a previous call is still pending acknowledgement
480 /// from the remote endpoint.
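///
/// # Examples
///
/// A sketch, using an illustrative window size:
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::Connection;
/// # use bytes::Bytes;
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut conn: Connection<T, Bytes>) -> Result<(), h2::Error> {
/// // Ask the client to use a 1,000,000 byte window for each stream.
/// conn.set_initial_window_size(1_000_000)?;
/// # Ok(())
/// # }
/// ```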
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
482 assert!(size <= proto::MAX_WINDOW_SIZE);
483 self.connection.set_initial_window_size(size)?;
484 Ok(())
485 }
486
487 /// Enables the [extended CONNECT protocol].
488 ///
489 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
490 ///
491 /// # Errors
492 ///
493 /// Returns an error if a previous call is still pending acknowledgement
494 /// from the remote endpoint.
pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
496 self.connection.set_enable_connect_protocol()?;
497 Ok(())
498 }
499
500 /// Returns `Ready` when the underlying connection has closed.
501 ///
502 /// If any new inbound streams are received during a call to `poll_closed`,
503 /// they will be queued and returned on the next call to [`poll_accept`].
504 ///
505 /// This function will advance the internal connection state, driving
506 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
507 ///
508 /// See [here](index.html#managing-the-connection) for more details.
509 ///
510 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
511 /// [`RecvStream`]: ../struct.RecvStream.html
512 /// [`SendStream`]: ../struct.SendStream.html
pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
514 self.connection.poll(cx).map_err(Into::into)
515 }
516
517 /// Sets the connection to a GOAWAY state.
518 ///
519 /// Does not terminate the connection. Must continue being polled to close
/// the connection.
521 ///
522 /// After flushing the GOAWAY frame, the connection is closed. Any
523 /// outstanding streams do not prevent the connection from closing. This
524 /// should usually be reserved for shutting down when something bad
525 /// external to `h2` has happened, and open streams cannot be properly
526 /// handled.
527 ///
528 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
pub fn abrupt_shutdown(&mut self, reason: Reason) {
530 self.connection.go_away_from_user(reason);
531 }
532
533 /// Starts a [graceful shutdown][1] process.
534 ///
/// Must continue being polled to close the connection.
536 ///
537 /// It's possible to receive more requests after calling this method, since
538 /// they might have been in-flight from the client already. After about
539 /// 1 RTT, no new requests should be accepted. Once all active streams
540 /// have completed, the connection is closed.
541 ///
542 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
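///
/// # Examples
///
/// A rough sketch, assuming this task is also the one driving the connection:
///
/// ```no_run
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::Connection;
/// # use bytes::Bytes;
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut conn: Connection<T, Bytes>)
/// #     -> Result<(), h2::Error> {
/// conn.graceful_shutdown();
///
/// // Requests that were already in flight may still arrive; keep driving the
/// // connection until `accept` returns `None`, then poll `poll_closed` until
/// // the connection has fully closed.
/// while let Some(stream) = conn.accept().await {
///     let (_request, _respond) = stream?;
///     // Handle the remaining in-flight streams ...
/// }
/// # Ok(())
/// # }
/// ```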
pub fn graceful_shutdown(&mut self) {
544 self.connection.go_away_gracefully();
545 }
546
547 /// Takes a `PingPong` instance from the connection.
548 ///
549 /// # Note
550 ///
551 /// This may only be called once. Calling multiple times will return `None`.
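///
/// # Examples
///
/// A sketch of taking the handle and sending a user `PING` (the connection
/// must still be driven, e.g. via `accept`/`poll_closed`, for the ping to be
/// sent and the pong to be received):
///
/// ```no_run
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::Connection;
/// # use bytes::Bytes;
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut conn: Connection<T, Bytes>)
/// #     -> Result<(), h2::Error> {
/// let mut ping_pong = conn.ping_pong().expect("ping_pong already taken");
/// let _pong = ping_pong.ping(h2::Ping::opaque()).await?;
/// # Ok(())
/// # }
/// ```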
pub fn ping_pong(&mut self) -> Option<PingPong> {
553 self.connection.take_user_pings().map(PingPong::new)
554 }
555
556 /// Returns the maximum number of concurrent streams that may be initiated
557 /// by the server on this connection.
558 ///
559 /// This limit is configured by the client peer by sending the
560 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
561 /// This method returns the currently acknowledged value received from the
562 /// remote.
563 ///
564 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
pub fn max_concurrent_send_streams(&self) -> usize {
566 self.connection.max_send_streams()
567 }
568
569 /// Returns the maximum number of concurrent streams that may be initiated
570 /// by the client on this connection.
571 ///
572 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
573 /// parameter][1] sent in a `SETTINGS` frame that has been
574 /// acknowledged by the remote peer. The value to be sent is configured by
575 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
576 /// with the remote peer.
577 ///
578 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
579 /// [2]: ../struct.Builder.html#method.max_concurrent_streams
pub fn max_concurrent_recv_streams(&self) -> usize {
581 self.connection.max_recv_streams()
582 }
583
// Could disappear at any time.
585 #[doc(hidden)]
586 #[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
588 self.connection.num_wired_streams()
589 }
590 }
591
592 #[cfg(feature = "stream")]
593 impl<T, B> futures_core::Stream for Connection<T, B>
594 where
595 T: AsyncRead + AsyncWrite + Unpin,
596 B: Buf,
597 {
598 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
599
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
601 self.poll_accept(cx)
602 }
603 }
604
605 impl<T, B> fmt::Debug for Connection<T, B>
606 where
607 T: fmt::Debug,
608 B: fmt::Debug + Buf,
609 {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
611 fmt.debug_struct("Connection")
612 .field("connection", &self.connection)
613 .finish()
614 }
615 }
616
617 // ===== impl Builder =====
618
619 impl Builder {
620 /// Returns a new server builder instance initialized with default
621 /// configuration values.
622 ///
623 /// Configuration methods can be chained on the return value.
624 ///
625 /// # Examples
626 ///
627 /// ```
628 /// # use tokio::io::{AsyncRead, AsyncWrite};
629 /// # use h2::server::*;
630 /// #
631 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
632 /// # -> Handshake<T>
633 /// # {
634 /// // `server_fut` is a future representing the completion of the HTTP/2
635 /// // handshake.
636 /// let server_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
640 /// # server_fut
641 /// # }
642 /// #
643 /// # pub fn main() {}
644 /// ```
pub fn new() -> Builder {
646 Builder {
647 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
648 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
649 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
650 settings: Settings::default(),
651 initial_target_connection_window_size: None,
652 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
653
654 local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
655 }
656 }
657
658 /// Indicates the initial window size (in octets) for stream-level
659 /// flow control for received data.
660 ///
661 /// The initial window of a stream is used as part of flow control. For more
662 /// details, see [`FlowControl`].
663 ///
664 /// The default value is 65,535.
665 ///
666 /// [`FlowControl`]: ../struct.FlowControl.html
667 ///
668 /// # Examples
669 ///
670 /// ```
671 /// # use tokio::io::{AsyncRead, AsyncWrite};
672 /// # use h2::server::*;
673 /// #
674 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
675 /// # -> Handshake<T>
676 /// # {
677 /// // `server_fut` is a future representing the completion of the HTTP/2
678 /// // handshake.
679 /// let server_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .handshake(my_io);
682 /// # server_fut
683 /// # }
684 /// #
685 /// # pub fn main() {}
686 /// ```
pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
688 self.settings.set_initial_window_size(Some(size));
689 self
690 }
691
692 /// Indicates the initial window size (in octets) for connection-level flow control
693 /// for received data.
694 ///
695 /// The initial window of a connection is used as part of flow control. For more details,
696 /// see [`FlowControl`].
697 ///
698 /// The default value is 65,535.
699 ///
700 /// [`FlowControl`]: ../struct.FlowControl.html
701 ///
702 /// # Examples
703 ///
704 /// ```
705 /// # use tokio::io::{AsyncRead, AsyncWrite};
706 /// # use h2::server::*;
707 /// #
708 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
709 /// # -> Handshake<T>
710 /// # {
711 /// // `server_fut` is a future representing the completion of the HTTP/2
712 /// // handshake.
713 /// let server_fut = Builder::new()
///     .initial_connection_window_size(1_000_000)
///     .handshake(my_io);
716 /// # server_fut
717 /// # }
718 /// #
719 /// # pub fn main() {}
720 /// ```
pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
722 self.initial_target_connection_window_size = Some(size);
723 self
724 }
725
726 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
727 /// configured server is able to accept.
728 ///
729 /// The sender may send data frames that are **smaller** than this value,
730 /// but any data larger than `max` will be broken up into multiple `DATA`
731 /// frames.
732 ///
733 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
734 ///
735 /// # Examples
736 ///
737 /// ```
738 /// # use tokio::io::{AsyncRead, AsyncWrite};
739 /// # use h2::server::*;
740 /// #
741 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
742 /// # -> Handshake<T>
743 /// # {
744 /// // `server_fut` is a future representing the completion of the HTTP/2
745 /// // handshake.
746 /// let server_fut = Builder::new()
///     .max_frame_size(1_000_000)
///     .handshake(my_io);
749 /// # server_fut
750 /// # }
751 /// #
752 /// # pub fn main() {}
753 /// ```
754 ///
755 /// # Panics
756 ///
757 /// This function panics if `max` is not within the legal range specified
758 /// above.
pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
760 self.settings.set_max_frame_size(Some(max));
761 self
762 }
763
764 /// Sets the max size of received header frames.
765 ///
766 /// This advisory setting informs a peer of the maximum size of header list
767 /// that the sender is prepared to accept, in octets. The value is based on
768 /// the uncompressed size of header fields, including the length of the name
769 /// and value in octets plus an overhead of 32 octets for each header field.
770 ///
771 /// This setting is also used to limit the maximum amount of data that is
772 /// buffered to decode HEADERS frames.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// # use tokio::io::{AsyncRead, AsyncWrite};
778 /// # use h2::server::*;
779 /// #
780 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
781 /// # -> Handshake<T>
782 /// # {
783 /// // `server_fut` is a future representing the completion of the HTTP/2
784 /// // handshake.
785 /// let server_fut = Builder::new()
///     .max_header_list_size(16 * 1024)
///     .handshake(my_io);
788 /// # server_fut
789 /// # }
790 /// #
791 /// # pub fn main() {}
792 /// ```
pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
794 self.settings.set_max_header_list_size(Some(max));
795 self
796 }
797
798 /// Sets the maximum number of concurrent streams.
799 ///
800 /// The maximum concurrent streams setting only controls the maximum number
801 /// of streams that can be initiated by the remote peer. In other words,
802 /// when this setting is set to 100, this does not limit the number of
803 /// concurrent streams that can be created by the caller.
804 ///
805 /// It is recommended that this value be no smaller than 100, so as to not
806 /// unnecessarily limit parallelism. However, any value is legal, including
807 /// 0. If `max` is set to 0, then the remote will not be permitted to
808 /// initiate streams.
809 ///
810 /// Note that streams in the reserved state, i.e., push promises that have
811 /// been reserved but the stream has not started, do not count against this
812 /// setting.
813 ///
814 /// Also note that if the remote *does* exceed the value set here, it is not
815 /// a protocol level error. Instead, the `h2` library will immediately reset
816 /// the stream.
817 ///
818 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
819 ///
820 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
821 ///
822 /// # Examples
823 ///
824 /// ```
825 /// # use tokio::io::{AsyncRead, AsyncWrite};
826 /// # use h2::server::*;
827 /// #
828 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
829 /// # -> Handshake<T>
830 /// # {
831 /// // `server_fut` is a future representing the completion of the HTTP/2
832 /// // handshake.
833 /// let server_fut = Builder::new()
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
836 /// # server_fut
837 /// # }
838 /// #
839 /// # pub fn main() {}
840 /// ```
pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
842 self.settings.set_max_concurrent_streams(Some(max));
843 self
844 }
845
846 /// Sets the maximum number of concurrent locally reset streams.
847 ///
848 /// When a stream is explicitly reset by either calling
849 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
850 /// before completing the stream, the HTTP/2 specification requires that
851 /// any further frames received for that stream must be ignored for "some
852 /// time".
853 ///
854 /// In order to satisfy the specification, internal state must be maintained
855 /// to implement the behavior. This state grows linearly with the number of
856 /// streams that are locally reset.
857 ///
/// The `max_concurrent_reset_streams` setting sets an upper
859 /// bound on the amount of state that is maintained. When this max value is
860 /// reached, the oldest reset stream is purged from memory.
861 ///
862 /// Once the stream has been fully purged from memory, any additional frames
863 /// received for that stream will result in a connection level protocol
864 /// error, forcing the connection to terminate.
865 ///
866 /// The default value is 10.
867 ///
868 /// # Examples
869 ///
870 /// ```
871 /// # use tokio::io::{AsyncRead, AsyncWrite};
872 /// # use h2::server::*;
873 /// #
874 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
875 /// # -> Handshake<T>
876 /// # {
877 /// // `server_fut` is a future representing the completion of the HTTP/2
878 /// // handshake.
879 /// let server_fut = Builder::new()
///     .max_concurrent_reset_streams(1000)
///     .handshake(my_io);
882 /// # server_fut
883 /// # }
884 /// #
885 /// # pub fn main() {}
886 /// ```
pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
888 self.reset_stream_max = max;
889 self
890 }
891
892 /// Sets the maximum number of local resets due to protocol errors made by the remote end.
893 ///
894 /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
/// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DoS servers.
/// This limit protects against these DoS attacks by limiting the amount of resets we can be forced to generate.
897 ///
898 /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
899 /// `ENHANCE_YOUR_CALM` to the client.
900 ///
901 /// If you really want to disable this, supply [`Option::None`] here.
902 /// Disabling this is not recommended and may expose you to DOS attacks.
903 ///
904 /// The default value is currently 1024, but could change.
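///
/// # Examples
///
/// A configuration sketch in the same style as the other builder methods
/// (the value shown is just the current default made explicit):
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let server_fut = Builder::new()
///     .max_local_error_reset_streams(Some(1024))
///     .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```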
pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
906 self.local_max_error_reset_streams = max;
907 self
908 }
909
910 /// Sets the maximum number of pending-accept remotely-reset streams.
911 ///
/// Streams that have been received from the peer, but not accepted by the
913 /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
914 /// could send a request and then shortly after, realize it is not needed,
915 /// sending a CANCEL.
916 ///
917 /// However, since those streams are now "closed", they don't count towards
918 /// the max concurrent streams. So, they will sit in the accept queue,
919 /// using memory.
920 ///
921 /// When the number of remotely-reset streams sitting in the pending-accept
922 /// queue reaches this maximum value, a connection error with the code of
923 /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
924 /// `Future`.
925 ///
926 /// The default value is currently 20, but could change.
927 ///
928 /// # Examples
929 ///
930 ///
931 /// ```
932 /// # use tokio::io::{AsyncRead, AsyncWrite};
933 /// # use h2::server::*;
934 /// #
935 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
936 /// # -> Handshake<T>
937 /// # {
938 /// // `server_fut` is a future representing the completion of the HTTP/2
939 /// // handshake.
940 /// let server_fut = Builder::new()
///     .max_pending_accept_reset_streams(100)
///     .handshake(my_io);
943 /// # server_fut
944 /// # }
945 /// #
946 /// # pub fn main() {}
947 /// ```
pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
949 self.pending_accept_reset_stream_max = max;
950 self
951 }
952
953 /// Sets the maximum send buffer size per stream.
954 ///
955 /// Once a stream has buffered up to (or over) the maximum, the stream's
956 /// flow control will not "poll" additional capacity. Once bytes for the
957 /// stream have been written to the connection, the send buffer capacity
958 /// will be freed up again.
959 ///
960 /// The default is currently ~400KB, but may change.
961 ///
962 /// # Panics
963 ///
964 /// This function panics if `max` is larger than `u32::MAX`.
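///
/// # Examples
///
/// A configuration sketch in the same style as the other builder methods
/// (the size is illustrative):
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let server_fut = Builder::new()
///     .max_send_buffer_size(1024 * 1024)
///     .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```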
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
966 assert!(max <= std::u32::MAX as usize);
967 self.max_send_buffer_size = max;
968 self
969 }
970
971 /// Sets the maximum number of concurrent locally reset streams.
972 ///
973 /// When a stream is explicitly reset by either calling
974 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
975 /// before completing the stream, the HTTP/2 specification requires that
976 /// any further frames received for that stream must be ignored for "some
977 /// time".
978 ///
979 /// In order to satisfy the specification, internal state must be maintained
980 /// to implement the behavior. This state grows linearly with the number of
981 /// streams that are locally reset.
982 ///
983 /// The `reset_stream_duration` setting configures the max amount of time
984 /// this state will be maintained in memory. Once the duration elapses, the
985 /// stream state is purged from memory.
986 ///
987 /// Once the stream has been fully purged from memory, any additional frames
988 /// received for that stream will result in a connection level protocol
989 /// error, forcing the connection to terminate.
990 ///
991 /// The default value is 30 seconds.
992 ///
993 /// # Examples
994 ///
995 /// ```
996 /// # use tokio::io::{AsyncRead, AsyncWrite};
997 /// # use h2::server::*;
998 /// # use std::time::Duration;
999 /// #
1000 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1001 /// # -> Handshake<T>
1002 /// # {
1003 /// // `server_fut` is a future representing the completion of the HTTP/2
1004 /// // handshake.
1005 /// let server_fut = Builder::new()
///     .reset_stream_duration(Duration::from_secs(10))
///     .handshake(my_io);
1008 /// # server_fut
1009 /// # }
1010 /// #
1011 /// # pub fn main() {}
1012 /// ```
pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1014 self.reset_stream_duration = dur;
1015 self
1016 }
1017
1018 /// Enables the [extended CONNECT protocol].
1019 ///
1020 /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
pub fn enable_connect_protocol(&mut self) -> &mut Self {
1022 self.settings.set_enable_connect_protocol(Some(1));
1023 self
1024 }
1025
1026 /// Creates a new configured HTTP/2 server backed by `io`.
1027 ///
1028 /// It is expected that `io` already be in an appropriate state to commence
1029 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1030 ///
1031 /// Returns a future which resolves to the [`Connection`] instance once the
1032 /// HTTP/2 handshake has been completed.
1033 ///
1034 /// This function also allows the caller to configure the send payload data
1035 /// type. See [Outbound data type] for more details.
1036 ///
1037 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1038 /// [Handshake]: ../index.html#handshake
1039 /// [`Connection`]: struct.Connection.html
1040 /// [Outbound data type]: ../index.html#outbound-data-type.
1041 ///
1042 /// # Examples
1043 ///
1044 /// Basic usage:
1045 ///
1046 /// ```
1047 /// # use tokio::io::{AsyncRead, AsyncWrite};
1048 /// # use h2::server::*;
1049 /// #
1050 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1051 /// # -> Handshake<T>
1052 /// # {
1053 /// // `server_fut` is a future representing the completion of the HTTP/2
1054 /// // handshake.
1055 /// let server_fut = Builder::new()
///     .handshake(my_io);
1057 /// # server_fut
1058 /// # }
1059 /// #
1060 /// # pub fn main() {}
1061 /// ```
1062 ///
1063 /// Configures the send-payload data type. In this case, the outbound data
1064 /// type will be `&'static [u8]`.
1065 ///
1066 /// ```
1067 /// # use tokio::io::{AsyncRead, AsyncWrite};
1068 /// # use h2::server::*;
1069 /// #
1070 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1071 /// # -> Handshake<T, &'static [u8]>
1072 /// # {
1073 /// // `server_fut` is a future representing the completion of the HTTP/2
1074 /// // handshake.
1075 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
///     .handshake(my_io);
1077 /// # server_fut
1078 /// # }
1079 /// #
1080 /// # pub fn main() {}
1081 /// ```
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1083 where
1084 T: AsyncRead + AsyncWrite + Unpin,
1085 B: Buf,
1086 {
1087 Connection::handshake2(io, self.clone())
1088 }
1089 }
1090
1091 impl Default for Builder {
fn default() -> Builder {
1093 Builder::new()
1094 }
1095 }
1096
1097 // ===== impl SendResponse =====
1098
1099 impl<B: Buf> SendResponse<B> {
1100 /// Send a response to a client request.
1101 ///
1102 /// On success, a [`SendStream`] instance is returned. This instance can be
1103 /// used to stream the response body and send trailers.
1104 ///
1105 /// If a body or trailers will be sent on the returned [`SendStream`]
1106 /// instance, then `end_of_stream` must be set to `false` when calling this
1107 /// function.
1108 ///
1109 /// The [`SendResponse`] instance is already associated with a received
1110 /// request. This function may only be called once per instance and only if
1111 /// [`send_reset`] has not been previously called.
1112 ///
1113 /// [`SendResponse`]: #
1114 /// [`SendStream`]: ../struct.SendStream.html
1115 /// [`send_reset`]: #method.send_reset
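///
/// # Examples
///
/// A sketch of responding with a short body followed by trailers (the
/// payload and trailer contents are illustrative):
///
/// ```
/// # use h2::server::SendResponse;
/// # use bytes::Bytes;
/// # use http::{HeaderMap, Response};
/// # fn doc(mut respond: SendResponse<Bytes>) -> Result<(), h2::Error> {
/// let response = Response::new(());
///
/// // `end_of_stream` is `false` because data and trailers follow.
/// let mut send = respond.send_response(response, false)?;
/// send.send_data(Bytes::from_static(b"hello"), false)?;
/// send.send_trailers(HeaderMap::new())?;
/// # Ok(())
/// # }
/// ```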
pub fn send_response(
1117 &mut self,
1118 response: Response<()>,
1119 end_of_stream: bool,
1120 ) -> Result<SendStream<B>, crate::Error> {
1121 self.inner
1122 .send_response(response, end_of_stream)
1123 .map(|_| SendStream::new(self.inner.clone()))
1124 .map_err(Into::into)
1125 }
1126
1127 /// Push a request and response to the client
1128 ///
1129 /// On success, a [`SendResponse`] instance is returned.
1130 ///
1131 /// [`SendResponse`]: #
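///
/// # Examples
///
/// A sketch of promising a GET request and then responding to it (the URI is
/// illustrative; push promises must use a safe and cacheable method):
///
/// ```
/// # use h2::server::SendResponse;
/// # use bytes::Bytes;
/// # use http::{Request, Response};
/// # fn doc(mut respond: SendResponse<Bytes>) -> Result<(), h2::Error> {
/// let pushed_request = Request::get("https://example.com/assets/style.css")
///     .body(())
///     .unwrap();
///
/// let mut pushed_respond = respond.push_request(pushed_request)?;
/// pushed_respond.send_response(Response::new(()), true)?;
/// # Ok(())
/// # }
/// ```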
pub fn push_request(
1133 &mut self,
1134 request: Request<()>,
1135 ) -> Result<SendPushedResponse<B>, crate::Error> {
1136 self.inner
1137 .send_push_promise(request)
1138 .map(|inner| SendPushedResponse {
1139 inner: SendResponse { inner },
1140 })
1141 .map_err(Into::into)
1142 }
1143
1144 /// Send a stream reset to the peer.
1145 ///
1146 /// This essentially cancels the stream, including any inbound or outbound
1147 /// data streams.
1148 ///
1149 /// If this function is called before [`send_response`], a call to
1150 /// [`send_response`] will result in an error.
1151 ///
1152 /// If this function is called while a [`SendStream`] instance is active,
1153 /// any further use of the instance will result in an error.
1154 ///
1155 /// This function should only be called once.
1156 ///
1157 /// [`send_response`]: #method.send_response
1158 /// [`SendStream`]: ../struct.SendStream.html
pub fn send_reset(&mut self, reason: Reason) {
1160 self.inner.send_reset(reason)
1161 }
1162
1163 /// Polls to be notified when the client resets this stream.
1164 ///
/// If the stream is still open, this returns `Poll::Pending`, and
1166 /// registers the task to be notified if a `RST_STREAM` is received.
1167 ///
1168 /// If a `RST_STREAM` frame is received for this stream, calling this
1169 /// method will yield the `Reason` for the reset.
1170 ///
1171 /// # Error
1172 ///
1173 /// Calling this method after having called `send_response` will return
1174 /// a user error.
pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1176 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1177 }
1178
1179 /// Returns the stream ID of the response stream.
1180 ///
1181 /// # Panics
1182 ///
1183 /// If the lock on the stream store has been poisoned.
pub fn stream_id(&self) -> crate::StreamId {
1185 crate::StreamId::from_internal(self.inner.stream_id())
1186 }
1187 }
1188
1189 // ===== impl SendPushedResponse =====
1190
1191 impl<B: Buf> SendPushedResponse<B> {
1192 /// Send a response to a promised request.
1193 ///
1194 /// On success, a [`SendStream`] instance is returned. This instance can be
1195 /// used to stream the response body and send trailers.
1196 ///
1197 /// If a body or trailers will be sent on the returned [`SendStream`]
1198 /// instance, then `end_of_stream` must be set to `false` when calling this
1199 /// function.
1200 ///
1201 /// The [`SendPushedResponse`] instance is associated with a promised
1202 /// request. This function may only be called once per instance and only if
1203 /// [`send_reset`] has not been previously called.
1204 ///
1205 /// [`SendPushedResponse`]: #
1206 /// [`SendStream`]: ../struct.SendStream.html
1207 /// [`send_reset`]: #method.send_reset
pub fn send_response(
1209 &mut self,
1210 response: Response<()>,
1211 end_of_stream: bool,
1212 ) -> Result<SendStream<B>, crate::Error> {
1213 self.inner.send_response(response, end_of_stream)
1214 }
1215
1216 /// Send a stream reset to the peer.
1217 ///
1218 /// This essentially cancels the stream, including any inbound or outbound
1219 /// data streams.
1220 ///
1221 /// If this function is called before [`send_response`], a call to
1222 /// [`send_response`] will result in an error.
1223 ///
1224 /// If this function is called while a [`SendStream`] instance is active,
1225 /// any further use of the instance will result in an error.
1226 ///
1227 /// This function should only be called once.
1228 ///
1229 /// [`send_response`]: #method.send_response
1230 /// [`SendStream`]: ../struct.SendStream.html
pub fn send_reset(&mut self, reason: Reason) {
1232 self.inner.send_reset(reason)
1233 }
1234
1235 /// Polls to be notified when the client resets this stream.
1236 ///
/// If the stream is still open, this returns `Poll::Pending`, and
1238 /// registers the task to be notified if a `RST_STREAM` is received.
1239 ///
1240 /// If a `RST_STREAM` frame is received for this stream, calling this
1241 /// method will yield the `Reason` for the reset.
1242 ///
1243 /// # Error
1244 ///
1245 /// Calling this method after having called `send_response` will return
1246 /// a user error.
pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1248 self.inner.poll_reset(cx)
1249 }
1250
1251 /// Returns the stream ID of the response stream.
1252 ///
1253 /// # Panics
1254 ///
1255 /// If the lock on the stream store has been poisoned.
pub fn stream_id(&self) -> crate::StreamId {
1257 self.inner.stream_id()
1258 }
1259 }
1260
1261 // ===== impl Flush =====
1262
1263 impl<T, B: Buf> Flush<T, B> {
fn new(codec: Codec<T, B>) -> Self {
1265 Flush { codec: Some(codec) }
1266 }
1267 }
1268
1269 impl<T, B> Future for Flush<T, B>
1270 where
1271 T: AsyncWrite + Unpin,
1272 B: Buf,
1273 {
1274 type Output = Result<Codec<T, B>, crate::Error>;
1275
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1277 // Flush the codec
1278 ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1279
1280 // Return the codec
1281 Poll::Ready(Ok(self.codec.take().unwrap()))
1282 }
1283 }
1284
1285 impl<T, B: Buf> ReadPreface<T, B> {
fn new(codec: Codec<T, B>) -> Self {
1287 ReadPreface {
1288 codec: Some(codec),
1289 pos: 0,
1290 }
1291 }
1292
fn inner_mut(&mut self) -> &mut T {
1294 self.codec.as_mut().unwrap().get_mut()
1295 }
1296 }
1297
1298 impl<T, B> Future for ReadPreface<T, B>
1299 where
1300 T: AsyncRead + Unpin,
1301 B: Buf,
1302 {
1303 type Output = Result<Codec<T, B>, crate::Error>;
1304
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1306 let mut buf = [0; 24];
1307 let mut rem = PREFACE.len() - self.pos;
1308
1309 while rem > 0 {
1310 let mut buf = ReadBuf::new(&mut buf[..rem]);
1311 ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1312 .map_err(crate::Error::from_io)?;
1313 let n = buf.filled().len();
1314 if n == 0 {
1315 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1316 io::ErrorKind::UnexpectedEof,
1317 "connection closed before reading preface",
1318 ))));
1319 }
1320
1321 if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1322 proto_err!(conn: "read_preface: invalid preface");
1323 // TODO: Should this just write the GO_AWAY frame directly?
1324 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1325 }
1326
1327 self.pos += n;
1328 rem -= n; // TODO test
1329 }
1330
1331 Poll::Ready(Ok(self.codec.take().unwrap()))
1332 }
1333 }
1334
1335 // ===== impl Handshake =====
1336
1337 impl<T, B: Buf> Future for Handshake<T, B>
1338 where
1339 T: AsyncRead + AsyncWrite + Unpin,
1340 B: Buf,
1341 {
1342 type Output = Result<Connection<T, B>, crate::Error>;
1343
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1345 let span = self.span.clone(); // XXX(eliza): T_T
1346 let _e = span.enter();
1347 tracing::trace!(state = ?self.state);
1348
1349 loop {
1350 match &mut self.state {
1351 Handshaking::Flushing(flush) => {
1352 // We're currently flushing a pending SETTINGS frame. Poll the
1353 // flush future, and, if it's completed, advance our state to wait
1354 // for the client preface.
1355 let codec = match Pin::new(flush).poll(cx)? {
1356 Poll::Pending => {
1357 tracing::trace!(flush.poll = %"Pending");
1358 return Poll::Pending;
1359 }
1360 Poll::Ready(flushed) => {
1361 tracing::trace!(flush.poll = %"Ready");
1362 flushed
1363 }
1364 };
1365 self.state = Handshaking::ReadingPreface(
1366 ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1367 );
1368 }
1369 Handshaking::ReadingPreface(read) => {
1370 let codec = ready!(Pin::new(read).poll(cx)?);
1371
1372 self.state = Handshaking::Done;
1373
1374 let connection = proto::Connection::new(
1375 codec,
1376 Config {
1377 next_stream_id: 2.into(),
1378 // Server does not need to locally initiate any streams
1379 initial_max_send_streams: 0,
1380 max_send_buffer_size: self.builder.max_send_buffer_size,
1381 reset_stream_duration: self.builder.reset_stream_duration,
1382 reset_stream_max: self.builder.reset_stream_max,
1383 remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1384 local_error_reset_streams_max: self
1385 .builder
1386 .local_max_error_reset_streams,
1387 settings: self.builder.settings.clone(),
1388 },
1389 );
1390
1391 tracing::trace!("connection established!");
1392 let mut c = Connection { connection };
1393 if let Some(sz) = self.builder.initial_target_connection_window_size {
1394 c.set_target_window_size(sz);
1395 }
1396
1397 return Poll::Ready(Ok(c));
1398 }
1399 Handshaking::Done => {
1400 panic!("Handshaking::poll() called again after handshaking was complete")
1401 }
1402 }
1403 }
1404 }
1405 }
1406
1407 impl<T, B> fmt::Debug for Handshake<T, B>
1408 where
1409 T: AsyncRead + AsyncWrite + fmt::Debug,
1410 B: fmt::Debug + Buf,
1411 {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1413 write!(fmt, "server::Handshake")
1414 }
1415 }
1416
1417 impl Peer {
pub fn convert_send_message(
1419 id: StreamId,
1420 response: Response<()>,
1421 end_of_stream: bool,
1422 ) -> frame::Headers {
1423 use http::response::Parts;
1424
1425 // Extract the components of the HTTP request
1426 let (
1427 Parts {
1428 status, headers, ..
1429 },
1430 _,
1431 ) = response.into_parts();
1432
// Build the pseudo header set for the response. All responses will
// include `:status`.
1435 let pseudo = Pseudo::response(status);
1436
1437 // Create the HEADERS frame
1438 let mut frame = frame::Headers::new(id, pseudo, headers);
1439
1440 if end_of_stream {
1441 frame.set_end_stream()
1442 }
1443
1444 frame
1445 }
1446
pub fn convert_push_message(
1448 stream_id: StreamId,
1449 promised_id: StreamId,
1450 request: Request<()>,
1451 ) -> Result<frame::PushPromise, UserError> {
1452 use http::request::Parts;
1453
1454 if let Err(e) = frame::PushPromise::validate_request(&request) {
1455 use PushPromiseHeaderError::*;
1456 match e {
1457 NotSafeAndCacheable => tracing::debug!(
1458 ?promised_id,
1459 "convert_push_message: method {} is not safe and cacheable",
1460 request.method(),
1461 ),
1462 InvalidContentLength(e) => tracing::debug!(
1463 ?promised_id,
1464 "convert_push_message; promised request has invalid content-length {:?}",
1465 e,
1466 ),
1467 }
1468 return Err(UserError::MalformedHeaders);
1469 }
1470
1471 // Extract the components of the HTTP request
1472 let (
1473 Parts {
1474 method,
1475 uri,
1476 headers,
1477 ..
1478 },
1479 _,
1480 ) = request.into_parts();
1481
1482 let pseudo = Pseudo::request(method, uri, None);
1483
1484 Ok(frame::PushPromise::new(
1485 stream_id,
1486 promised_id,
1487 pseudo,
1488 headers,
1489 ))
1490 }
1491 }
1492
1493 impl proto::Peer for Peer {
1494 type Poll = Request<()>;
1495
1496 const NAME: &'static str = "Server";
1497
1498 /*
1499 fn is_server() -> bool {
1500 true
1501 }
1502 */
1503
1504 fn r#dyn() -> proto::DynPeer {
1505 proto::DynPeer::Server
1506 }
1507
fn convert_poll_message(
1509 pseudo: Pseudo,
1510 fields: HeaderMap,
1511 stream_id: StreamId,
1512 ) -> Result<Self::Poll, Error> {
1513 use http::{uri, Version};
1514
1515 let mut b = Request::builder();
1516
1517 macro_rules! malformed {
1518 ($($arg:tt)*) => {{
1519 tracing::debug!($($arg)*);
1520 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1521 }}
1522 }
1523
1524 b = b.version(Version::HTTP_2);
1525
1526 let is_connect;
1527 if let Some(method) = pseudo.method {
1528 is_connect = method == Method::CONNECT;
1529 b = b.method(method);
1530 } else {
1531 malformed!("malformed headers: missing method");
1532 }
1533
1534 let has_protocol = pseudo.protocol.is_some();
1535 if has_protocol {
1536 if is_connect {
1537 // Assert that we have the right type.
1538 b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1539 } else {
1540 malformed!("malformed headers: :protocol on non-CONNECT request");
1541 }
1542 }
1543
1544 if pseudo.status.is_some() {
1545 malformed!("malformed headers: :status field on request");
1546 }
1547
1548 // Convert the URI
1549 let mut parts = uri::Parts::default();
1550
1551 // A request translated from HTTP/1 must not include the :authority
1552 // header
1553 if let Some(authority) = pseudo.authority {
1554 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1555 parts.authority = Some(maybe_authority.or_else(|why| {
1556 malformed!(
1557 "malformed headers: malformed authority ({:?}): {}",
1558 authority,
1559 why,
1560 )
1561 })?);
1562 }
1563
1564 // A :scheme is required, except CONNECT.
1565 if let Some(scheme) = pseudo.scheme {
1566 if is_connect && !has_protocol {
1567 malformed!("malformed headers: :scheme in CONNECT");
1568 }
1569 let maybe_scheme = scheme.parse();
1570 let scheme = maybe_scheme.or_else(|why| {
1571 malformed!(
1572 "malformed headers: malformed scheme ({:?}): {}",
1573 scheme,
1574 why,
1575 )
1576 })?;
1577
// It's not possible to build a `Uri` from a scheme and path. So,
// after validating it was a valid scheme, we just have to drop it
// if there isn't an :authority.
1581 if parts.authority.is_some() {
1582 parts.scheme = Some(scheme);
1583 }
1584 } else if !is_connect || has_protocol {
1585 malformed!("malformed headers: missing scheme");
1586 }
1587
1588 if let Some(path) = pseudo.path {
1589 if is_connect && !has_protocol {
1590 malformed!("malformed headers: :path in CONNECT");
1591 }
1592
1593 // This cannot be empty
1594 if path.is_empty() {
1595 malformed!("malformed headers: missing path");
1596 }
1597
1598 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1599 parts.path_and_query = Some(maybe_path.or_else(|why| {
1600 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1601 })?);
1602 } else if is_connect && has_protocol {
1603 malformed!("malformed headers: missing path in extended CONNECT");
1604 }
1605
1606 b = b.uri(parts);
1607
1608 let mut request = match b.body(()) {
1609 Ok(request) => request,
1610 Err(e) => {
1611 // TODO: Should there be more specialized handling for different
1612 // kinds of errors
1613 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1614 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1615 }
1616 };
1617
1618 *request.headers_mut() = fields;
1619
1620 Ok(request)
1621 }
1622 }
1623
1624 // ===== impl Handshaking =====
1625
1626 impl<T, B> fmt::Debug for Handshaking<T, B>
1627 where
1628 B: Buf,
1629 {
1630 #[inline]
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1632 match *self {
1633 Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1634 Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1635 Handshaking::Done => f.write_str("Done"),
1636 }
1637 }
1638 }
1639