1 //! Lower-level client connection API.
2 //!
3 //! The types in this module are to provide a lower-level API based around a
4 //! single connection. Connecting to a host, pooling connections, and the like
5 //! are not handled at this level. This module provides the building blocks to
6 //! customize those things externally.
7 //!
8 //! If don't have need to manage connections yourself, consider using the
9 //! higher-level [Client](super) API.
10 //!
11 //! ## Example
12 //! A simple example that uses the `SendRequest` struct to talk HTTP over a Tokio TCP stream
13 //! ```no_run
14 //! # #[cfg(all(feature = "client", feature = "http1", feature = "runtime"))]
15 //! # mod rt {
16 //! use tower::ServiceExt;
17 //! use http::{Request, StatusCode};
18 //! use hyper::{client::conn, Body};
19 //! use tokio::net::TcpStream;
20 //!
21 //! #[tokio::main]
22 //! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23 //!     let target_stream = TcpStream::connect("example.com:80").await?;
24 //!
25 //!     let (mut request_sender, connection) = conn::handshake(target_stream).await?;
26 //!
27 //!     // spawn a task to poll the connection and drive the HTTP state
28 //!     tokio::spawn(async move {
29 //!         if let Err(e) = connection.await {
30 //!             eprintln!("Error in connection: {}", e);
31 //!         }
32 //!     });
33 //!
34 //!     let request = Request::builder()
35 //!         // We need to manually add the host header because SendRequest does not
36 //!         .header("Host", "example.com")
37 //!         .method("GET")
38 //!         .body(Body::from(""))?;
39 //!     let response = request_sender.send_request(request).await?;
40 //!     assert!(response.status() == StatusCode::OK);
41 //!
42 //!     // To send via the same connection again, it may not work as it may not be ready,
43 //!     // so we have to wait until the request_sender becomes ready.
44 //!     request_sender.ready().await?;
45 //!     let request = Request::builder()
46 //!         .header("Host", "example.com")
47 //!         .method("GET")
48 //!         .body(Body::from(""))?;
49 //!     let response = request_sender.send_request(request).await?;
50 //!     assert!(response.status() == StatusCode::OK);
51 //!     Ok(())
52 //! }
53 //!
54 //! # }
55 //! ```
56 
57 #[cfg(all(feature = "backports", feature = "http1"))]
58 pub mod http1;
59 #[cfg(all(feature = "backports", feature = "http2"))]
60 pub mod http2;
61 
62 #[cfg(not(all(feature = "http1", feature = "http2")))]
63 use std::convert::Infallible;
64 use std::error::Error as StdError;
65 use std::fmt;
66 use std::future::Future;
67 #[cfg(not(all(feature = "http1", feature = "http2")))]
68 use std::marker::PhantomData;
69 use std::marker::Unpin;
70 use std::pin::Pin;
71 use std::sync::Arc;
72 use std::task::{Context, Poll};
73 #[cfg(all(feature = "runtime", feature = "http2"))]
74 use std::time::Duration;
75 
76 use bytes::Bytes;
77 use futures_util::future::{self, Either, FutureExt as _};
78 use httparse::ParserConfig;
79 use pin_project_lite::pin_project;
80 use tokio::io::{AsyncRead, AsyncWrite};
81 use tower_service::Service;
82 use tracing::{debug, trace};
83 
84 use super::dispatch;
85 use crate::body::HttpBody;
86 use crate::common::exec::{BoxSendFuture, Exec};
87 use crate::proto;
88 use crate::rt::Executor;
89 #[cfg(feature = "http1")]
90 use crate::upgrade::Upgraded;
91 use crate::{Body, Request, Response};
92 
93 #[cfg(feature = "http1")]
94 type Http1Dispatcher<T, B> =
95     proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
96 
97 #[cfg(not(feature = "http1"))]
98 type Http1Dispatcher<T, B> = (Infallible, PhantomData<(T, Pin<Box<B>>)>);
99 
100 #[cfg(feature = "http2")]
101 type Http2ClientTask<B> = proto::h2::ClientTask<B>;
102 
103 #[cfg(not(feature = "http2"))]
104 type Http2ClientTask<B> = (Infallible, PhantomData<Pin<Box<B>>>);
105 
106 pin_project! {
107     #[project = ProtoClientProj]
108     enum ProtoClient<T, B>
109     where
110         B: HttpBody,
111     {
112         H1 {
113             #[pin]
114             h1: Http1Dispatcher<T, B>,
115         },
116         H2 {
117             #[pin]
118             h2: Http2ClientTask<B>,
119         },
120     }
121 }
122 
123 /// Returns a handshake future over some IO.
124 ///
125 /// This is a shortcut for `Builder::new().handshake(io)`.
126 /// See [`client::conn`](crate::client::conn) for more.
127 #[cfg_attr(
128     feature = "deprecated",
129     deprecated(
130         note = "This function will be replaced with `client::conn::http1::handshake` and `client::conn::http2::handshake` in 1.0, enable the \"backports\" feature to use them now."
131     )
132 )]
133 #[cfg_attr(feature = "deprecated", allow(deprecated))]
handshake<T>( io: T, ) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static,134 pub async fn handshake<T>(
135     io: T,
136 ) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
137 where
138     T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
139 {
140     #[allow(deprecated)]
141     Builder::new().handshake(io).await
142 }
143 
144 /// The sender side of an established connection.
145 #[cfg_attr(
146     feature = "deprecated",
147     deprecated(
148         note = "This type will be replaced with `client::conn::http1::SendRequest` and `client::conn::http2::SendRequest` in 1.0, enable the \"backports\" feature to use them now."
149     )
150 )]
151 pub struct SendRequest<B> {
152     dispatch: dispatch::Sender<Request<B>, Response<Body>>,
153 }
154 
155 /// A future that processes all HTTP state for the IO object.
156 ///
157 /// In most cases, this should just be spawned into an executor, so that it
158 /// can process incoming and outgoing messages, notice hangups, and the like.
159 #[must_use = "futures do nothing unless polled"]
160 #[cfg_attr(
161     feature = "deprecated",
162     deprecated(
163         note = "This type will be replaced with `client::conn::http1::Connection` and `client::conn::http2::Connection` in 1.0, enable the \"backports\" feature to use them now."
164     )
165 )]
166 pub struct Connection<T, B>
167 where
168     T: AsyncRead + AsyncWrite + Send + 'static,
169     B: HttpBody + 'static,
170 {
171     inner: Option<ProtoClient<T, B>>,
172 }
173 
174 /// A builder to configure an HTTP connection.
175 ///
176 /// After setting options, the builder is used to create a handshake future.
177 #[derive(Clone, Debug)]
178 #[cfg_attr(
179     feature = "deprecated",
180     deprecated(
181         note = "This type will be replaced with `client::conn::http1::Builder` and `client::conn::http2::Builder` in 1.0, enable the \"backports\" feature to use them now."
182     )
183 )]
184 pub struct Builder {
185     pub(super) exec: Exec,
186     h09_responses: bool,
187     h1_parser_config: ParserConfig,
188     h1_writev: Option<bool>,
189     h1_title_case_headers: bool,
190     h1_preserve_header_case: bool,
191     #[cfg(feature = "ffi")]
192     h1_preserve_header_order: bool,
193     h1_read_buf_exact_size: Option<usize>,
194     h1_max_buf_size: Option<usize>,
195     #[cfg(feature = "ffi")]
196     h1_headers_raw: bool,
197     #[cfg(feature = "http2")]
198     h2_builder: proto::h2::client::Config,
199     version: Proto,
200 }
201 
202 #[derive(Clone, Debug)]
203 enum Proto {
204     #[cfg(feature = "http1")]
205     Http1,
206     #[cfg(feature = "http2")]
207     Http2,
208 }
209 
210 /// A future returned by `SendRequest::send_request`.
211 ///
212 /// Yields a `Response` if successful.
213 #[must_use = "futures do nothing unless polled"]
214 pub struct ResponseFuture {
215     inner: ResponseFutureState,
216 }
217 
218 enum ResponseFutureState {
219     Waiting(dispatch::Promise<Response<Body>>),
220     // Option is to be able to `take()` it in `poll`
221     Error(Option<crate::Error>),
222 }
223 
224 /// Deconstructed parts of a `Connection`.
225 ///
226 /// This allows taking apart a `Connection` at a later time, in order to
227 /// reclaim the IO object, and additional related pieces.
228 #[derive(Debug)]
229 pub struct Parts<T> {
230     /// The original IO object used in the handshake.
231     pub io: T,
232     /// A buffer of bytes that have been read but not processed as HTTP.
233     ///
234     /// For instance, if the `Connection` is used for an HTTP upgrade request,
235     /// it is possible the server sent back the first bytes of the new protocol
236     /// along with the response upgrade.
237     ///
238     /// You will want to check for any existing bytes if you plan to continue
239     /// communicating on the IO object.
240     pub read_buf: Bytes,
241     _inner: (),
242 }
243 
244 // ========== internal client api
245 
246 // A `SendRequest` that can be cloned to send HTTP2 requests.
247 // private for now, probably not a great idea of a type...
248 #[must_use = "futures do nothing unless polled"]
249 #[cfg(feature = "http2")]
250 pub(super) struct Http2SendRequest<B> {
251     dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
252 }
253 
254 // ===== impl SendRequest
255 
256 #[cfg_attr(feature = "deprecated", allow(deprecated))]
257 impl<B> SendRequest<B> {
258     /// Polls to determine whether this sender can be used yet for a request.
259     ///
260     /// If the associated connection is closed, this returns an Error.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>261     pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
262         self.dispatch.poll_ready(cx)
263     }
264 
when_ready(self) -> crate::Result<Self>265     pub(super) async fn when_ready(self) -> crate::Result<Self> {
266         let mut me = Some(self);
267         future::poll_fn(move |cx| {
268             ready!(me.as_mut().unwrap().poll_ready(cx))?;
269             Poll::Ready(Ok(me.take().unwrap()))
270         })
271         .await
272     }
273 
is_ready(&self) -> bool274     pub(super) fn is_ready(&self) -> bool {
275         self.dispatch.is_ready()
276     }
277 
is_closed(&self) -> bool278     pub(super) fn is_closed(&self) -> bool {
279         self.dispatch.is_closed()
280     }
281 
282     #[cfg(feature = "http2")]
into_http2(self) -> Http2SendRequest<B>283     pub(super) fn into_http2(self) -> Http2SendRequest<B> {
284         Http2SendRequest {
285             dispatch: self.dispatch.unbound(),
286         }
287     }
288 }
289 
290 #[cfg_attr(feature = "deprecated", allow(deprecated))]
291 impl<B> SendRequest<B>
292 where
293     B: HttpBody + 'static,
294 {
295     /// Sends a `Request` on the associated connection.
296     ///
297     /// Returns a future that if successful, yields the `Response`.
298     ///
299     /// # Note
300     ///
301     /// There are some key differences in what automatic things the `Client`
302     /// does for you that will not be done here:
303     ///
304     /// - `Client` requires absolute-form `Uri`s, since the scheme and
305     ///   authority are needed to connect. They aren't required here.
306     /// - Since the `Client` requires absolute-form `Uri`s, it can add
307     ///   the `Host` header based on it. You must add a `Host` header yourself
308     ///   before calling this method.
309     /// - Since absolute-form `Uri`s are not required, if received, they will
310     ///   be serialized as-is.
311     ///
312     /// # Example
313     ///
314     /// ```
315     /// # use http::header::HOST;
316     /// # use hyper::client::conn::SendRequest;
317     /// # use hyper::Body;
318     /// use hyper::Request;
319     ///
320     /// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
321     /// // build a Request
322     /// let req = Request::builder()
323     ///     .uri("/foo/bar")
324     ///     .header(HOST, "hyper.rs")
325     ///     .body(Body::empty())
326     ///     .unwrap();
327     ///
328     /// // send it and await a Response
329     /// let res = tx.send_request(req).await?;
330     /// // assert the Response
331     /// assert!(res.status().is_success());
332     /// # Ok(())
333     /// # }
334     /// # fn main() {}
335     /// ```
send_request(&mut self, req: Request<B>) -> ResponseFuture336     pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
337         let inner = match self.dispatch.send(req) {
338             Ok(rx) => ResponseFutureState::Waiting(rx),
339             Err(_req) => {
340                 debug!("connection was not ready");
341                 let err = crate::Error::new_canceled().with("connection was not ready");
342                 ResponseFutureState::Error(Some(err))
343             }
344         };
345 
346         ResponseFuture { inner }
347     }
348 
send_request_retryable( &mut self, req: Request<B>, ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin where B: Send,349     pub(super) fn send_request_retryable(
350         &mut self,
351         req: Request<B>,
352     ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
353     where
354         B: Send,
355     {
356         match self.dispatch.try_send(req) {
357             Ok(rx) => {
358                 Either::Left(rx.then(move |res| {
359                     match res {
360                         Ok(Ok(res)) => future::ok(res),
361                         Ok(Err(err)) => future::err(err),
362                         // this is definite bug if it happens, but it shouldn't happen!
363                         Err(_) => panic!("dispatch dropped without returning error"),
364                     }
365                 }))
366             }
367             Err(req) => {
368                 debug!("connection was not ready");
369                 let err = crate::Error::new_canceled().with("connection was not ready");
370                 Either::Right(future::err((err, Some(req))))
371             }
372         }
373     }
374 }
375 
376 #[cfg_attr(feature = "deprecated", allow(deprecated))]
377 impl<B> Service<Request<B>> for SendRequest<B>
378 where
379     B: HttpBody + 'static,
380 {
381     type Response = Response<Body>;
382     type Error = crate::Error;
383     type Future = ResponseFuture;
384 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>385     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
386         self.poll_ready(cx)
387     }
388 
call(&mut self, req: Request<B>) -> Self::Future389     fn call(&mut self, req: Request<B>) -> Self::Future {
390         self.send_request(req)
391     }
392 }
393 
394 #[cfg_attr(feature = "deprecated", allow(deprecated))]
395 impl<B> fmt::Debug for SendRequest<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result396     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397         f.debug_struct("SendRequest").finish()
398     }
399 }
400 
401 // ===== impl Http2SendRequest
402 
403 #[cfg(feature = "http2")]
404 impl<B> Http2SendRequest<B> {
is_ready(&self) -> bool405     pub(super) fn is_ready(&self) -> bool {
406         self.dispatch.is_ready()
407     }
408 
is_closed(&self) -> bool409     pub(super) fn is_closed(&self) -> bool {
410         self.dispatch.is_closed()
411     }
412 }
413 
414 #[cfg(feature = "http2")]
415 impl<B> Http2SendRequest<B>
416 where
417     B: HttpBody + 'static,
418 {
send_request_retryable( &mut self, req: Request<B>, ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> where B: Send,419     pub(super) fn send_request_retryable(
420         &mut self,
421         req: Request<B>,
422     ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
423     where
424         B: Send,
425     {
426         match self.dispatch.try_send(req) {
427             Ok(rx) => {
428                 Either::Left(rx.then(move |res| {
429                     match res {
430                         Ok(Ok(res)) => future::ok(res),
431                         Ok(Err(err)) => future::err(err),
432                         // this is definite bug if it happens, but it shouldn't happen!
433                         Err(_) => panic!("dispatch dropped without returning error"),
434                     }
435                 }))
436             }
437             Err(req) => {
438                 debug!("connection was not ready");
439                 let err = crate::Error::new_canceled().with("connection was not ready");
440                 Either::Right(future::err((err, Some(req))))
441             }
442         }
443     }
444 }
445 
446 #[cfg(feature = "http2")]
447 impl<B> fmt::Debug for Http2SendRequest<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result448     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449         f.debug_struct("Http2SendRequest").finish()
450     }
451 }
452 
453 #[cfg(feature = "http2")]
454 impl<B> Clone for Http2SendRequest<B> {
clone(&self) -> Self455     fn clone(&self) -> Self {
456         Http2SendRequest {
457             dispatch: self.dispatch.clone(),
458         }
459     }
460 }
461 
462 // ===== impl Connection
463 
464 #[cfg_attr(feature = "deprecated", allow(deprecated))]
465 impl<T, B> Connection<T, B>
466 where
467     T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
468     B: HttpBody + Unpin + Send + 'static,
469     B::Data: Send,
470     B::Error: Into<Box<dyn StdError + Send + Sync>>,
471 {
472     /// Return the inner IO object, and additional information.
473     ///
474     /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
into_parts(self) -> Parts<T>475     pub fn into_parts(self) -> Parts<T> {
476         match self.inner.expect("already upgraded") {
477             #[cfg(feature = "http1")]
478             ProtoClient::H1 { h1 } => {
479                 let (io, read_buf, _) = h1.into_inner();
480                 Parts {
481                     io,
482                     read_buf,
483                     _inner: (),
484                 }
485             }
486             ProtoClient::H2 { .. } => {
487                 panic!("http2 cannot into_inner");
488             }
489 
490             #[cfg(not(feature = "http1"))]
491             ProtoClient::H1 { h1 } => match h1.0 {},
492         }
493     }
494 
495     /// Poll the connection for completion, but without calling `shutdown`
496     /// on the underlying IO.
497     ///
498     /// This is useful to allow running a connection while doing an HTTP
499     /// upgrade. Once the upgrade is completed, the connection would be "done",
500     /// but it is not desired to actually shutdown the IO object. Instead you
501     /// would take it back using `into_parts`.
502     ///
503     /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
504     /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
505     /// to work with this function; or use the `without_shutdown` wrapper.
poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>506     pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
507         match *self.inner.as_mut().expect("already upgraded") {
508             #[cfg(feature = "http1")]
509             ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
510             #[cfg(feature = "http2")]
511             ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
512 
513             #[cfg(not(feature = "http1"))]
514             ProtoClient::H1 { ref mut h1 } => match h1.0 {},
515             #[cfg(not(feature = "http2"))]
516             ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
517         }
518     }
519 
520     /// Prevent shutdown of the underlying IO object at the end of service the request,
521     /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>>522     pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
523         let mut conn = Some(self);
524         future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
525             ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
526             Poll::Ready(Ok(conn.take().unwrap().into_parts()))
527         })
528     }
529 
530     /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
531     ///
532     /// This setting is configured by the server peer by sending the
533     /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
534     /// This method returns the currently acknowledged value received from the
535     /// remote.
536     ///
537     /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
538     /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
539     #[cfg(feature = "http2")]
http2_is_extended_connect_protocol_enabled(&self) -> bool540     pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
541         match self.inner.as_ref().unwrap() {
542             ProtoClient::H1 { .. } => false,
543             ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
544         }
545     }
546 }
547 
548 #[cfg_attr(feature = "deprecated", allow(deprecated))]
549 impl<T, B> Future for Connection<T, B>
550 where
551     T: AsyncRead + AsyncWrite + Unpin + Send,
552     B: HttpBody + Send + 'static,
553     B::Data: Send,
554     B::Error: Into<Box<dyn StdError + Send + Sync>>,
555 {
556     type Output = crate::Result<()>;
557 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>558     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
559         match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
560             proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
561             #[cfg(feature = "http1")]
562             proto::Dispatched::Upgrade(pending) => match self.inner.take() {
563                 Some(ProtoClient::H1 { h1 }) => {
564                     let (io, buf, _) = h1.into_inner();
565                     pending.fulfill(Upgraded::new(io, buf));
566                     Poll::Ready(Ok(()))
567                 }
568                 _ => {
569                     drop(pending);
570                     unreachable!("Upgrade expects h1");
571                 }
572             },
573         }
574     }
575 }
576 
577 #[cfg_attr(feature = "deprecated", allow(deprecated))]
578 impl<T, B> fmt::Debug for Connection<T, B>
579 where
580     T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
581     B: HttpBody + 'static,
582 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result583     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584         f.debug_struct("Connection").finish()
585     }
586 }
587 
588 // ===== impl Builder
589 
590 #[cfg_attr(feature = "deprecated", allow(deprecated))]
591 impl Builder {
592     /// Creates a new connection builder.
593     #[inline]
new() -> Builder594     pub fn new() -> Builder {
595         Builder {
596             exec: Exec::Default,
597             h09_responses: false,
598             h1_writev: None,
599             h1_read_buf_exact_size: None,
600             h1_parser_config: Default::default(),
601             h1_title_case_headers: false,
602             h1_preserve_header_case: false,
603             #[cfg(feature = "ffi")]
604             h1_preserve_header_order: false,
605             h1_max_buf_size: None,
606             #[cfg(feature = "ffi")]
607             h1_headers_raw: false,
608             #[cfg(feature = "http2")]
609             h2_builder: Default::default(),
610             #[cfg(feature = "http1")]
611             version: Proto::Http1,
612             #[cfg(not(feature = "http1"))]
613             version: Proto::Http2,
614         }
615     }
616 
617     /// Provide an executor to execute background HTTP2 tasks.
executor<E>(&mut self, exec: E) -> &mut Builder where E: Executor<BoxSendFuture> + Send + Sync + 'static,618     pub fn executor<E>(&mut self, exec: E) -> &mut Builder
619     where
620         E: Executor<BoxSendFuture> + Send + Sync + 'static,
621     {
622         self.exec = Exec::Executor(Arc::new(exec));
623         self
624     }
625 
626     /// Set whether HTTP/0.9 responses should be tolerated.
627     ///
628     /// Default is false.
http09_responses(&mut self, enabled: bool) -> &mut Builder629     pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
630         self.h09_responses = enabled;
631         self
632     }
633 
634     /// Set whether HTTP/1 connections will accept spaces between header names
635     /// and the colon that follow them in responses.
636     ///
637     /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
638     /// to say about it:
639     ///
640     /// > No whitespace is allowed between the header field-name and colon. In
641     /// > the past, differences in the handling of such whitespace have led to
642     /// > security vulnerabilities in request routing and response handling. A
643     /// > server MUST reject any received request message that contains
644     /// > whitespace between a header field-name and colon with a response code
645     /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
646     /// > response message before forwarding the message downstream.
647     ///
648     /// Note that this setting does not affect HTTP/2.
649     ///
650     /// Default is false.
651     ///
652     /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
http1_allow_spaces_after_header_name_in_responses( &mut self, enabled: bool, ) -> &mut Builder653     pub fn http1_allow_spaces_after_header_name_in_responses(
654         &mut self,
655         enabled: bool,
656     ) -> &mut Builder {
657         self.h1_parser_config
658             .allow_spaces_after_header_name_in_responses(enabled);
659         self
660     }
661 
662     /// Set whether HTTP/1 connections will accept obsolete line folding for
663     /// header values.
664     ///
665     /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
666     /// parsing.
667     ///
668     /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
669     /// to say about it:
670     ///
671     /// > A server that receives an obs-fold in a request message that is not
672     /// > within a message/http container MUST either reject the message by
673     /// > sending a 400 (Bad Request), preferably with a representation
674     /// > explaining that obsolete line folding is unacceptable, or replace
675     /// > each received obs-fold with one or more SP octets prior to
676     /// > interpreting the field value or forwarding the message downstream.
677     ///
678     /// > A proxy or gateway that receives an obs-fold in a response message
679     /// > that is not within a message/http container MUST either discard the
680     /// > message and replace it with a 502 (Bad Gateway) response, preferably
681     /// > with a representation explaining that unacceptable line folding was
682     /// > received, or replace each received obs-fold with one or more SP
683     /// > octets prior to interpreting the field value or forwarding the
684     /// > message downstream.
685     ///
686     /// > A user agent that receives an obs-fold in a response message that is
687     /// > not within a message/http container MUST replace each received
688     /// > obs-fold with one or more SP octets prior to interpreting the field
689     /// > value.
690     ///
691     /// Note that this setting does not affect HTTP/2.
692     ///
693     /// Default is false.
694     ///
695     /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
http1_allow_obsolete_multiline_headers_in_responses( &mut self, enabled: bool, ) -> &mut Builder696     pub fn http1_allow_obsolete_multiline_headers_in_responses(
697         &mut self,
698         enabled: bool,
699     ) -> &mut Builder {
700         self.h1_parser_config
701             .allow_obsolete_multiline_headers_in_responses(enabled);
702         self
703     }
704 
705     /// Set whether HTTP/1 connections will silently ignored malformed header lines.
706     ///
707     /// If this is enabled and and a header line does not start with a valid header
708     /// name, or does not include a colon at all, the line will be silently ignored
709     /// and no error will be reported.
710     ///
711     /// Note that this setting does not affect HTTP/2.
712     ///
713     /// Default is false.
http1_ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder714     pub fn http1_ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
715         self.h1_parser_config
716             .ignore_invalid_headers_in_responses(enabled);
717         self
718     }
719 
720     /// Set whether HTTP/1 connections should try to use vectored writes,
721     /// or always flatten into a single buffer.
722     ///
723     /// Note that setting this to false may mean more copies of body data,
724     /// but may also improve performance when an IO transport doesn't
725     /// support vectored writes well, such as most TLS implementations.
726     ///
727     /// Setting this to true will force hyper to use queued strategy
728     /// which may eliminate unnecessary cloning on some TLS backends
729     ///
730     /// Default is `auto`. In this mode hyper will try to guess which
731     /// mode to use
http1_writev(&mut self, enabled: bool) -> &mut Builder732     pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
733         self.h1_writev = Some(enabled);
734         self
735     }
736 
737     /// Set whether HTTP/1 connections will write header names as title case at
738     /// the socket level.
739     ///
740     /// Note that this setting does not affect HTTP/2.
741     ///
742     /// Default is false.
http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder743     pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
744         self.h1_title_case_headers = enabled;
745         self
746     }
747 
748     /// Set whether to support preserving original header cases.
749     ///
750     /// Currently, this will record the original cases received, and store them
751     /// in a private extension on the `Response`. It will also look for and use
752     /// such an extension in any provided `Request`.
753     ///
754     /// Since the relevant extension is still private, there is no way to
755     /// interact with the original cases. The only effect this can have now is
756     /// to forward the cases in a proxy-like fashion.
757     ///
758     /// Note that this setting does not affect HTTP/2.
759     ///
760     /// Default is false.
http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder761     pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
762         self.h1_preserve_header_case = enabled;
763         self
764     }
765 
766     /// Set whether to support preserving original header order.
767     ///
768     /// Currently, this will record the order in which headers are received, and store this
769     /// ordering in a private extension on the `Response`. It will also look for and use
770     /// such an extension in any provided `Request`.
771     ///
772     /// Note that this setting does not affect HTTP/2.
773     ///
774     /// Default is false.
775     #[cfg(feature = "ffi")]
http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder776     pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
777         self.h1_preserve_header_order = enabled;
778         self
779     }
780 
781     /// Sets the exact size of the read buffer to *always* use.
782     ///
783     /// Note that setting this option unsets the `http1_max_buf_size` option.
784     ///
785     /// Default is an adaptive read buffer.
http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder786     pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
787         self.h1_read_buf_exact_size = sz;
788         self.h1_max_buf_size = None;
789         self
790     }
791 
792     /// Set the maximum buffer size for the connection.
793     ///
794     /// Default is ~400kb.
795     ///
796     /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
797     ///
798     /// # Panics
799     ///
800     /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
801     #[cfg(feature = "http1")]
802     #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
http1_max_buf_size(&mut self, max: usize) -> &mut Self803     pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
804         assert!(
805             max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
806             "the max_buf_size cannot be smaller than the minimum that h1 specifies."
807         );
808 
809         self.h1_max_buf_size = Some(max);
810         self.h1_read_buf_exact_size = None;
811         self
812     }
813 
814     #[cfg(feature = "ffi")]
http1_headers_raw(&mut self, enabled: bool) -> &mut Self815     pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self {
816         self.h1_headers_raw = enabled;
817         self
818     }
819 
820     /// Sets whether HTTP2 is required.
821     ///
822     /// Default is false.
823     #[cfg(feature = "http2")]
824     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_only(&mut self, enabled: bool) -> &mut Builder825     pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
826         if enabled {
827             self.version = Proto::Http2
828         }
829         self
830     }
831 
832     /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
833     /// stream-level flow control.
834     ///
835     /// Passing `None` will do nothing.
836     ///
837     /// If not set, hyper will use a default.
838     ///
839     /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
840     #[cfg(feature = "http2")]
841     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self842     pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
843         if let Some(sz) = sz.into() {
844             self.h2_builder.adaptive_window = false;
845             self.h2_builder.initial_stream_window_size = sz;
846         }
847         self
848     }
849 
850     /// Sets the max connection-level flow control for HTTP2
851     ///
852     /// Passing `None` will do nothing.
853     ///
854     /// If not set, hyper will use a default.
855     #[cfg(feature = "http2")]
856     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_initial_connection_window_size( &mut self, sz: impl Into<Option<u32>>, ) -> &mut Self857     pub fn http2_initial_connection_window_size(
858         &mut self,
859         sz: impl Into<Option<u32>>,
860     ) -> &mut Self {
861         if let Some(sz) = sz.into() {
862             self.h2_builder.adaptive_window = false;
863             self.h2_builder.initial_conn_window_size = sz;
864         }
865         self
866     }
867 
868     /// Sets whether to use an adaptive flow control.
869     ///
870     /// Enabling this will override the limits set in
871     /// `http2_initial_stream_window_size` and
872     /// `http2_initial_connection_window_size`.
873     #[cfg(feature = "http2")]
874     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_adaptive_window(&mut self, enabled: bool) -> &mut Self875     pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
876         use proto::h2::SPEC_WINDOW_SIZE;
877 
878         self.h2_builder.adaptive_window = enabled;
879         if enabled {
880             self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
881             self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
882         }
883         self
884     }
885 
886     /// Sets the maximum frame size to use for HTTP2.
887     ///
888     /// Passing `None` will do nothing.
889     ///
890     /// If not set, hyper will use a default.
891     #[cfg(feature = "http2")]
892     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self893     pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
894         if let Some(sz) = sz.into() {
895             self.h2_builder.max_frame_size = sz;
896         }
897         self
898     }
899 
900     /// Sets an interval for HTTP2 Ping frames should be sent to keep a
901     /// connection alive.
902     ///
903     /// Pass `None` to disable HTTP2 keep-alive.
904     ///
905     /// Default is currently disabled.
906     ///
907     /// # Cargo Feature
908     ///
909     /// Requires the `runtime` cargo feature to be enabled.
910     #[cfg(feature = "runtime")]
911     #[cfg(feature = "http2")]
912     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_keep_alive_interval( &mut self, interval: impl Into<Option<Duration>>, ) -> &mut Self913     pub fn http2_keep_alive_interval(
914         &mut self,
915         interval: impl Into<Option<Duration>>,
916     ) -> &mut Self {
917         self.h2_builder.keep_alive_interval = interval.into();
918         self
919     }
920 
921     /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
922     ///
923     /// If the ping is not acknowledged within the timeout, the connection will
924     /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
925     ///
926     /// Default is 20 seconds.
927     ///
928     /// # Cargo Feature
929     ///
930     /// Requires the `runtime` cargo feature to be enabled.
931     #[cfg(feature = "runtime")]
932     #[cfg(feature = "http2")]
933     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self934     pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
935         self.h2_builder.keep_alive_timeout = timeout;
936         self
937     }
938 
939     /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
940     ///
941     /// If disabled, keep-alive pings are only sent while there are open
942     /// request/responses streams. If enabled, pings are also sent when no
943     /// streams are active. Does nothing if `http2_keep_alive_interval` is
944     /// disabled.
945     ///
946     /// Default is `false`.
947     ///
948     /// # Cargo Feature
949     ///
950     /// Requires the `runtime` cargo feature to be enabled.
951     #[cfg(feature = "runtime")]
952     #[cfg(feature = "http2")]
953     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self954     pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
955         self.h2_builder.keep_alive_while_idle = enabled;
956         self
957     }
958 
959     /// Sets the maximum number of HTTP2 concurrent locally reset streams.
960     ///
961     /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
962     /// details.
963     ///
964     /// The default value is determined by the `h2` crate.
965     ///
966     /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
967     #[cfg(feature = "http2")]
968     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self969     pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
970         self.h2_builder.max_concurrent_reset_streams = Some(max);
971         self
972     }
973 
974     /// Set the maximum write buffer size for each HTTP/2 stream.
975     ///
976     /// Default is currently 1MB, but may change.
977     ///
978     /// # Panics
979     ///
980     /// The value must be no larger than `u32::MAX`.
981     #[cfg(feature = "http2")]
982     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_max_send_buf_size(&mut self, max: usize) -> &mut Self983     pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
984         assert!(max <= std::u32::MAX as usize);
985         self.h2_builder.max_send_buffer_size = max;
986         self
987     }
988 
989     /// Constructs a connection with the configured options and IO.
990     /// See [`client::conn`](crate::client::conn) for more.
991     ///
992     /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
993     /// do nothing.
handshake<T, B>( &self, io: T, ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: HttpBody + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,994     pub fn handshake<T, B>(
995         &self,
996         io: T,
997     ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
998     where
999         T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1000         B: HttpBody + 'static,
1001         B::Data: Send,
1002         B::Error: Into<Box<dyn StdError + Send + Sync>>,
1003     {
1004         let opts = self.clone();
1005 
1006         async move {
1007             trace!("client handshake {:?}", opts.version);
1008 
1009             let (tx, rx) = dispatch::channel();
1010             let proto = match opts.version {
1011                 #[cfg(feature = "http1")]
1012                 Proto::Http1 => {
1013                     let mut conn = proto::Conn::new(io);
1014                     conn.set_h1_parser_config(opts.h1_parser_config);
1015                     if let Some(writev) = opts.h1_writev {
1016                         if writev {
1017                             conn.set_write_strategy_queue();
1018                         } else {
1019                             conn.set_write_strategy_flatten();
1020                         }
1021                     }
1022                     if opts.h1_title_case_headers {
1023                         conn.set_title_case_headers();
1024                     }
1025                     if opts.h1_preserve_header_case {
1026                         conn.set_preserve_header_case();
1027                     }
1028                     #[cfg(feature = "ffi")]
1029                     if opts.h1_preserve_header_order {
1030                         conn.set_preserve_header_order();
1031                     }
1032                     if opts.h09_responses {
1033                         conn.set_h09_responses();
1034                     }
1035 
1036                     #[cfg(feature = "ffi")]
1037                     conn.set_raw_headers(opts.h1_headers_raw);
1038 
1039                     if let Some(sz) = opts.h1_read_buf_exact_size {
1040                         conn.set_read_buf_exact_size(sz);
1041                     }
1042                     if let Some(max) = opts.h1_max_buf_size {
1043                         conn.set_max_buf_size(max);
1044                     }
1045                     let cd = proto::h1::dispatch::Client::new(rx);
1046                     let dispatch = proto::h1::Dispatcher::new(cd, conn);
1047                     ProtoClient::H1 { h1: dispatch }
1048                 }
1049                 #[cfg(feature = "http2")]
1050                 Proto::Http2 => {
1051                     let h2 =
1052                         proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
1053                             .await?;
1054                     ProtoClient::H2 { h2 }
1055                 }
1056             };
1057 
1058             Ok((
1059                 SendRequest { dispatch: tx },
1060                 Connection { inner: Some(proto) },
1061             ))
1062         }
1063     }
1064 }
1065 
1066 // ===== impl ResponseFuture
1067 
1068 impl Future for ResponseFuture {
1069     type Output = crate::Result<Response<Body>>;
1070 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1071     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1072         match self.inner {
1073             ResponseFutureState::Waiting(ref mut rx) => {
1074                 Pin::new(rx).poll(cx).map(|res| match res {
1075                     Ok(Ok(resp)) => Ok(resp),
1076                     Ok(Err(err)) => Err(err),
1077                     // this is definite bug if it happens, but it shouldn't happen!
1078                     Err(_canceled) => panic!("dispatch dropped without returning error"),
1079                 })
1080             }
1081             ResponseFutureState::Error(ref mut err) => {
1082                 Poll::Ready(Err(err.take().expect("polled after ready")))
1083             }
1084         }
1085     }
1086 }
1087 
1088 impl fmt::Debug for ResponseFuture {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1089     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1090         f.debug_struct("ResponseFuture").finish()
1091     }
1092 }
1093 
1094 // ===== impl ProtoClient
1095 
1096 impl<T, B> Future for ProtoClient<T, B>
1097 where
1098     T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
1099     B: HttpBody + Send + 'static,
1100     B::Data: Send,
1101     B::Error: Into<Box<dyn StdError + Send + Sync>>,
1102 {
1103     type Output = crate::Result<proto::Dispatched>;
1104 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1105     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1106         match self.project() {
1107             #[cfg(feature = "http1")]
1108             ProtoClientProj::H1 { h1 } => h1.poll(cx),
1109             #[cfg(feature = "http2")]
1110             ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
1111 
1112             #[cfg(not(feature = "http1"))]
1113             ProtoClientProj::H1 { h1 } => match h1.0 {},
1114             #[cfg(not(feature = "http2"))]
1115             ProtoClientProj::H2 { h2, .. } => match h2.0 {},
1116         }
1117     }
1118 }
1119 
1120 // assert trait markers
1121 
1122 trait AssertSend: Send {}
1123 trait AssertSendSync: Send + Sync {}
1124 
1125 #[cfg_attr(feature = "deprecated", allow(deprecated))]
1126 #[doc(hidden)]
1127 impl<B: Send> AssertSendSync for SendRequest<B> {}
1128 
1129 #[cfg_attr(feature = "deprecated", allow(deprecated))]
1130 #[doc(hidden)]
1131 impl<T: Send, B: Send> AssertSend for Connection<T, B>
1132 where
1133     T: AsyncRead + AsyncWrite + Send + 'static,
1134     B: HttpBody + 'static,
1135     B::Data: Send,
1136 {
1137 }
1138 
1139 #[cfg_attr(feature = "deprecated", allow(deprecated))]
1140 #[doc(hidden)]
1141 impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
1142 where
1143     T: AsyncRead + AsyncWrite + Send + 'static,
1144     B: HttpBody + 'static,
1145     B::Data: Send + Sync + 'static,
1146 {
1147 }
1148 
1149 #[cfg_attr(feature = "deprecated", allow(deprecated))]
1150 #[doc(hidden)]
1151 impl AssertSendSync for Builder {}
1152 
1153 #[doc(hidden)]
1154 impl AssertSend for ResponseFuture {}
1155