1 use std::error::Error as StdError;
2 use std::fmt;
3 use std::future::Future;
4 use std::marker::Unpin;
5 use std::mem;
6 use std::pin::Pin;
7 use std::task::{Context, Poll};
8 use std::time::Duration;
9 
10 use futures_channel::oneshot;
11 use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
12 use http::header::{HeaderValue, HOST};
13 use http::uri::{Port, Scheme};
14 use http::{Method, Request, Response, Uri, Version};
15 use tracing::{debug, trace, warn};
16 
17 use crate::body::{Body, HttpBody};
18 use crate::client::connect::CaptureConnectionExtension;
19 use crate::common::{exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, Lazy};
20 #[cfg(feature = "http2")]
21 use crate::ext::Protocol;
22 use crate::rt::Executor;
23 
24 use super::conn;
25 use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
26 use super::pool::{
27     self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
28 };
29 #[cfg(feature = "tcp")]
30 use super::HttpConnector;
31 
/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B = Body> {
    // Request-level switches: retrying canceled requests, adding a `Host`
    // header, and the HTTP version policy.
    config: Config,
    // Builder used to handshake every new connection; its `exec` is also used
    // to spawn the client's background tasks.
    #[cfg_attr(feature = "deprecated", allow(deprecated))]
    conn_builder: conn::Builder,
    // Connector that is cloned for each new outgoing connection attempt.
    connector: C,
    // Pool of established connections; cloned together with the client.
    pool: Pool<PoolClient<B>>,
}
44 
// Per-client settings consulted while a request is being sent.
#[derive(Clone, Copy, Debug)]
struct Config {
    // When true, a request that was canceled before being started on a reused
    // connection is sent again (see `retryably_send_request`).
    retry_canceled_requests: bool,
    // When true, HTTP/1 requests lacking a `Host` header get one derived from
    // the request URI (see `send_request`).
    set_host: bool,
    // Version policy passed to the pool's "connecting" lock and used to decide
    // whether new connections are HTTP/2-only.
    ver: Ver,
}
51 
/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    // Boxed, type-erased future producing the response or an error.
    // NOTE(review): the `SyncWrapper` is presumably there so that
    // `ResponseFuture` is `Sync` even though the boxed future is only `Send`
    // — confirm against `crate::common::sync_wrapper`.
    inner: SyncWrapper<Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>>,
}
59 
60 // ===== impl Client =====
61 
#[cfg(feature = "tcp")]
impl Client<HttpConnector, Body> {
    /// Construct a `Client` from the default [config](Builder).
    ///
    /// # Note
    ///
    /// The default connector does **not** handle TLS. Speaking to `https`
    /// destinations will require [configuring a connector that implements
    /// TLS](https://hyper.rs/guides/client/configuration).
    #[cfg_attr(docsrs, doc(cfg(feature = "tcp")))]
    #[inline]
    pub fn new() -> Self {
        let default_builder = Builder::default();
        default_builder.build_http()
    }
}
77 
#[cfg(feature = "tcp")]
impl Default for Client<HttpConnector, Body> {
    /// Equivalent to [`Client::new`].
    fn default() -> Self {
        Self::new()
    }
}
84 
impl Client<(), Body> {
    /// Create a builder to configure a new `Client`.
    ///
    /// The returned [`Builder`] starts from its default settings; it is the
    /// same value `Builder::default()` produces.
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature  = "runtime")]
    /// # fn run () {
    /// use std::time::Duration;
    /// use hyper::Client;
    ///
    /// let client = Client::builder()
    ///     .pool_idle_timeout(Duration::from_secs(30))
    ///     .http2_only(true)
    ///     .build_http();
    /// # let infer: Client<_, hyper::Body> = client;
    /// # drop(infer);
    /// # }
    /// # fn main() {}
    /// ```
    #[inline]
    pub fn builder() -> Builder {
        Builder::default()
    }
}
110 
111 impl<C, B> Client<C, B>
112 where
113     C: Connect + Clone + Send + Sync + 'static,
114     B: HttpBody + Send + 'static,
115     B::Data: Send,
116     B::Error: Into<Box<dyn StdError + Send + Sync>>,
117 {
118     /// Send a `GET` request to the supplied `Uri`.
119     ///
120     /// # Note
121     ///
122     /// This requires that the `HttpBody` type have a `Default` implementation.
123     /// It *should* return an "empty" version of itself, such that
124     /// `HttpBody::is_end_stream` is `true`.
125     ///
126     /// # Example
127     ///
128     /// ```
129     /// # #[cfg(feature  = "runtime")]
130     /// # fn run () {
131     /// use hyper::{Client, Uri};
132     ///
133     /// let client = Client::new();
134     ///
135     /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
136     /// # }
137     /// # fn main() {}
138     /// ```
get(&self, uri: Uri) -> ResponseFuture where B: Default,139     pub fn get(&self, uri: Uri) -> ResponseFuture
140     where
141         B: Default,
142     {
143         let body = B::default();
144         if !body.is_end_stream() {
145             warn!("default HttpBody used for get() does not return true for is_end_stream");
146         }
147 
148         let mut req = Request::new(body);
149         *req.uri_mut() = uri;
150         self.request(req)
151     }
152 
153     /// Send a constructed `Request` using this `Client`.
154     ///
155     /// # Example
156     ///
157     /// ```
158     /// # #[cfg(feature  = "runtime")]
159     /// # fn run () {
160     /// use hyper::{Body, Method, Client, Request};
161     ///
162     /// let client = Client::new();
163     ///
164     /// let req = Request::builder()
165     ///     .method(Method::POST)
166     ///     .uri("http://httpbin.org/post")
167     ///     .body(Body::from("Hallo!"))
168     ///     .expect("request builder");
169     ///
170     /// let future = client.request(req);
171     /// # }
172     /// # fn main() {}
173     /// ```
request(&self, mut req: Request<B>) -> ResponseFuture174     pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
175         let is_http_connect = req.method() == Method::CONNECT;
176         match req.version() {
177             Version::HTTP_11 => (),
178             Version::HTTP_10 => {
179                 if is_http_connect {
180                     warn!("CONNECT is not allowed for HTTP/1.0");
181                     return ResponseFuture::new(future::err(
182                         crate::Error::new_user_unsupported_request_method(),
183                     ));
184                 }
185             }
186             Version::HTTP_2 => (),
187             // completely unsupported HTTP version (like HTTP/0.9)!
188             other => return ResponseFuture::error_version(other),
189         };
190 
191         let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
192             Ok(s) => s,
193             Err(err) => {
194                 return ResponseFuture::new(future::err(err));
195             }
196         };
197 
198         ResponseFuture::new(self.clone().retryably_send_request(req, pool_key))
199     }
200 
retryably_send_request( self, mut req: Request<B>, pool_key: PoolKey, ) -> crate::Result<Response<Body>>201     async fn retryably_send_request(
202         self,
203         mut req: Request<B>,
204         pool_key: PoolKey,
205     ) -> crate::Result<Response<Body>> {
206         let uri = req.uri().clone();
207 
208         loop {
209             req = match self.send_request(req, pool_key.clone()).await {
210                 Ok(resp) => return Ok(resp),
211                 Err(ClientError::Normal(err)) => return Err(err),
212                 Err(ClientError::Canceled {
213                     connection_reused,
214                     mut req,
215                     reason,
216                 }) => {
217                     if !self.config.retry_canceled_requests || !connection_reused {
218                         // if client disabled, don't retry
219                         // a fresh connection means we definitely can't retry
220                         return Err(reason);
221                     }
222 
223                     trace!(
224                         "unstarted request canceled, trying again (reason={:?})",
225                         reason
226                     );
227                     *req.uri_mut() = uri.clone();
228                     req
229                 }
230             }
231         }
232     }
233 
send_request( &self, mut req: Request<B>, pool_key: PoolKey, ) -> Result<Response<Body>, ClientError<B>>234     async fn send_request(
235         &self,
236         mut req: Request<B>,
237         pool_key: PoolKey,
238     ) -> Result<Response<Body>, ClientError<B>> {
239         let mut pooled = match self.connection_for(pool_key).await {
240             Ok(pooled) => pooled,
241             Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)),
242             Err(ClientConnectError::H2CheckoutIsClosed(reason)) => {
243                 return Err(ClientError::Canceled {
244                     connection_reused: true,
245                     req,
246                     reason,
247                 })
248             }
249         };
250         req.extensions_mut()
251             .get_mut::<CaptureConnectionExtension>()
252             .map(|conn| conn.set(&pooled.conn_info));
253         if pooled.is_http1() {
254             if req.version() == Version::HTTP_2 {
255                 warn!("Connection is HTTP/1, but request requires HTTP/2");
256                 return Err(ClientError::Normal(
257                     crate::Error::new_user_unsupported_version()
258                         .with_client_connect_info(pooled.conn_info.clone()),
259                 ));
260             }
261 
262             if self.config.set_host {
263                 let uri = req.uri().clone();
264                 req.headers_mut().entry(HOST).or_insert_with(|| {
265                     let hostname = uri.host().expect("authority implies host");
266                     if let Some(port) = get_non_default_port(&uri) {
267                         let s = format!("{}:{}", hostname, port);
268                         HeaderValue::from_str(&s)
269                     } else {
270                         HeaderValue::from_str(hostname)
271                     }
272                     .expect("uri host is valid header value")
273                 });
274             }
275 
276             // CONNECT always sends authority-form, so check it first...
277             if req.method() == Method::CONNECT {
278                 authority_form(req.uri_mut());
279             } else if pooled.conn_info.is_proxied {
280                 absolute_form(req.uri_mut());
281             } else {
282                 origin_form(req.uri_mut());
283             }
284         } else if req.method() == Method::CONNECT {
285             #[cfg(not(feature = "http2"))]
286             authority_form(req.uri_mut());
287 
288             #[cfg(feature = "http2")]
289             if req.extensions().get::<Protocol>().is_none() {
290                 authority_form(req.uri_mut());
291             }
292         }
293 
294         let mut res = match pooled.send_request_retryable(req).await {
295             Err((err, orig_req)) => {
296                 return Err(ClientError::map_with_reused(pooled.is_reused())((
297                     err.with_client_connect_info(pooled.conn_info.clone()),
298                     orig_req,
299                 )));
300             }
301             Ok(res) => res,
302         };
303 
304         // If the Connector included 'extra' info, add to Response...
305         if let Some(extra) = &pooled.conn_info.extra {
306             extra.set(res.extensions_mut());
307         }
308 
309         // As of [email protected], there is a race condition in the mpsc
310         // channel, such that sending when the receiver is closing can
311         // result in the message being stuck inside the queue. It won't
312         // ever notify until the Sender side is dropped.
313         //
314         // To counteract this, we must check if our senders 'want' channel
315         // has been closed after having tried to send. If so, error out...
316         if pooled.is_closed() {
317             return Ok(res);
318         }
319 
320         // If pooled is HTTP/2, we can toss this reference immediately.
321         //
322         // when pooled is dropped, it will try to insert back into the
323         // pool. To delay that, spawn a future that completes once the
324         // sender is ready again.
325         //
326         // This *should* only be once the related `Connection` has polled
327         // for a new request to start.
328         //
329         // It won't be ready if there is a body to stream.
330         if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
331             drop(pooled);
332         } else if !res.body().is_end_stream() {
333             let (delayed_tx, delayed_rx) = oneshot::channel();
334             res.body_mut().delayed_eof(delayed_rx);
335             let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
336                 // At this point, `pooled` is dropped, and had a chance
337                 // to insert into the pool (if conn was idle)
338                 drop(delayed_tx);
339             });
340 
341             #[cfg_attr(feature = "deprecated", allow(deprecated))]
342             self.conn_builder.exec.execute(on_idle);
343         } else {
344             // There's no body to delay, but the connection isn't
345             // ready yet. Only re-insert when it's ready
346             let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
347 
348             #[cfg_attr(feature = "deprecated", allow(deprecated))]
349             self.conn_builder.exec.execute(on_idle);
350         }
351 
352         Ok(res)
353     }
354 
    /// Obtain a connection for `pool_key`, from the pool or by connecting.
    ///
    /// A pool checkout and a lazily-started connect are raced with
    /// `future::select`; whichever yields a connection first is returned.
    /// A *canceled* error from one side falls back to awaiting the other
    /// side; any other error is returned as `ClientConnectError::Normal`.
    ///
    /// `ClientConnectError::H2CheckoutIsClosed` is produced only when the
    /// client is configured for HTTP/2 and the fallback checkout fails with
    /// a canceled error whose source chain contains `CheckoutIsClosedError`.
    async fn connection_for(
        &self,
        pool_key: PoolKey,
    ) -> Result<Pooled<PoolClient<B>>, ClientConnectError> {
        // This actually races 2 different futures to try to get a ready
        // connection the fastest, and to reduce connection churn.
        //
        // - If the pool has an idle connection waiting, that's used
        //   immediately.
        // - Otherwise, the Connector is asked to start connecting to
        //   the destination Uri.
        // - Meanwhile, the pool Checkout is watching to see if any other
        //   request finishes and tries to insert an idle connection.
        // - If a new connection is started, but the Checkout wins after
        //   (an idle connection became available first), the started
        //   connection future is spawned into the runtime to complete,
        //   and then be inserted into the pool as an idle connection.
        let checkout = self.pool.checkout(pool_key.clone());
        let connect = self.connect_to(pool_key);
        let is_ver_h2 = self.config.ver == Ver::Http2;

        // The order of the `select` is depended on below...

        match future::select(checkout, connect).await {
            // Checkout won, connect future may have been started or not.
            //
            // If it has, let it finish and insert back into the pool,
            // so as to not waste the socket...
            Either::Left((Ok(checked_out), connecting)) => {
                // This depends on the `select` above having the correct
                // order, such that if the checkout future were ready
                // immediately, the connect future will never have been
                // started.
                //
                // If it *wasn't* ready yet, then the connect future will
                // have been started...
                if connecting.started() {
                    // Both outcomes are discarded: an error is only traced,
                    // a success is only dropped.
                    let bg = connecting
                        .map_err(|err| {
                            trace!("background connect error: {}", err);
                        })
                        .map(|_pooled| {
                            // dropping here should just place it in
                            // the Pool for us...
                        });
                    // An execute error here isn't important, we're just trying
                    // to prevent a waste of a socket...
                    #[cfg_attr(feature = "deprecated", allow(deprecated))]
                    self.conn_builder.exec.execute(bg);
                }
                Ok(checked_out)
            }
            // Connect won, checkout can just be dropped.
            Either::Right((Ok(connected), _checkout)) => Ok(connected),
            // Either checkout or connect could get canceled:
            //
            // 1. Connect is canceled if this is HTTP/2 and there is
            //    an outstanding HTTP/2 connecting task.
            // 2. Checkout is canceled if the pool cannot deliver an
            //    idle connection reliably.
            //
            // In both cases, we should just wait for the other future.
            Either::Left((Err(err), connecting)) => {
                if err.is_canceled() {
                    connecting.await.map_err(ClientConnectError::Normal)
                } else {
                    Err(ClientConnectError::Normal(err))
                }
            }
            Either::Right((Err(err), checkout)) => {
                if err.is_canceled() {
                    checkout.await.map_err(move |err| {
                        // Single out the closed-checkout case for HTTP/2 so
                        // that the caller can treat it as retryable.
                        if is_ver_h2
                            && err.is_canceled()
                            && err.find_source::<CheckoutIsClosedError>().is_some()
                        {
                            ClientConnectError::H2CheckoutIsClosed(err)
                        } else {
                            ClientConnectError::Normal(err)
                        }
                    })
                } else {
                    Err(ClientConnectError::Normal(err))
                }
            }
        }
    }
442 
    /// Build a lazy future that establishes a new connection for `pool_key`.
    ///
    /// Nothing happens until the returned `Lazy` is first polled. When it
    /// runs, it:
    /// 1. takes the pool's "connecting" lock for the key (resolving to a
    ///    canceled error if the lock is unavailable);
    /// 2. asks the connector to connect to the URI derived from the key;
    /// 3. if ALPN negotiated h2 on a client that is not HTTP/2-only, swaps
    ///    the lock for an HTTP/2 one (or resolves to a canceled error if
    ///    that is not possible);
    /// 4. performs the handshake, spawns the connection task on the
    ///    executor, and waits for the sender to become ready;
    /// 5. registers the resulting `PoolClient` with the pool.
    fn connect_to(
        &self,
        pool_key: PoolKey,
    ) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin {
        // Everything the closure needs is cloned up front so that the
        // returned future does not borrow `self`.
        #[cfg_attr(feature = "deprecated", allow(deprecated))]
        let executor = self.conn_builder.exec.clone();
        let pool = self.pool.clone();
        // Only the HTTP/2 build mutates the builder (see `http2_only` below).
        #[cfg(not(feature = "http2"))]
        let conn_builder = self.conn_builder.clone();
        #[cfg(feature = "http2")]
        let mut conn_builder = self.conn_builder.clone();
        let ver = self.config.ver;
        let is_ver_h2 = ver == Ver::Http2;
        let connector = self.connector.clone();
        let dst = domain_as_uri(pool_key.clone());
        hyper_lazy(move || {
            // Try to take a "connecting lock".
            //
            // If the pool_key is for HTTP/2, and there is already a
            // connection being established, then this can't take a
            // second lock. The "connect_to" future is Canceled.
            let connecting = match pool.connecting(&pool_key, ver) {
                Some(lock) => lock,
                None => {
                    let canceled =
                        crate::Error::new_canceled().with("HTTP/2 connection in progress");
                    return Either::Right(future::err(canceled));
                }
            };
            Either::Left(
                connector
                    .connect(connect::sealed::Internal, dst)
                    .map_err(crate::Error::new_connect)
                    .and_then(move |io| {
                        let connected = io.connected();
                        // If ALPN is h2 and we aren't http2_only already,
                        // then we need to convert our pool checkout into
                        // a single HTTP2 one.
                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
                            match connecting.alpn_h2(&pool) {
                                Some(lock) => {
                                    trace!("ALPN negotiated h2, updating pool");
                                    lock
                                }
                                None => {
                                    // Another connection has already upgraded,
                                    // the pool checkout should finish up for us.
                                    let canceled = crate::Error::new_canceled()
                                        .with("ALPN upgraded to HTTP/2");
                                    return Either::Right(future::err(canceled));
                                }
                            }
                        } else {
                            connecting
                        };

                        // HTTP/2 is used when configured explicitly or when
                        // negotiated through ALPN.
                        #[cfg_attr(not(feature = "http2"), allow(unused))]
                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
                        #[cfg(feature = "http2")]
                        {
                            conn_builder.http2_only(is_h2);
                        }

                        Either::Left(Box::pin(async move {
                            let (tx, conn) = conn_builder.handshake(io).await?;

                            trace!("handshake complete, spawning background dispatcher task");
                            // Connection errors are only logged; the spawned
                            // task resolves to `()` either way.
                            executor.execute(
                                conn.map_err(|e| debug!("client connection error: {}", e))
                                    .map(|_| ()),
                            );

                            // Wait for 'conn' to ready up before we
                            // declare this tx as usable
                            let tx = tx.when_ready().await?;

                            // Wrap the sender in the variant matching the
                            // protocol in use.
                            let tx = {
                                #[cfg(feature = "http2")]
                                {
                                    if is_h2 {
                                        PoolTx::Http2(tx.into_http2())
                                    } else {
                                        PoolTx::Http1(tx)
                                    }
                                }
                                #[cfg(not(feature = "http2"))]
                                PoolTx::Http1(tx)
                            };

                            Ok(pool.pooled(
                                connecting,
                                PoolClient {
                                    conn_info: connected,
                                    tx,
                                },
                            ))
                        }))
                    }),
            )
        })
    }
544 }
545 
// `tower_service::Service` adapter over an owned `Client`; `call` simply
// forwards to `Client::request`.
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
    C: Connect + Clone + Send + Sync + 'static,
    B: HttpBody + Send + 'static,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Response = Response<Body>;
    type Error = crate::Error;
    type Future = ResponseFuture;

    // Always reports ready: this impl applies no backpressure of its own.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Delegates to `Client::request`.
    fn call(&mut self, req: Request<B>) -> Self::Future {
        self.request(req)
    }
}
565 
// Same adapter as the owned-`Client` impl, but for a shared reference, so a
// borrowed client can be used as a `Service` without cloning it first.
impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
where
    C: Connect + Clone + Send + Sync + 'static,
    B: HttpBody + Send + 'static,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Response = Response<Body>;
    type Error = crate::Error;
    type Future = ResponseFuture;

    // Always reports ready: this impl applies no backpressure of its own.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Delegates to `Client::request` on the referenced client.
    fn call(&mut self, req: Request<B>) -> Self::Future {
        self.request(req)
    }
}
585 
586 impl<C: Clone, B> Clone for Client<C, B> {
clone(&self) -> Client<C, B>587     fn clone(&self) -> Client<C, B> {
588         Client {
589             config: self.config.clone(),
590             conn_builder: self.conn_builder.clone(),
591             connector: self.connector.clone(),
592             pool: self.pool.clone(),
593         }
594     }
595 }
596 
// Debug output is just the type name (`Client`); no fields are printed, so
// neither `C` nor `B` needs to implement `Debug`.
impl<C, B> fmt::Debug for Client<C, B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Client").finish()
    }
}
602 
603 // ===== impl ResponseFuture =====
604 
605 impl ResponseFuture {
new<F>(value: F) -> Self where F: Future<Output = crate::Result<Response<Body>>> + Send + 'static,606     fn new<F>(value: F) -> Self
607     where
608         F: Future<Output = crate::Result<Response<Body>>> + Send + 'static,
609     {
610         Self {
611             inner: SyncWrapper::new(Box::pin(value)),
612         }
613     }
614 
error_version(ver: Version) -> Self615     fn error_version(ver: Version) -> Self {
616         warn!("Request has unsupported version \"{:?}\"", ver);
617         ResponseFuture::new(Box::pin(future::err(
618             crate::Error::new_user_unsupported_version(),
619         )))
620     }
621 }
622 
// Prints a fixed placeholder; the boxed inner future is opaque. `pad` is used
// so that width/alignment flags on the formatter are honored.
impl fmt::Debug for ResponseFuture {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Future<Response>")
    }
}
628 
impl Future for ResponseFuture {
    type Output = crate::Result<Response<Body>>;

    // Polls the boxed inner future: `get_mut` reaches through the
    // `SyncWrapper`, and `as_mut` re-pins the `Pin<Box<_>>` for the call.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.get_mut().as_mut().poll(cx)
    }
}
636 
637 // ===== impl PoolClient =====
638 
// A pooled connection handle: the request sender plus the metadata reported
// by the connector when the connection was established.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
    // Connector-provided info (e.g. `is_proxied`, `alpn`, `extra`, `poisoned`).
    conn_info: Connected,
    // Protocol-specific sender used to dispatch requests on this connection.
    tx: PoolTx<B>,
}
645 
// The request sender of a pooled connection, by protocol. The HTTP/2 variant
// only exists when the `http2` feature is enabled.
enum PoolTx<B> {
    // Sender for an HTTP/1 connection.
    #[cfg_attr(feature = "deprecated", allow(deprecated))]
    Http1(conn::SendRequest<B>),
    // Sender for an HTTP/2 connection; cloned when the connection is shared
    // (see `Poolable::reserve`).
    #[cfg(feature = "http2")]
    Http2(conn::Http2SendRequest<B>),
}
652 
653 impl<B> PoolClient<B> {
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>654     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
655         match self.tx {
656             PoolTx::Http1(ref mut tx) => tx.poll_ready(cx),
657             #[cfg(feature = "http2")]
658             PoolTx::Http2(_) => Poll::Ready(Ok(())),
659         }
660     }
661 
is_http1(&self) -> bool662     fn is_http1(&self) -> bool {
663         !self.is_http2()
664     }
665 
is_http2(&self) -> bool666     fn is_http2(&self) -> bool {
667         match self.tx {
668             PoolTx::Http1(_) => false,
669             #[cfg(feature = "http2")]
670             PoolTx::Http2(_) => true,
671         }
672     }
673 
is_ready(&self) -> bool674     fn is_ready(&self) -> bool {
675         match self.tx {
676             PoolTx::Http1(ref tx) => tx.is_ready(),
677             #[cfg(feature = "http2")]
678             PoolTx::Http2(ref tx) => tx.is_ready(),
679         }
680     }
681 
is_closed(&self) -> bool682     fn is_closed(&self) -> bool {
683         match self.tx {
684             PoolTx::Http1(ref tx) => tx.is_closed(),
685             #[cfg(feature = "http2")]
686             PoolTx::Http2(ref tx) => tx.is_closed(),
687         }
688     }
689 }
690 
impl<B: HttpBody + 'static> PoolClient<B> {
    /// Dispatch `req` on the underlying sender.
    ///
    /// On failure the error comes paired with an `Option<Request<B>>`; when
    /// it is `Some`, the caller gets the request back and may retry it
    /// (see `ClientError::map_with_reused`).
    fn send_request_retryable(
        &mut self,
        req: Request<B>,
    ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
    where
        B: Send,
    {
        // Without the `http2` feature there is a single arm and its future is
        // returned directly. With it, the two arms yield different future
        // types, so they are unified through `Either`.
        match self.tx {
            #[cfg(not(feature = "http2"))]
            PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
            #[cfg(feature = "http2")]
            PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)),
        }
    }
}
709 
710 impl<B> Poolable for PoolClient<B>
711 where
712     B: Send + 'static,
713 {
is_open(&self) -> bool714     fn is_open(&self) -> bool {
715         if self.conn_info.poisoned.poisoned() {
716             trace!(
717                 "marking {:?} as closed because it was poisoned",
718                 self.conn_info
719             );
720             return false;
721         }
722         match self.tx {
723             PoolTx::Http1(ref tx) => tx.is_ready(),
724             #[cfg(feature = "http2")]
725             PoolTx::Http2(ref tx) => tx.is_ready(),
726         }
727     }
728 
reserve(self) -> Reservation<Self>729     fn reserve(self) -> Reservation<Self> {
730         match self.tx {
731             PoolTx::Http1(tx) => Reservation::Unique(PoolClient {
732                 conn_info: self.conn_info,
733                 tx: PoolTx::Http1(tx),
734             }),
735             #[cfg(feature = "http2")]
736             PoolTx::Http2(tx) => {
737                 let b = PoolClient {
738                     conn_info: self.conn_info.clone(),
739                     tx: PoolTx::Http2(tx.clone()),
740                 };
741                 let a = PoolClient {
742                     conn_info: self.conn_info,
743                     tx: PoolTx::Http2(tx),
744                 };
745                 Reservation::Shared(a, b)
746             }
747         }
748     }
749 
can_share(&self) -> bool750     fn can_share(&self) -> bool {
751         self.is_http2()
752     }
753 }
754 
755 // ===== impl ClientError =====
756 
// Outcome of a failed send attempt, as seen by the retry loop in
// `retryably_send_request`.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
enum ClientError<B> {
    // An error with no request to hand back; returned to the caller as-is.
    Normal(crate::Error),
    // The request was handed back and may be sent again.
    Canceled {
        // Whether the attempt ran on a reused connection; retries are only
        // made when this is true.
        connection_reused: bool,
        // The request that was handed back.
        req: Request<B>,
        // The underlying error, returned if no retry is made.
        reason: crate::Error,
    },
}
767 
768 impl<B> ClientError<B> {
map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self769     fn map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self {
770         move |(err, orig_req)| {
771             if let Some(req) = orig_req {
772                 ClientError::Canceled {
773                     connection_reused: conn_reused,
774                     reason: err,
775                     req,
776                 }
777             } else {
778                 ClientError::Normal(err)
779             }
780         }
781     }
782 }
783 
// Failure to obtain a connection in `connection_for`.
enum ClientConnectError {
    // Any error that `send_request` forwards as `ClientError::Normal`.
    Normal(crate::Error),
    // HTTP/2 only: the pool checkout was canceled because it was closed.
    // `send_request` maps this to a `ClientError::Canceled` marked as reused.
    H2CheckoutIsClosed(crate::Error),
}
788 
/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(super) enum Ver {
    // Not pinned to HTTP/2: new connections use HTTP/2 only if ALPN
    // negotiates it (see `connect_to`).
    Auto,
    // HTTP/2 is always used for new connections.
    Http2,
}
795 
origin_form(uri: &mut Uri)796 fn origin_form(uri: &mut Uri) {
797     let path = match uri.path_and_query() {
798         Some(path) if path.as_str() != "/" => {
799             let mut parts = ::http::uri::Parts::default();
800             parts.path_and_query = Some(path.clone());
801             Uri::from_parts(parts).expect("path is valid uri")
802         }
803         _none_or_just_slash => {
804             debug_assert!(Uri::default() == "/");
805             Uri::default()
806         }
807     };
808     *uri = path
809 }
810 
absolute_form(uri: &mut Uri)811 fn absolute_form(uri: &mut Uri) {
812     debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
813     debug_assert!(
814         uri.authority().is_some(),
815         "absolute_form needs an authority"
816     );
817     // If the URI is to HTTPS, and the connector claimed to be a proxy,
818     // then it *should* have tunneled, and so we don't want to send
819     // absolute-form in that case.
820     if uri.scheme() == Some(&Scheme::HTTPS) {
821         origin_form(uri);
822     }
823 }
824 
authority_form(uri: &mut Uri)825 fn authority_form(uri: &mut Uri) {
826     if let Some(path) = uri.path_and_query() {
827         // `https://hyper.rs` would parse with `/` path, don't
828         // annoy people about that...
829         if path != "/" {
830             warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
831         }
832     }
833     *uri = match uri.authority() {
834         Some(auth) => {
835             let mut parts = ::http::uri::Parts::default();
836             parts.authority = Some(auth.clone());
837             Uri::from_parts(parts).expect("authority is valid")
838         }
839         None => {
840             unreachable!("authority_form with relative uri");
841         }
842     };
843 }
844 
extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<PoolKey>845 fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<PoolKey> {
846     let uri_clone = uri.clone();
847     match (uri_clone.scheme(), uri_clone.authority()) {
848         (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
849         (None, Some(auth)) if is_http_connect => {
850             let scheme = match auth.port_u16() {
851                 Some(443) => {
852                     set_scheme(uri, Scheme::HTTPS);
853                     Scheme::HTTPS
854                 }
855                 _ => {
856                     set_scheme(uri, Scheme::HTTP);
857                     Scheme::HTTP
858                 }
859             };
860             Ok((scheme, auth.clone()))
861         }
862         _ => {
863             debug!("Client requires absolute-form URIs, received: {:?}", uri);
864             Err(crate::Error::new_user_absolute_uri_required())
865         }
866     }
867 }
868 
domain_as_uri((scheme, auth): PoolKey) -> Uri869 fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
870     http::uri::Builder::new()
871         .scheme(scheme)
872         .authority(auth)
873         .path_and_query("/")
874         .build()
875         .expect("domain is valid Uri")
876 }
877 
set_scheme(uri: &mut Uri, scheme: Scheme)878 fn set_scheme(uri: &mut Uri, scheme: Scheme) {
879     debug_assert!(
880         uri.scheme().is_none(),
881         "set_scheme expects no existing scheme"
882     );
883     let old = mem::replace(uri, Uri::default());
884     let mut parts: ::http::uri::Parts = old.into();
885     parts.scheme = Some(scheme);
886     parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
887     *uri = Uri::from_parts(parts).expect("scheme is valid");
888 }
889 
get_non_default_port(uri: &Uri) -> Option<Port<&str>>890 fn get_non_default_port(uri: &Uri) -> Option<Port<&str>> {
891     match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
892         (Some(443), true) => None,
893         (Some(80), false) => None,
894         _ => uri.port(),
895     }
896 }
897 
is_schema_secure(uri: &Uri) -> bool898 fn is_schema_secure(uri: &Uri) -> bool {
899     uri.scheme_str()
900         .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
901         .unwrap_or_default()
902 }
903 
/// A builder to configure a new [`Client`](Client).
///
/// # Example
///
/// ```
/// # #[cfg(feature  = "runtime")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper::Client;
///
/// let client = Client::builder()
///     .pool_idle_timeout(Duration::from_secs(30))
///     .http2_only(true)
///     .build_http();
/// # let infer: Client<_, hyper::Body> = client;
/// # drop(infer);
/// # }
/// # fn main() {}
/// ```
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
    // Client-level options; copied into the `Client` by `build`.
    client_config: Config,
    // Connection-level options; cloned into the `Client` by `build`.
    #[cfg_attr(feature = "deprecated", allow(deprecated))]
    conn_builder: conn::Builder,
    // Pool options; passed to `Pool::new` by `build`.
    pool_config: pool::Config,
}
931 
932 impl Default for Builder {
default() -> Self933     fn default() -> Self {
934         Self {
935             client_config: Config {
936                 retry_canceled_requests: true,
937                 set_host: true,
938                 ver: Ver::Auto,
939             },
940             #[cfg_attr(feature = "deprecated", allow(deprecated))]
941             conn_builder: conn::Builder::new(),
942             pool_config: pool::Config {
943                 idle_timeout: Some(Duration::from_secs(90)),
944                 max_idle_per_host: std::usize::MAX,
945             },
946         }
947     }
948 }
949 
950 impl Builder {
951     #[doc(hidden)]
952     #[deprecated(
953         note = "name is confusing, to disable the connection pool, call pool_max_idle_per_host(0)"
954     )]
keep_alive(&mut self, val: bool) -> &mut Self955     pub fn keep_alive(&mut self, val: bool) -> &mut Self {
956         if !val {
957             // disable
958             self.pool_max_idle_per_host(0)
959         } else if self.pool_config.max_idle_per_host == 0 {
960             // enable
961             self.pool_max_idle_per_host(std::usize::MAX)
962         } else {
963             // already enabled
964             self
965         }
966     }
967 
968     #[doc(hidden)]
969     #[deprecated(note = "renamed to `pool_idle_timeout`")]
keep_alive_timeout<D>(&mut self, val: D) -> &mut Self where D: Into<Option<Duration>>,970     pub fn keep_alive_timeout<D>(&mut self, val: D) -> &mut Self
971     where
972         D: Into<Option<Duration>>,
973     {
974         self.pool_idle_timeout(val)
975     }
976 
977     /// Set an optional timeout for idle sockets being kept-alive.
978     ///
979     /// Pass `None` to disable timeout.
980     ///
981     /// Default is 90 seconds.
pool_idle_timeout<D>(&mut self, val: D) -> &mut Self where D: Into<Option<Duration>>,982     pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
983     where
984         D: Into<Option<Duration>>,
985     {
986         self.pool_config.idle_timeout = val.into();
987         self
988     }
989 
990     #[doc(hidden)]
991     #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
max_idle_per_host(&mut self, max_idle: usize) -> &mut Self992     pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
993         self.pool_config.max_idle_per_host = max_idle;
994         self
995     }
996 
997     /// Sets the maximum idle connection per host allowed in the pool.
998     ///
999     /// Default is `usize::MAX` (no limit).
pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self1000     pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1001         self.pool_config.max_idle_per_host = max_idle;
1002         self
1003     }
1004 
1005     // HTTP/1 options
1006 
1007     /// Sets the exact size of the read buffer to *always* use.
1008     ///
1009     /// Note that setting this option unsets the `http1_max_buf_size` option.
1010     ///
1011     /// Default is an adaptive read buffer.
http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self1012     pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1013         self.conn_builder.http1_read_buf_exact_size(Some(sz));
1014         self
1015     }
1016 
1017     /// Set the maximum buffer size for the connection.
1018     ///
1019     /// Default is ~400kb.
1020     ///
1021     /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1022     ///
1023     /// # Panics
1024     ///
1025     /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1026     #[cfg(feature = "http1")]
1027     #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
http1_max_buf_size(&mut self, max: usize) -> &mut Self1028     pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1029         self.conn_builder.http1_max_buf_size(max);
1030         self
1031     }
1032 
1033     /// Set whether HTTP/1 connections will accept spaces between header names
1034     /// and the colon that follow them in responses.
1035     ///
1036     /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1037     /// parsing.
1038     ///
1039     /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1040     /// to say about it:
1041     ///
1042     /// > No whitespace is allowed between the header field-name and colon. In
1043     /// > the past, differences in the handling of such whitespace have led to
1044     /// > security vulnerabilities in request routing and response handling. A
1045     /// > server MUST reject any received request message that contains
1046     /// > whitespace between a header field-name and colon with a response code
1047     /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1048     /// > response message before forwarding the message downstream.
1049     ///
1050     /// Note that this setting does not affect HTTP/2.
1051     ///
1052     /// Default is false.
1053     ///
1054     /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self1055     pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1056         self.conn_builder
1057             .http1_allow_spaces_after_header_name_in_responses(val);
1058         self
1059     }
1060 
1061     /// Set whether HTTP/1 connections will accept obsolete line folding for
1062     /// header values.
1063     ///
1064     /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1065     /// to say about it:
1066     ///
1067     /// > A server that receives an obs-fold in a request message that is not
1068     /// > within a message/http container MUST either reject the message by
1069     /// > sending a 400 (Bad Request), preferably with a representation
1070     /// > explaining that obsolete line folding is unacceptable, or replace
1071     /// > each received obs-fold with one or more SP octets prior to
1072     /// > interpreting the field value or forwarding the message downstream.
1073     ///
1074     /// > A proxy or gateway that receives an obs-fold in a response message
1075     /// > that is not within a message/http container MUST either discard the
1076     /// > message and replace it with a 502 (Bad Gateway) response, preferably
1077     /// > with a representation explaining that unacceptable line folding was
1078     /// > received, or replace each received obs-fold with one or more SP
1079     /// > octets prior to interpreting the field value or forwarding the
1080     /// > message downstream.
1081     ///
1082     /// > A user agent that receives an obs-fold in a response message that is
1083     /// > not within a message/http container MUST replace each received
1084     /// > obs-fold with one or more SP octets prior to interpreting the field
1085     /// > value.
1086     ///
1087     /// Note that this setting does not affect HTTP/2.
1088     ///
1089     /// Default is false.
1090     ///
1091     /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self1092     pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1093         self.conn_builder
1094             .http1_allow_obsolete_multiline_headers_in_responses(val);
1095         self
1096     }
1097 
1098     /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1099     ///
1100     /// This mimicks the behaviour of major browsers. You probably don't want this.
1101     /// You should only want this if you are implementing a proxy whose main
1102     /// purpose is to sit in front of browsers whose users access arbitrary content
1103     /// which may be malformed, and they expect everything that works without
1104     /// the proxy to keep working with the proxy.
1105     ///
1106     /// This option will prevent Hyper's client from returning an error encountered
1107     /// when parsing a header, except if the error was caused by the character NUL
1108     /// (ASCII code 0), as Chrome specifically always reject those.
1109     ///
1110     /// The ignorable errors are:
1111     /// * empty header names;
1112     /// * characters that are not allowed in header names, except for `\0` and `\r`;
1113     /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1114     ///   spaces and tabs between the header name and the colon;
1115     /// * missing colon between header name and colon;
1116     /// * characters that are not allowed in header values except for `\0` and `\r`.
1117     ///
1118     /// If an ignorable error is encountered, the parser tries to find the next
1119     /// line in the input to resume parsing the rest of the headers. An error
1120     /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1121     /// looking for the next line.
http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder1122     pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1123         self.conn_builder
1124             .http1_ignore_invalid_headers_in_responses(val);
1125         self
1126     }
1127 
1128     /// Set whether HTTP/1 connections should try to use vectored writes,
1129     /// or always flatten into a single buffer.
1130     ///
1131     /// Note that setting this to false may mean more copies of body data,
1132     /// but may also improve performance when an IO transport doesn't
1133     /// support vectored writes well, such as most TLS implementations.
1134     ///
1135     /// Setting this to true will force hyper to use queued strategy
1136     /// which may eliminate unnecessary cloning on some TLS backends
1137     ///
1138     /// Default is `auto`. In this mode hyper will try to guess which
1139     /// mode to use
http1_writev(&mut self, enabled: bool) -> &mut Builder1140     pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1141         self.conn_builder.http1_writev(enabled);
1142         self
1143     }
1144 
1145     /// Set whether HTTP/1 connections will write header names as title case at
1146     /// the socket level.
1147     ///
1148     /// Note that this setting does not affect HTTP/2.
1149     ///
1150     /// Default is false.
http1_title_case_headers(&mut self, val: bool) -> &mut Self1151     pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1152         self.conn_builder.http1_title_case_headers(val);
1153         self
1154     }
1155 
1156     /// Set whether to support preserving original header cases.
1157     ///
1158     /// Currently, this will record the original cases received, and store them
1159     /// in a private extension on the `Response`. It will also look for and use
1160     /// such an extension in any provided `Request`.
1161     ///
1162     /// Since the relevant extension is still private, there is no way to
1163     /// interact with the original cases. The only effect this can have now is
1164     /// to forward the cases in a proxy-like fashion.
1165     ///
1166     /// Note that this setting does not affect HTTP/2.
1167     ///
1168     /// Default is false.
http1_preserve_header_case(&mut self, val: bool) -> &mut Self1169     pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1170         self.conn_builder.http1_preserve_header_case(val);
1171         self
1172     }
1173 
1174     /// Set whether HTTP/0.9 responses should be tolerated.
1175     ///
1176     /// Default is false.
http09_responses(&mut self, val: bool) -> &mut Self1177     pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1178         self.conn_builder.http09_responses(val);
1179         self
1180     }
1181 
1182     /// Set whether the connection **must** use HTTP/2.
1183     ///
1184     /// The destination must either allow HTTP2 Prior Knowledge, or the
1185     /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1186     /// as part of the connection process. This will not make the `Client`
1187     /// utilize ALPN by itself.
1188     ///
1189     /// Note that setting this to true prevents HTTP/1 from being allowed.
1190     ///
1191     /// Default is false.
1192     #[cfg(feature = "http2")]
1193     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_only(&mut self, val: bool) -> &mut Self1194     pub fn http2_only(&mut self, val: bool) -> &mut Self {
1195         self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1196         self
1197     }
1198 
1199     /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1200     /// stream-level flow control.
1201     ///
1202     /// Passing `None` will do nothing.
1203     ///
1204     /// If not set, hyper will use a default.
1205     ///
1206     /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1207     #[cfg(feature = "http2")]
1208     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self1209     pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1210         self.conn_builder
1211             .http2_initial_stream_window_size(sz.into());
1212         self
1213     }
1214 
1215     /// Sets the max connection-level flow control for HTTP2
1216     ///
1217     /// Passing `None` will do nothing.
1218     ///
1219     /// If not set, hyper will use a default.
1220     #[cfg(feature = "http2")]
1221     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_initial_connection_window_size( &mut self, sz: impl Into<Option<u32>>, ) -> &mut Self1222     pub fn http2_initial_connection_window_size(
1223         &mut self,
1224         sz: impl Into<Option<u32>>,
1225     ) -> &mut Self {
1226         self.conn_builder
1227             .http2_initial_connection_window_size(sz.into());
1228         self
1229     }
1230 
1231     /// Sets whether to use an adaptive flow control.
1232     ///
1233     /// Enabling this will override the limits set in
1234     /// `http2_initial_stream_window_size` and
1235     /// `http2_initial_connection_window_size`.
1236     #[cfg(feature = "http2")]
1237     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_adaptive_window(&mut self, enabled: bool) -> &mut Self1238     pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1239         self.conn_builder.http2_adaptive_window(enabled);
1240         self
1241     }
1242 
1243     /// Sets the maximum frame size to use for HTTP2.
1244     ///
1245     /// Passing `None` will do nothing.
1246     ///
1247     /// If not set, hyper will use a default.
1248     #[cfg(feature = "http2")]
1249     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self1250     pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1251         self.conn_builder.http2_max_frame_size(sz);
1252         self
1253     }
1254 
1255     /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1256     /// connection alive.
1257     ///
1258     /// Pass `None` to disable HTTP2 keep-alive.
1259     ///
1260     /// Default is currently disabled.
1261     ///
1262     /// # Cargo Feature
1263     ///
1264     /// Requires the `runtime` cargo feature to be enabled.
1265     #[cfg(feature = "runtime")]
1266     #[cfg(feature = "http2")]
1267     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_keep_alive_interval( &mut self, interval: impl Into<Option<Duration>>, ) -> &mut Self1268     pub fn http2_keep_alive_interval(
1269         &mut self,
1270         interval: impl Into<Option<Duration>>,
1271     ) -> &mut Self {
1272         self.conn_builder.http2_keep_alive_interval(interval);
1273         self
1274     }
1275 
1276     /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1277     ///
1278     /// If the ping is not acknowledged within the timeout, the connection will
1279     /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1280     ///
1281     /// Default is 20 seconds.
1282     ///
1283     /// # Cargo Feature
1284     ///
1285     /// Requires the `runtime` cargo feature to be enabled.
1286     #[cfg(feature = "runtime")]
1287     #[cfg(feature = "http2")]
1288     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self1289     pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1290         self.conn_builder.http2_keep_alive_timeout(timeout);
1291         self
1292     }
1293 
1294     /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1295     ///
1296     /// If disabled, keep-alive pings are only sent while there are open
1297     /// request/responses streams. If enabled, pings are also sent when no
1298     /// streams are active. Does nothing if `http2_keep_alive_interval` is
1299     /// disabled.
1300     ///
1301     /// Default is `false`.
1302     ///
1303     /// # Cargo Feature
1304     ///
1305     /// Requires the `runtime` cargo feature to be enabled.
1306     #[cfg(feature = "runtime")]
1307     #[cfg(feature = "http2")]
1308     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self1309     pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1310         self.conn_builder.http2_keep_alive_while_idle(enabled);
1311         self
1312     }
1313 
1314     /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1315     ///
1316     /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1317     /// details.
1318     ///
1319     /// The default value is determined by the `h2` crate.
1320     ///
1321     /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1322     #[cfg(feature = "http2")]
1323     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self1324     pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1325         self.conn_builder.http2_max_concurrent_reset_streams(max);
1326         self
1327     }
1328 
1329     /// Set the maximum write buffer size for each HTTP/2 stream.
1330     ///
1331     /// Default is currently 1MB, but may change.
1332     ///
1333     /// # Panics
1334     ///
1335     /// The value must be no larger than `u32::MAX`.
1336     #[cfg(feature = "http2")]
1337     #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
http2_max_send_buf_size(&mut self, max: usize) -> &mut Self1338     pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1339         self.conn_builder.http2_max_send_buf_size(max);
1340         self
1341     }
1342 
1343     /// Set whether to retry requests that get disrupted before ever starting
1344     /// to write.
1345     ///
1346     /// This means a request that is queued, and gets given an idle, reused
1347     /// connection, and then encounters an error immediately as the idle
1348     /// connection was found to be unusable.
1349     ///
1350     /// When this is set to `false`, the related `ResponseFuture` would instead
1351     /// resolve to an `Error::Cancel`.
1352     ///
1353     /// Default is `true`.
1354     #[inline]
retry_canceled_requests(&mut self, val: bool) -> &mut Self1355     pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1356         self.client_config.retry_canceled_requests = val;
1357         self
1358     }
1359 
1360     /// Set whether to automatically add the `Host` header to requests.
1361     ///
1362     /// If true, and a request does not include a `Host` header, one will be
1363     /// added automatically, derived from the authority of the `Uri`.
1364     ///
1365     /// Default is `true`.
1366     #[inline]
set_host(&mut self, val: bool) -> &mut Self1367     pub fn set_host(&mut self, val: bool) -> &mut Self {
1368         self.client_config.set_host = val;
1369         self
1370     }
1371 
1372     /// Provide an executor to execute background `Connection` tasks.
executor<E>(&mut self, exec: E) -> &mut Self where E: Executor<BoxSendFuture> + Send + Sync + 'static,1373     pub fn executor<E>(&mut self, exec: E) -> &mut Self
1374     where
1375         E: Executor<BoxSendFuture> + Send + Sync + 'static,
1376     {
1377         self.conn_builder.executor(exec);
1378         self
1379     }
1380 
1381     /// Builder a client with this configuration and the default `HttpConnector`.
1382     #[cfg(feature = "tcp")]
build_http<B>(&self) -> Client<HttpConnector, B> where B: HttpBody + Send, B::Data: Send,1383     pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1384     where
1385         B: HttpBody + Send,
1386         B::Data: Send,
1387     {
1388         let mut connector = HttpConnector::new();
1389         if self.pool_config.is_enabled() {
1390             connector.set_keepalive(self.pool_config.idle_timeout);
1391         }
1392         self.build(connector)
1393     }
1394 
    /// Combine the configuration of this builder with a connector to create a `Client`.
    ///
    /// The builder is only borrowed: the client config is copied, the
    /// connection builder is cloned, and a pool is constructed with
    /// `Pool::new` from the pool config and the connection builder's executor.
    pub fn build<C, B>(&self, connector: C) -> Client<C, B>
    where
        C: Connect + Clone,
        B: HttpBody + Send,
        B::Data: Send,
    {
        // The lint attribute applies to this struct expression only.
        #[cfg_attr(feature = "deprecated", allow(deprecated))]
        Client {
            config: self.client_config,
            conn_builder: self.conn_builder.clone(),
            connector,
            pool: Pool::new(self.pool_config, &self.conn_builder.exec),
        }
    }
1410 }
1411 
1412 impl fmt::Debug for Builder {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1413     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1414         f.debug_struct("Builder")
1415             .field("client_config", &self.client_config)
1416             .field("conn_builder", &self.conn_builder)
1417             .field("pool_config", &self.pool_config)
1418             .finish()
1419     }
1420 }
1421 
#[cfg(test)]
mod unit_tests {
    use super::*;

    /// Compile-time check that `ResponseFuture` is `Sync`.
    #[test]
    fn response_future_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<ResponseFuture>();
    }

    /// A URI without an explicit path becomes `/` in origin-form.
    #[test]
    fn set_relative_uri_with_implicit_path() {
        let mut uri: Uri = "http://hyper.rs".parse().unwrap();
        origin_form(&mut uri);
        assert_eq!(uri.to_string(), "/");
    }

    /// Origin-form keeps the path and the query.
    #[test]
    fn test_origin_form() {
        let mut uri: Uri = "http://hyper.rs/guides".parse().unwrap();
        origin_form(&mut uri);
        assert_eq!(uri.to_string(), "/guides");

        let mut uri: Uri = "http://hyper.rs/guides?foo=bar".parse().unwrap();
        origin_form(&mut uri);
        assert_eq!(uri.to_string(), "/guides?foo=bar");
    }

    /// Absolute-form is kept for `http` and reduced to origin-form for `https`.
    #[test]
    fn test_absolute_form() {
        let mut uri: Uri = "http://hyper.rs/guides".parse().unwrap();
        absolute_form(&mut uri);
        assert_eq!(uri.to_string(), "http://hyper.rs/guides");

        let mut uri: Uri = "https://hyper.rs/guides".parse().unwrap();
        absolute_form(&mut uri);
        assert_eq!(uri.to_string(), "/guides");
    }

    /// Authority-form keeps only the authority.
    #[test]
    fn test_authority_form() {
        let _ = pretty_env_logger::try_init();

        let mut uri: Uri = "http://hyper.rs".parse().unwrap();
        authority_form(&mut uri);
        assert_eq!(uri.to_string(), "hyper.rs");

        let mut uri: Uri = "hyper.rs".parse().unwrap();
        authority_form(&mut uri);
        assert_eq!(uri.to_string(), "hyper.rs");
    }

    /// A CONNECT target without a port gets the `http` scheme.
    #[test]
    fn test_extract_domain_connect_no_port() {
        let mut uri = "hyper.rs".parse().unwrap();
        let (scheme, host) = extract_domain(&mut uri, true).expect("extract domain");
        assert_eq!(scheme, *"http");
        assert_eq!(host, "hyper.rs");
    }

    /// A CONNECT target on port 443 gets the `https` scheme, which is also
    /// written back into the URI.
    #[test]
    fn test_extract_domain_connect_port_443() {
        let mut uri: Uri = "hyper.rs:443".parse().unwrap();
        let (scheme, host) = extract_domain(&mut uri, true).expect("extract domain");
        assert_eq!(scheme, *"https");
        assert_eq!(host, "hyper.rs:443");
        assert_eq!(uri.scheme(), Some(&Scheme::HTTPS));
    }

    /// Only `https` and `wss` count as secure schemes.
    #[test]
    fn test_is_secure() {
        let secure = |s: &str| is_schema_secure(&s.parse::<Uri>().unwrap());

        assert!(!secure("http://hyper.rs"));
        assert!(!secure("hyper.rs"));
        assert!(secure("wss://hyper.rs"));
        assert!(!secure("ws://hyper.rs"));
        assert!(secure("https://hyper.rs"));
    }

    /// Default ports (80 for insecure, 443 for secure) are suppressed.
    #[test]
    fn test_get_non_default_port() {
        let port = |s: &str| {
            let uri: Uri = s.parse().unwrap();
            get_non_default_port(&uri).map(|p| p.as_u16())
        };

        assert_eq!(port("http://hyper.rs"), None);
        assert_eq!(port("http://hyper.rs:80"), None);
        assert_eq!(port("https://hyper.rs:443"), None);
        assert_eq!(port("hyper.rs:80"), None);

        assert_eq!(port("http://hyper.rs:123"), Some(123));
        assert_eq!(port("https://hyper.rs:80"), Some(80));
        assert_eq!(port("hyper.rs:123"), Some(123));
    }
}
1526