1 //! Connectors used by the `Client`.
2 //!
3 //! This module contains:
4 //!
5 //! - A default [`HttpConnector`][] that does DNS resolution and establishes
6 //! connections over TCP.
7 //! - Types to build custom connectors.
8 //!
9 //! # Connectors
10 //!
11 //! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and
12 //! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][],
13 //! and [`Connection`][].
14 //!
15 //! ## Custom Connectors
16 //!
17 //! A simple connector that ignores the `Uri` destination and always returns
18 //! a TCP connection to the same address could be written like this:
19 //!
20 //! ```rust,ignore
21 //! let connector = tower::service_fn(|_dst| async {
22 //! tokio::net::TcpStream::connect("127.0.0.1:1337")
23 //! })
24 //! ```
25 //!
26 //! Or, fully written out:
27 //!
28 //! ```
29 //! # #[cfg(feature = "runtime")]
30 //! # mod rt {
31 //! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}};
32 //! use hyper::{service::Service, Uri};
33 //! use tokio::net::TcpStream;
34 //!
35 //! #[derive(Clone)]
36 //! struct LocalConnector;
37 //!
38 //! impl Service<Uri> for LocalConnector {
39 //! type Response = TcpStream;
40 //! type Error = std::io::Error;
41 //! // We can't "name" an `async` generated future.
42 //! type Future = Pin<Box<
43 //! dyn Future<Output = Result<Self::Response, Self::Error>> + Send
44 //! >>;
45 //!
46 //! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
47 //! // This connector is always ready, but others might not be.
48 //! Poll::Ready(Ok(()))
49 //! }
50 //!
51 //! fn call(&mut self, _: Uri) -> Self::Future {
52 //! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337))))
53 //! }
54 //! }
55 //! # }
56 //! ```
57 //!
58 //! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a
59 //! better starting place to extend from.
60 //!
//! Either of the connector examples above can be used with the
//! `Client` like this:
63 //!
64 //! ```
65 //! # #[cfg(feature = "runtime")]
66 //! # fn rt () {
67 //! # let connector = hyper::client::HttpConnector::new();
68 //! // let connector = ...
69 //!
70 //! let client = hyper::Client::builder()
71 //! .build::<_, hyper::Body>(connector);
72 //! # }
73 //! ```
74 //!
75 //!
76 //! [`HttpConnector`]: HttpConnector
77 //! [`Service`]: crate::service::Service
78 //! [`Uri`]: ::http::Uri
79 //! [`AsyncRead`]: tokio::io::AsyncRead
80 //! [`AsyncWrite`]: tokio::io::AsyncWrite
81 //! [`Connection`]: Connection
82 use std::fmt;
83 use std::fmt::{Debug, Formatter};
84 use std::ops::Deref;
85 use std::sync::atomic::{AtomicBool, Ordering};
86 use std::sync::Arc;
87
88 use ::http::Extensions;
89 use tokio::sync::watch;
90
91 cfg_feature! {
92 #![feature = "tcp"]
93
94 pub use self::http::{HttpConnector, HttpInfo};
95
96 pub mod dns;
97 mod http;
98 }
99
100 cfg_feature! {
101 #![any(feature = "http1", feature = "http2")]
102
103 pub use self::sealed::Connect;
104 }
105
106 /// Describes a type returned by a connector.
107 pub trait Connection {
108 /// Return metadata describing the connection.
connected(&self) -> Connected109 fn connected(&self) -> Connected;
110 }
111
112 /// Extra information about the connected transport.
113 ///
114 /// This can be used to inform recipients about things like if ALPN
115 /// was used, or if connected to an HTTP proxy.
116 #[derive(Debug)]
117 pub struct Connected {
118 pub(super) alpn: Alpn,
119 pub(super) is_proxied: bool,
120 pub(super) extra: Option<Extra>,
121 pub(super) poisoned: PoisonPill,
122 }
123
/// Shared boolean flag recording whether a connection has been poisoned.
///
/// Cloning a `PoisonPill` produces another handle to the same flag, so a
/// `poison()` call made through one handle is observed through all of them.
#[derive(Clone)]
pub(crate) struct PoisonPill {
    poisoned: Arc<AtomicBool>,
}

impl Debug for PoisonPill {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // The address of the shared flag is part of the output so that two
        // pills can be told apart (or matched up) while debugging.
        let is_poisoned = self.poisoned();
        write!(
            f,
            "PoisonPill@{:p} {{ poisoned: {} }}",
            self.poisoned, is_poisoned
        )
    }
}

impl PoisonPill {
    /// Creates a pill whose flag starts out unset.
    pub(crate) fn healthy() -> Self {
        let poisoned = Arc::new(AtomicBool::new(false));
        Self { poisoned }
    }

    /// Sets the flag.
    pub(crate) fn poison(&self) {
        self.poisoned.store(true, Ordering::Relaxed);
    }

    /// Reports whether the flag has been set.
    pub(crate) fn poisoned(&self) -> bool {
        self.poisoned.load(Ordering::Relaxed)
    }
}
155
156 /// [`CaptureConnection`] allows callers to capture [`Connected`] information
157 ///
158 /// To capture a connection for a request, use [`capture_connection`].
159 #[derive(Debug, Clone)]
160 pub struct CaptureConnection {
161 rx: watch::Receiver<Option<Connected>>,
162 }
163
164 /// Capture the connection for a given request
165 ///
166 /// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
167 /// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
168 /// as the connection is established.
169 ///
170 /// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none.
171 ///
172 /// # Examples
173 ///
174 /// **Synchronous access**:
175 /// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
176 /// established. This is ideal for situations where you are certain the connection has already
177 /// been established (e.g. after the response future has already completed).
178 /// ```rust
179 /// use hyper::client::connect::{capture_connection, CaptureConnection};
180 /// let mut request = http::Request::builder()
181 /// .uri("http://foo.com")
182 /// .body(())
183 /// .unwrap();
184 ///
185 /// let captured_connection = capture_connection(&mut request);
186 /// // some time later after the request has been sent...
187 /// let connection_info = captured_connection.connection_metadata();
188 /// println!("we are connected! {:?}", connection_info.as_ref());
189 /// ```
190 ///
191 /// **Asynchronous access**:
192 /// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the
193 /// connection is available.
194 ///
195 /// ```rust
196 /// # #[cfg(feature = "runtime")]
197 /// # async fn example() {
198 /// use hyper::client::connect::{capture_connection, CaptureConnection};
199 /// let mut request = http::Request::builder()
200 /// .uri("http://foo.com")
201 /// .body(hyper::Body::empty())
202 /// .unwrap();
203 ///
204 /// let mut captured = capture_connection(&mut request);
205 /// tokio::task::spawn(async move {
206 /// let connection_info = captured.wait_for_connection_metadata().await;
207 /// println!("we are connected! {:?}", connection_info.as_ref());
208 /// });
209 ///
210 /// let client = hyper::Client::new();
211 /// client.request(request).await.expect("request failed");
212 /// # }
213 /// ```
capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection214 pub fn capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection {
215 let (tx, rx) = CaptureConnection::new();
216 request.extensions_mut().insert(tx);
217 rx
218 }
219
220 /// TxSide for [`CaptureConnection`]
221 ///
222 /// This is inserted into `Extensions` to allow Hyper to back channel connection info
223 #[derive(Clone)]
224 pub(crate) struct CaptureConnectionExtension {
225 tx: Arc<watch::Sender<Option<Connected>>>,
226 }
227
228 impl CaptureConnectionExtension {
set(&self, connected: &Connected)229 pub(crate) fn set(&self, connected: &Connected) {
230 self.tx.send_replace(Some(connected.clone()));
231 }
232 }
233
234 impl CaptureConnection {
235 /// Internal API to create the tx and rx half of [`CaptureConnection`]
new() -> (CaptureConnectionExtension, Self)236 pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
237 let (tx, rx) = watch::channel(None);
238 (
239 CaptureConnectionExtension { tx: Arc::new(tx) },
240 CaptureConnection { rx },
241 )
242 }
243
244 /// Retrieve the connection metadata, if available
connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_245 pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
246 self.rx.borrow()
247 }
248
249 /// Wait for the connection to be established
250 ///
251 /// If a connection was established, this will always return `Some(...)`. If the request never
252 /// successfully connected (e.g. DNS resolution failure), this method will never return.
wait_for_connection_metadata( &mut self, ) -> impl Deref<Target = Option<Connected>> + '_253 pub async fn wait_for_connection_metadata(
254 &mut self,
255 ) -> impl Deref<Target = Option<Connected>> + '_ {
256 if self.rx.borrow().is_some() {
257 return self.rx.borrow();
258 }
259 let _ = self.rx.changed().await;
260 self.rx.borrow()
261 }
262 }
263
264 pub(super) struct Extra(Box<dyn ExtraInner>);
265
266 #[derive(Clone, Copy, Debug, PartialEq)]
267 pub(super) enum Alpn {
268 H2,
269 None,
270 }
271
272 impl Connected {
273 /// Create new `Connected` type with empty metadata.
new() -> Connected274 pub fn new() -> Connected {
275 Connected {
276 alpn: Alpn::None,
277 is_proxied: false,
278 extra: None,
279 poisoned: PoisonPill::healthy(),
280 }
281 }
282
283 /// Set whether the connected transport is to an HTTP proxy.
284 ///
285 /// This setting will affect if HTTP/1 requests written on the transport
286 /// will have the request-target in absolute-form or origin-form:
287 ///
288 /// - When `proxy(false)`:
289 ///
290 /// ```http
291 /// GET /guide HTTP/1.1
292 /// ```
293 ///
294 /// - When `proxy(true)`:
295 ///
296 /// ```http
297 /// GET http://hyper.rs/guide HTTP/1.1
298 /// ```
299 ///
300 /// Default is `false`.
proxy(mut self, is_proxied: bool) -> Connected301 pub fn proxy(mut self, is_proxied: bool) -> Connected {
302 self.is_proxied = is_proxied;
303 self
304 }
305
306 /// Determines if the connected transport is to an HTTP proxy.
is_proxied(&self) -> bool307 pub fn is_proxied(&self) -> bool {
308 self.is_proxied
309 }
310
311 /// Set extra connection information to be set in the extensions of every `Response`.
extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected312 pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
313 if let Some(prev) = self.extra {
314 self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra))));
315 } else {
316 self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
317 }
318 self
319 }
320
321 /// Copies the extra connection information into an `Extensions` map.
get_extras(&self, extensions: &mut Extensions)322 pub fn get_extras(&self, extensions: &mut Extensions) {
323 if let Some(extra) = &self.extra {
324 extra.set(extensions);
325 }
326 }
327
328 /// Set that the connected transport negotiated HTTP/2 as its next protocol.
negotiated_h2(mut self) -> Connected329 pub fn negotiated_h2(mut self) -> Connected {
330 self.alpn = Alpn::H2;
331 self
332 }
333
334 /// Determines if the connected transport negotiated HTTP/2 as its next protocol.
is_negotiated_h2(&self) -> bool335 pub fn is_negotiated_h2(&self) -> bool {
336 self.alpn == Alpn::H2
337 }
338
339 /// Poison this connection
340 ///
341 /// A poisoned connection will not be reused for subsequent requests by the pool
poison(&self)342 pub fn poison(&self) {
343 self.poisoned.poison();
344 tracing::debug!(
345 poison_pill = ?self.poisoned, "connection was poisoned"
346 );
347 }
348
349 // Don't public expose that `Connected` is `Clone`, unsure if we want to
350 // keep that contract...
clone(&self) -> Connected351 pub(super) fn clone(&self) -> Connected {
352 Connected {
353 alpn: self.alpn.clone(),
354 is_proxied: self.is_proxied,
355 extra: self.extra.clone(),
356 poisoned: self.poisoned.clone(),
357 }
358 }
359 }
360
361 // ===== impl Extra =====
362
363 impl Extra {
set(&self, res: &mut Extensions)364 pub(super) fn set(&self, res: &mut Extensions) {
365 self.0.set(res);
366 }
367 }
368
369 impl Clone for Extra {
clone(&self) -> Extra370 fn clone(&self) -> Extra {
371 Extra(self.0.clone_box())
372 }
373 }
374
375 impl fmt::Debug for Extra {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result376 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377 f.debug_struct("Extra").finish()
378 }
379 }
380
381 trait ExtraInner: Send + Sync {
clone_box(&self) -> Box<dyn ExtraInner>382 fn clone_box(&self) -> Box<dyn ExtraInner>;
set(&self, res: &mut Extensions)383 fn set(&self, res: &mut Extensions);
384 }
385
386 // This indirection allows the `Connected` to have a type-erased "extra" value,
387 // while that type still knows its inner extra type. This allows the correct
388 // TypeId to be used when inserting into `res.extensions_mut()`.
389 #[derive(Clone)]
390 struct ExtraEnvelope<T>(T);
391
392 impl<T> ExtraInner for ExtraEnvelope<T>
393 where
394 T: Clone + Send + Sync + 'static,
395 {
clone_box(&self) -> Box<dyn ExtraInner>396 fn clone_box(&self) -> Box<dyn ExtraInner> {
397 Box::new(self.clone())
398 }
399
set(&self, res: &mut Extensions)400 fn set(&self, res: &mut Extensions) {
401 res.insert(self.0.clone());
402 }
403 }
404
405 struct ExtraChain<T>(Box<dyn ExtraInner>, T);
406
407 impl<T: Clone> Clone for ExtraChain<T> {
clone(&self) -> Self408 fn clone(&self) -> Self {
409 ExtraChain(self.0.clone_box(), self.1.clone())
410 }
411 }
412
413 impl<T> ExtraInner for ExtraChain<T>
414 where
415 T: Clone + Send + Sync + 'static,
416 {
clone_box(&self) -> Box<dyn ExtraInner>417 fn clone_box(&self) -> Box<dyn ExtraInner> {
418 Box::new(self.clone())
419 }
420
set(&self, res: &mut Extensions)421 fn set(&self, res: &mut Extensions) {
422 self.0.set(res);
423 res.insert(self.1.clone());
424 }
425 }
426
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) mod sealed {
    use std::{error::Error as StdError, future::Future, marker::Unpin};

    use ::http::Uri;
    use tokio::io::{AsyncRead, AsyncWrite};

    use super::Connection;

    /// Connect to a destination, returning an IO transport.
    ///
    /// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the
    /// ready connection.
    ///
    /// # Trait Alias
    ///
    /// This is really just an *alias* for the `tower::Service` trait, with
    /// additional bounds set for convenience *inside* hyper. You don't actually
    /// implement this trait, but `tower::Service<Uri>` instead.
    // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot
    // fit the `Connect` bounds because of the blanket impl for `Service`.
    pub trait Connect: Sealed + Sized {
        #[doc(hidden)]
        type _Svc: ConnectSvc;
        #[doc(hidden)]
        fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future;
    }

    // Carries the associated types that `Connect` refers to through `_Svc`.
    #[allow(unreachable_pub)]
    pub trait ConnectSvc {
        type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
        type Error: Into<Box<dyn StdError + Send + Sync>>;
        type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;

        fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
    }

    // Blanket impl: any `Service<Uri>` meeting the bounds below is a `Connect`.
    impl<S, T> Connect for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type _Svc = S;

        fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
            crate::service::oneshot(self, dst)
        }
    }

    // Same bounds as the `Connect` impl; the service is driven via `oneshot`.
    impl<S, T> ConnectSvc for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type Connection = T;
        type Error = S::Error;
        type Future = crate::service::Oneshot<S, Uri>;

        fn connect(self, _: Internal, dst: Uri) -> Self::Future {
            crate::service::oneshot(self, dst)
        }
    }

    impl<S, T> Sealed for S
    where
        S: tower_service::Service<Uri, Response = T> + Send,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
    }

    // Supertrait of `Connect`; implemented only by the blanket impl above.
    pub trait Sealed {}

    // Marker argument taken by the `connect` methods (named `internal_only`
    // in their signatures).
    #[allow(missing_debug_implementations)]
    pub struct Internal;
}
509
#[cfg(test)]
mod tests {
    use super::Connected;
    use crate::client::connect::CaptureConnection;

    #[derive(Clone, Debug, PartialEq)]
    struct Ex1(usize);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex2(&'static str);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex3(&'static str);

    /// A single extra ends up in the extensions map.
    #[test]
    fn test_connected_extra() {
        let c1 = Connected::new().extra(Ex1(41));

        let mut ex = ::http::Extensions::new();

        assert_eq!(ex.get::<Ex1>(), None);

        c1.extra.as_ref().expect("c1 extra").set(&mut ex);

        assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41)));
    }

    /// Several extras of different types are all kept; a repeated type is
    /// overridden by the later value.
    #[test]
    fn test_connected_extra_chain() {
        // If a user composes connectors and at each stage, there's "extra"
        // info to attach, it shouldn't override the previous extras.

        let c1 = Connected::new()
            .extra(Ex1(45))
            .extra(Ex2("zoom"))
            .extra(Ex3("pew pew"));

        let mut ex1 = ::http::Extensions::new();

        assert_eq!(ex1.get::<Ex1>(), None);
        assert_eq!(ex1.get::<Ex2>(), None);
        assert_eq!(ex1.get::<Ex3>(), None);

        c1.extra.as_ref().expect("c1 extra").set(&mut ex1);

        assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45)));
        assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom")));
        assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew")));

        // Just like extensions, inserting the same type overrides previous type.
        let c2 = Connected::new()
            .extra(Ex1(33))
            .extra(Ex2("hiccup"))
            .extra(Ex1(99));

        let mut ex2 = ::http::Extensions::new();

        c2.extra.as_ref().expect("c2 extra").set(&mut ex2);

        assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99)));
        assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup")));
    }

    /// The synchronous accessor sees the metadata once it has been set.
    #[test]
    fn test_sync_capture_connection() {
        let (tx, rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        tx.set(&Connected::new().proxy(true));
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());

        // ensure it can be called multiple times
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());
    }

    /// The asynchronous accessor resolves once the metadata has been set.
    #[tokio::test]
    async fn async_capture_connection() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        let test_task = tokio::spawn(async move {
            assert!(rx
                .wait_for_connection_metadata()
                .await
                .as_ref()
                .expect("connection should be set")
                .is_proxied());
            // can be awaited multiple times
            assert!(
                rx.wait_for_connection_metadata().await.is_some(),
                "should be awaitable multiple times"
            );

            assert!(rx.connection_metadata().is_some());
        });
        // can't be finished, we haven't set the connection yet
        assert!(!test_task.is_finished());
        tx.set(&Connected::new().proxy(true));

        assert!(test_task.await.is_ok());
    }

    /// Dropping the sender ends the wait with `None` instead of hanging.
    #[tokio::test]
    async fn capture_connection_sender_side_dropped() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        drop(tx);
        assert!(rx.wait_for_connection_metadata().await.is_none());
    }
}
641