1 //! Connectors used by the `Client`.
2 //!
3 //! This module contains:
4 //!
5 //! - A default [`HttpConnector`][] that does DNS resolution and establishes
6 //!   connections over TCP.
7 //! - Types to build custom connectors.
8 //!
9 //! # Connectors
10 //!
11 //! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and
12 //! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][],
13 //! and [`Connection`][].
14 //!
15 //! ## Custom Connectors
16 //!
17 //! A simple connector that ignores the `Uri` destination and always returns
18 //! a TCP connection to the same address could be written like this:
19 //!
20 //! ```rust,ignore
21 //! let connector = tower::service_fn(|_dst| async {
//!     tokio::net::TcpStream::connect("127.0.0.1:1337").await
23 //! })
24 //! ```
25 //!
26 //! Or, fully written out:
27 //!
28 //! ```
29 //! # #[cfg(feature = "runtime")]
30 //! # mod rt {
31 //! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}};
32 //! use hyper::{service::Service, Uri};
33 //! use tokio::net::TcpStream;
34 //!
35 //! #[derive(Clone)]
36 //! struct LocalConnector;
37 //!
38 //! impl Service<Uri> for LocalConnector {
39 //!     type Response = TcpStream;
40 //!     type Error = std::io::Error;
41 //!     // We can't "name" an `async` generated future.
42 //!     type Future = Pin<Box<
43 //!         dyn Future<Output = Result<Self::Response, Self::Error>> + Send
44 //!     >>;
45 //!
46 //!     fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
47 //!         // This connector is always ready, but others might not be.
48 //!         Poll::Ready(Ok(()))
49 //!     }
50 //!
51 //!     fn call(&mut self, _: Uri) -> Self::Future {
52 //!         Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337))))
53 //!     }
54 //! }
55 //! # }
56 //! ```
57 //!
58 //! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a
59 //! better starting place to extend from.
60 //!
//! Either of the above connector examples can be used with the
//! `Client` like this:
63 //!
64 //! ```
65 //! # #[cfg(feature = "runtime")]
66 //! # fn rt () {
67 //! # let connector = hyper::client::HttpConnector::new();
68 //! // let connector = ...
69 //!
70 //! let client = hyper::Client::builder()
71 //!     .build::<_, hyper::Body>(connector);
72 //! # }
73 //! ```
74 //!
75 //!
76 //! [`HttpConnector`]: HttpConnector
77 //! [`Service`]: crate::service::Service
78 //! [`Uri`]: ::http::Uri
79 //! [`AsyncRead`]: tokio::io::AsyncRead
80 //! [`AsyncWrite`]: tokio::io::AsyncWrite
81 //! [`Connection`]: Connection
82 use std::fmt;
83 use std::fmt::{Debug, Formatter};
84 use std::ops::Deref;
85 use std::sync::atomic::{AtomicBool, Ordering};
86 use std::sync::Arc;
87 
88 use ::http::Extensions;
89 use tokio::sync::watch;
90 
91 cfg_feature! {
92     #![feature = "tcp"]
93 
94     pub use self::http::{HttpConnector, HttpInfo};
95 
96     pub mod dns;
97     mod http;
98 }
99 
100 cfg_feature! {
101     #![any(feature = "http1", feature = "http2")]
102 
103     pub use self::sealed::Connect;
104 }
105 
/// Describes a type returned by a connector.
///
/// A connector's `Response` type implements this trait (alongside the async
/// read/write traits) so that metadata about the established transport can be
/// queried through [`Connected`].
pub trait Connection {
    /// Return metadata describing the connection.
    fn connected(&self) -> Connected;
}
111 
/// Extra information about the connected transport.
///
/// This can be used to inform recipients about things like if ALPN
/// was used, or if connected to an HTTP proxy.
#[derive(Debug)]
pub struct Connected {
    // Next-protocol result; `Alpn::None` unless `negotiated_h2` was called.
    pub(super) alpn: Alpn,
    // Whether the transport is to an HTTP proxy (see `Connected::proxy`).
    pub(super) is_proxied: bool,
    // Type-erased values registered through `Connected::extra`, if any.
    pub(super) extra: Option<Extra>,
    // Shared flag flipped by `Connected::poison`; clones share the same flag.
    pub(super) poisoned: PoisonPill,
}
123 
/// Shared "do not reuse" marker for a connection.
///
/// The flag lives behind an `Arc`, so every clone of a pill observes the
/// same state: poisoning one clone poisons them all.
#[derive(Clone)]
pub(crate) struct PoisonPill {
    poisoned: Arc<AtomicBool>,
}

impl Debug for PoisonPill {
    /// Formats the pill as `PoisonPill@<address> { poisoned: <bool> }`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // The address of the shared allocation is printed so that clones of
        // the same pill can be recognised in logs.
        let flag = &self.poisoned;
        f.write_fmt(format_args!(
            "PoisonPill@{:p} {{ poisoned: {} }}",
            *flag,
            flag.load(Ordering::Relaxed)
        ))
    }
}

impl PoisonPill {
    /// Creates a pill whose flag starts out cleared.
    pub(crate) fn healthy() -> Self {
        let flag = AtomicBool::new(false);
        Self {
            poisoned: Arc::new(flag),
        }
    }

    /// Sets the flag; visible through every clone of this pill.
    pub(crate) fn poison(&self) {
        self.poisoned.store(true, Ordering::Relaxed);
    }

    /// Reports whether the flag has been set.
    pub(crate) fn poisoned(&self) -> bool {
        let flag: &AtomicBool = &self.poisoned;
        flag.load(Ordering::Relaxed)
    }
}
155 
/// [`CaptureConnection`] allows callers to capture [`Connected`] information
///
/// To capture a connection for a request, use [`capture_connection`].
#[derive(Debug, Clone)]
pub struct CaptureConnection {
    // Receiving half of the watch channel; holds `None` until the paired
    // `CaptureConnectionExtension` publishes a `Connected`.
    rx: watch::Receiver<Option<Connected>>,
}
163 
164 /// Capture the connection for a given request
165 ///
166 /// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
167 /// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
168 /// as the connection is established.
169 ///
/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return `None`.
171 ///
172 /// # Examples
173 ///
174 /// **Synchronous access**:
175 /// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
176 /// established. This is ideal for situations where you are certain the connection has already
177 /// been established (e.g. after the response future has already completed).
178 /// ```rust
179 /// use hyper::client::connect::{capture_connection, CaptureConnection};
180 /// let mut request = http::Request::builder()
181 ///   .uri("http://foo.com")
182 ///   .body(())
183 ///   .unwrap();
184 ///
185 /// let captured_connection = capture_connection(&mut request);
186 /// // some time later after the request has been sent...
187 /// let connection_info = captured_connection.connection_metadata();
188 /// println!("we are connected! {:?}", connection_info.as_ref());
189 /// ```
190 ///
191 /// **Asynchronous access**:
/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future that resolves as soon as the
/// connection is available.
194 ///
195 /// ```rust
196 /// # #[cfg(feature  = "runtime")]
197 /// # async fn example() {
198 /// use hyper::client::connect::{capture_connection, CaptureConnection};
199 /// let mut request = http::Request::builder()
200 ///   .uri("http://foo.com")
201 ///   .body(hyper::Body::empty())
202 ///   .unwrap();
203 ///
204 /// let mut captured = capture_connection(&mut request);
205 /// tokio::task::spawn(async move {
206 ///     let connection_info = captured.wait_for_connection_metadata().await;
207 ///     println!("we are connected! {:?}", connection_info.as_ref());
208 /// });
209 ///
210 /// let client = hyper::Client::new();
211 /// client.request(request).await.expect("request failed");
212 /// # }
213 /// ```
capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection214 pub fn capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection {
215     let (tx, rx) = CaptureConnection::new();
216     request.extensions_mut().insert(tx);
217     rx
218 }
219 
220 /// TxSide for [`CaptureConnection`]
221 ///
222 /// This is inserted into `Extensions` to allow Hyper to back channel connection info
223 #[derive(Clone)]
224 pub(crate) struct CaptureConnectionExtension {
225     tx: Arc<watch::Sender<Option<Connected>>>,
226 }
227 
228 impl CaptureConnectionExtension {
set(&self, connected: &Connected)229     pub(crate) fn set(&self, connected: &Connected) {
230         self.tx.send_replace(Some(connected.clone()));
231     }
232 }
233 
234 impl CaptureConnection {
235     /// Internal API to create the tx and rx half of [`CaptureConnection`]
new() -> (CaptureConnectionExtension, Self)236     pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
237         let (tx, rx) = watch::channel(None);
238         (
239             CaptureConnectionExtension { tx: Arc::new(tx) },
240             CaptureConnection { rx },
241         )
242     }
243 
244     /// Retrieve the connection metadata, if available
connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_245     pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
246         self.rx.borrow()
247     }
248 
249     /// Wait for the connection to be established
250     ///
251     /// If a connection was established, this will always return `Some(...)`. If the request never
252     /// successfully connected (e.g. DNS resolution failure), this method will never return.
wait_for_connection_metadata( &mut self, ) -> impl Deref<Target = Option<Connected>> + '_253     pub async fn wait_for_connection_metadata(
254         &mut self,
255     ) -> impl Deref<Target = Option<Connected>> + '_ {
256         if self.rx.borrow().is_some() {
257             return self.rx.borrow();
258         }
259         let _ = self.rx.changed().await;
260         self.rx.borrow()
261     }
262 }
263 
// Type-erased holder for the values registered through `Connected::extra`.
pub(super) struct Extra(Box<dyn ExtraInner>);
265 
// Next-protocol outcome recorded on a `Connected`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub(super) enum Alpn {
    // HTTP/2 was negotiated (set by `Connected::negotiated_h2`).
    H2,
    // Nothing recorded; this is what `Connected::new` starts with.
    None,
}
271 
272 impl Connected {
273     /// Create new `Connected` type with empty metadata.
new() -> Connected274     pub fn new() -> Connected {
275         Connected {
276             alpn: Alpn::None,
277             is_proxied: false,
278             extra: None,
279             poisoned: PoisonPill::healthy(),
280         }
281     }
282 
283     /// Set whether the connected transport is to an HTTP proxy.
284     ///
285     /// This setting will affect if HTTP/1 requests written on the transport
286     /// will have the request-target in absolute-form or origin-form:
287     ///
288     /// - When `proxy(false)`:
289     ///
290     /// ```http
291     /// GET /guide HTTP/1.1
292     /// ```
293     ///
294     /// - When `proxy(true)`:
295     ///
296     /// ```http
297     /// GET http://hyper.rs/guide HTTP/1.1
298     /// ```
299     ///
300     /// Default is `false`.
proxy(mut self, is_proxied: bool) -> Connected301     pub fn proxy(mut self, is_proxied: bool) -> Connected {
302         self.is_proxied = is_proxied;
303         self
304     }
305 
306     /// Determines if the connected transport is to an HTTP proxy.
is_proxied(&self) -> bool307     pub fn is_proxied(&self) -> bool {
308         self.is_proxied
309     }
310 
311     /// Set extra connection information to be set in the extensions of every `Response`.
extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected312     pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
313         if let Some(prev) = self.extra {
314             self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra))));
315         } else {
316             self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
317         }
318         self
319     }
320 
321     /// Copies the extra connection information into an `Extensions` map.
get_extras(&self, extensions: &mut Extensions)322     pub fn get_extras(&self, extensions: &mut Extensions) {
323         if let Some(extra) = &self.extra {
324             extra.set(extensions);
325         }
326     }
327 
328     /// Set that the connected transport negotiated HTTP/2 as its next protocol.
negotiated_h2(mut self) -> Connected329     pub fn negotiated_h2(mut self) -> Connected {
330         self.alpn = Alpn::H2;
331         self
332     }
333 
334     /// Determines if the connected transport negotiated HTTP/2 as its next protocol.
is_negotiated_h2(&self) -> bool335     pub fn is_negotiated_h2(&self) -> bool {
336         self.alpn == Alpn::H2
337     }
338 
339     /// Poison this connection
340     ///
341     /// A poisoned connection will not be reused for subsequent requests by the pool
poison(&self)342     pub fn poison(&self) {
343         self.poisoned.poison();
344         tracing::debug!(
345             poison_pill = ?self.poisoned, "connection was poisoned"
346         );
347     }
348 
349     // Don't public expose that `Connected` is `Clone`, unsure if we want to
350     // keep that contract...
clone(&self) -> Connected351     pub(super) fn clone(&self) -> Connected {
352         Connected {
353             alpn: self.alpn.clone(),
354             is_proxied: self.is_proxied,
355             extra: self.extra.clone(),
356             poisoned: self.poisoned.clone(),
357         }
358     }
359 }
360 
361 // ===== impl Extra =====
362 
363 impl Extra {
set(&self, res: &mut Extensions)364     pub(super) fn set(&self, res: &mut Extensions) {
365         self.0.set(res);
366     }
367 }
368 
369 impl Clone for Extra {
clone(&self) -> Extra370     fn clone(&self) -> Extra {
371         Extra(self.0.clone_box())
372     }
373 }
374 
375 impl fmt::Debug for Extra {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result376     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377         f.debug_struct("Extra").finish()
378     }
379 }
380 
// Object-safe interface behind `Extra`, implemented by `ExtraEnvelope` and
// `ExtraChain`.
trait ExtraInner: Send + Sync {
    // Clones the value into a fresh box (used by `Extra`'s `Clone` impl).
    fn clone_box(&self) -> Box<dyn ExtraInner>;
    // Inserts the carried extra value(s) into `res`.
    fn set(&self, res: &mut Extensions);
}
385 
386 // This indirection allows the `Connected` to have a type-erased "extra" value,
387 // while that type still knows its inner extra type. This allows the correct
388 // TypeId to be used when inserting into `res.extensions_mut()`.
389 #[derive(Clone)]
390 struct ExtraEnvelope<T>(T);
391 
392 impl<T> ExtraInner for ExtraEnvelope<T>
393 where
394     T: Clone + Send + Sync + 'static,
395 {
clone_box(&self) -> Box<dyn ExtraInner>396     fn clone_box(&self) -> Box<dyn ExtraInner> {
397         Box::new(self.clone())
398     }
399 
set(&self, res: &mut Extensions)400     fn set(&self, res: &mut Extensions) {
401         res.insert(self.0.clone());
402     }
403 }
404 
405 struct ExtraChain<T>(Box<dyn ExtraInner>, T);
406 
407 impl<T: Clone> Clone for ExtraChain<T> {
clone(&self) -> Self408     fn clone(&self) -> Self {
409         ExtraChain(self.0.clone_box(), self.1.clone())
410     }
411 }
412 
413 impl<T> ExtraInner for ExtraChain<T>
414 where
415     T: Clone + Send + Sync + 'static,
416 {
clone_box(&self) -> Box<dyn ExtraInner>417     fn clone_box(&self) -> Box<dyn ExtraInner> {
418         Box::new(self.clone())
419     }
420 
set(&self, res: &mut Extensions)421     fn set(&self, res: &mut Extensions) {
422         self.0.set(res);
423         res.insert(self.1.clone());
424     }
425 }
426 
// Home of the `Connect` trait alias and its supporting traits. `Connect` is
// re-exported from the parent module; the helper items stay in here.
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) mod sealed {
    use std::error::Error as StdError;
    use std::future::Future;
    use std::marker::Unpin;

    use ::http::Uri;
    use tokio::io::{AsyncRead, AsyncWrite};

    use super::Connection;

    /// Connect to a destination, returning an IO transport.
    ///
    /// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the
    /// ready connection.
    ///
    /// # Trait Alias
    ///
    /// This is really just an *alias* for the `tower::Service` trait, with
    /// additional bounds set for convenience *inside* hyper. You don't actually
    /// implement this trait, but `tower::Service<Uri>` instead.
    // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot
    // fit the `Connect` bounds because of the blanket impl for `Service`.
    pub trait Connect: Sealed + Sized {
        #[doc(hidden)]
        type _Svc: ConnectSvc;
        // Requires an `Internal` value as its first argument.
        // NOTE(review): presumably so only code that can name `Internal` is
        // able to call this — confirm.
        #[doc(hidden)]
        fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future;
    }

    // Companion trait that carries the concrete connection, error and future
    // types of a connector; `Connect::_Svc` points at an implementor of it.
    #[allow(unreachable_pub)]
    pub trait ConnectSvc {
        type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
        type Error: Into<Box<dyn StdError + Send + Sync>>;
        type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;

        fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
    }

    // Blanket impl: every `tower_service::Service<Uri>` meeting these bounds
    // is a `Connect`, with itself as the backing `_Svc`.
    impl<S, T> Connect for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type _Svc = S;

        // Consumes the service and drives a single call for `dst`.
        fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
            crate::service::oneshot(self, dst)
        }
    }

    // Same bounds as the `Connect` impl above; exposes the service's response
    // and error types and uses `Oneshot` as the future.
    impl<S, T> ConnectSvc for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type Connection = T;
        type Error = S::Error;
        type Future = crate::service::Oneshot<S, Uri>;

        fn connect(self, _: Internal, dst: Uri) -> Self::Future {
            crate::service::oneshot(self, dst)
        }
    }

    // Marker impl for the same family of services (note: no `'static` bound
    // on `S` here, unlike the two impls above).
    impl<S, T> Sealed for S
    where
        S: tower_service::Service<Uri, Response = T> + Send,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
    }

    // Marker supertrait of `Connect`; only `Connect` is re-exported by the
    // parent module.
    pub trait Sealed {}
    // Unit token taken by the `connect` methods above.
    #[allow(missing_debug_implementations)]
    pub struct Internal;
}
509 
#[cfg(test)]
mod tests {
    use super::Connected;
    use crate::client::connect::CaptureConnection;

    // Three distinct marker types so that each occupies its own slot in an
    // `Extensions` map.
    #[derive(Clone, Debug, PartialEq)]
    struct Ex1(usize);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex2(&'static str);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex3(&'static str);

    /// A single extra ends up in the extensions map.
    #[test]
    fn test_connected_extra() {
        let connected = Connected::new().extra(Ex1(41));
        let mut extensions = ::http::Extensions::new();

        assert!(extensions.get::<Ex1>().is_none());

        let extras = connected.extra.as_ref().expect("c1 extra");
        extras.set(&mut extensions);

        assert_eq!(extensions.get::<Ex1>(), Some(&Ex1(41)));
    }

    /// Extras added at several stages accumulate instead of replacing each
    /// other, unless they share a type.
    #[test]
    fn test_connected_extra_chain() {
        // Composed connectors may each attach their own "extra"; a later
        // stage must not wipe out what an earlier one added.
        let chained = Connected::new()
            .extra(Ex1(45))
            .extra(Ex2("zoom"))
            .extra(Ex3("pew pew"));

        let mut first_map = ::http::Extensions::new();

        assert!(first_map.get::<Ex1>().is_none());
        assert!(first_map.get::<Ex2>().is_none());
        assert!(first_map.get::<Ex3>().is_none());

        chained
            .extra
            .as_ref()
            .expect("c1 extra")
            .set(&mut first_map);

        assert_eq!(first_map.get::<Ex1>(), Some(&Ex1(45)));
        assert_eq!(first_map.get::<Ex2>(), Some(&Ex2("zoom")));
        assert_eq!(first_map.get::<Ex3>(), Some(&Ex3("pew pew")));

        // As with extensions themselves, a second value of the same type
        // wins over the first.
        let overridden = Connected::new()
            .extra(Ex1(33))
            .extra(Ex2("hiccup"))
            .extra(Ex1(99));

        let mut second_map = ::http::Extensions::new();

        overridden
            .extra
            .as_ref()
            .expect("c2 extra")
            .set(&mut second_map);

        assert_eq!(second_map.get::<Ex1>(), Some(&Ex1(99)));
        assert_eq!(second_map.get::<Ex2>(), Some(&Ex2("hiccup")));
    }

    /// Synchronous access sees nothing before `set` and the value after.
    #[test]
    fn test_sync_capture_connection() {
        let (sender, receiver) = CaptureConnection::new();
        assert!(
            receiver.connection_metadata().is_none(),
            "connection has not been set"
        );
        sender.set(&Connected::new().proxy(true));

        // Read twice: the value must stay available across calls.
        for _ in 0..2 {
            let metadata = receiver.connection_metadata();
            let connected = metadata.as_ref().expect("connected should be set");
            assert!(connected.is_proxied());
        }
    }

    /// The async accessor resolves once the sender publishes a connection.
    #[tokio::test]
    async fn async_capture_connection() {
        let (sender, mut receiver) = CaptureConnection::new();
        assert!(
            receiver.connection_metadata().is_none(),
            "connection has not been set"
        );
        let waiter = tokio::spawn(async move {
            assert!(receiver
                .wait_for_connection_metadata()
                .await
                .as_ref()
                .expect("connection should be set")
                .is_proxied());
            // can be awaited multiple times
            assert!(
                receiver.wait_for_connection_metadata().await.is_some(),
                "should be awaitable multiple times"
            );

            assert!(receiver.connection_metadata().is_some());
        });
        // can't be finished, we haven't set the connection yet
        assert!(!waiter.is_finished());
        sender.set(&Connected::new().proxy(true));

        assert!(waiter.await.is_ok());
    }

    /// Dropping the sender makes the async accessor resolve with nothing.
    #[tokio::test]
    async fn capture_connection_sender_side_dropped() {
        let (sender, mut receiver) = CaptureConnection::new();
        assert!(
            receiver.connection_metadata().is_none(),
            "connection has not been set"
        );
        drop(sender);
        assert!(receiver.wait_for_connection_metadata().await.is_none());
    }
}
641