1 use std::collections::{HashMap, HashSet, VecDeque};
2 use std::error::Error as StdError;
3 use std::fmt;
4 use std::future::Future;
5 use std::marker::Unpin;
6 use std::ops::{Deref, DerefMut};
7 use std::pin::Pin;
8 use std::sync::{Arc, Mutex, Weak};
9 use std::task::{Context, Poll};
10 
11 #[cfg(not(feature = "runtime"))]
12 use std::time::{Duration, Instant};
13 
14 use futures_channel::oneshot;
15 #[cfg(feature = "runtime")]
16 use tokio::time::{Duration, Instant, Interval};
17 use tracing::{debug, trace};
18 
19 use super::client::Ver;
20 use crate::common::exec::Exec;
21 
22 // FIXME: allow() required due to `impl Trait` leaking types to this lint
23 #[allow(missing_debug_implementations)]
24 pub(super) struct Pool<T> {
25     // If the pool is disabled, this is None.
26     inner: Option<Arc<Mutex<PoolInner<T>>>>,
27 }
28 
29 // Before using a pooled connection, make sure the sender is not dead.
30 //
31 // This is a trait to allow the `client::pool::tests` to work for `i32`.
32 //
33 // See https://github.com/hyperium/hyper/issues/1429
34 pub(super) trait Poolable: Unpin + Send + Sized + 'static {
is_open(&self) -> bool35     fn is_open(&self) -> bool;
36     /// Reserve this connection.
37     ///
38     /// Allows for HTTP/2 to return a shared reservation.
reserve(self) -> Reservation<Self>39     fn reserve(self) -> Reservation<Self>;
can_share(&self) -> bool40     fn can_share(&self) -> bool;
41 }
42 
43 /// When checking out a pooled connection, it might be that the connection
44 /// only supports a single reservation, or it might be usable for many.
45 ///
46 /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
47 /// used for multiple requests.
48 // FIXME: allow() required due to `impl Trait` leaking types to this lint
49 #[allow(missing_debug_implementations)]
50 pub(super) enum Reservation<T> {
51     /// This connection could be used multiple times, the first one will be
52     /// reinserted into the `idle` pool, and the second will be given to
53     /// the `Checkout`.
54     #[cfg(feature = "http2")]
55     Shared(T, T),
56     /// This connection requires unique access. It will be returned after
57     /// use is complete.
58     Unique(T),
59 }
60 
61 /// Simple type alias in case the key type needs to be adjusted.
62 pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
63 
64 struct PoolInner<T> {
65     // A flag that a connection is being established, and the connection
66     // should be shared. This prevents making multiple HTTP/2 connections
67     // to the same host.
68     connecting: HashSet<Key>,
69     // These are internal Conns sitting in the event loop in the KeepAlive
70     // state, waiting to receive a new Request to send on the socket.
71     idle: HashMap<Key, Vec<Idle<T>>>,
72     max_idle_per_host: usize,
73     // These are outstanding Checkouts that are waiting for a socket to be
74     // able to send a Request one. This is used when "racing" for a new
75     // connection.
76     //
77     // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
78     // for the Pool to receive an idle Conn. When a Conn becomes idle,
79     // this list is checked for any parked Checkouts, and tries to notify
80     // them that the Conn could be used instead of waiting for a brand new
81     // connection.
82     waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
83     // A oneshot channel is used to allow the interval to be notified when
84     // the Pool completely drops. That way, the interval can cancel immediately.
85     #[cfg(feature = "runtime")]
86     idle_interval_ref: Option<oneshot::Sender<std::convert::Infallible>>,
87     #[cfg(feature = "runtime")]
88     exec: Exec,
89     timeout: Option<Duration>,
90 }
91 
92 // This is because `Weak::new()` *allocates* space for `T`, even if it
93 // doesn't need it!
94 struct WeakOpt<T>(Option<Weak<T>>);
95 
96 #[derive(Clone, Copy, Debug)]
97 pub(super) struct Config {
98     pub(super) idle_timeout: Option<Duration>,
99     pub(super) max_idle_per_host: usize,
100 }
101 
102 impl Config {
is_enabled(&self) -> bool103     pub(super) fn is_enabled(&self) -> bool {
104         self.max_idle_per_host > 0
105     }
106 }
107 
108 impl<T> Pool<T> {
new(config: Config, __exec: &Exec) -> Pool<T>109     pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
110         let inner = if config.is_enabled() {
111             Some(Arc::new(Mutex::new(PoolInner {
112                 connecting: HashSet::new(),
113                 idle: HashMap::new(),
114                 #[cfg(feature = "runtime")]
115                 idle_interval_ref: None,
116                 max_idle_per_host: config.max_idle_per_host,
117                 waiters: HashMap::new(),
118                 #[cfg(feature = "runtime")]
119                 exec: __exec.clone(),
120                 timeout: config.idle_timeout.filter(|&t| t > Duration::ZERO),
121             })))
122         } else {
123             None
124         };
125 
126         Pool { inner }
127     }
128 
is_enabled(&self) -> bool129     fn is_enabled(&self) -> bool {
130         self.inner.is_some()
131     }
132 
133     #[cfg(test)]
no_timer(&self)134     pub(super) fn no_timer(&self) {
135         // Prevent an actual interval from being created for this pool...
136         #[cfg(feature = "runtime")]
137         {
138             let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
139             assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
140             let (tx, _) = oneshot::channel();
141             inner.idle_interval_ref = Some(tx);
142         }
143     }
144 }
145 
146 impl<T: Poolable> Pool<T> {
147     /// Returns a `Checkout` which is a future that resolves if an idle
148     /// connection becomes available.
checkout(&self, key: Key) -> Checkout<T>149     pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
150         Checkout {
151             key,
152             pool: self.clone(),
153             waiter: None,
154         }
155     }
156 
157     /// Ensure that there is only ever 1 connecting task for HTTP/2
158     /// connections. This does nothing for HTTP/1.
connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>>159     pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
160         if ver == Ver::Http2 {
161             if let Some(ref enabled) = self.inner {
162                 let mut inner = enabled.lock().unwrap();
163                 return if inner.connecting.insert(key.clone()) {
164                     let connecting = Connecting {
165                         key: key.clone(),
166                         pool: WeakOpt::downgrade(enabled),
167                     };
168                     Some(connecting)
169                 } else {
170                     trace!("HTTP/2 connecting already in progress for {:?}", key);
171                     None
172                 };
173             }
174         }
175 
176         // else
177         Some(Connecting {
178             key: key.clone(),
179             // in HTTP/1's case, there is never a lock, so we don't
180             // need to do anything in Drop.
181             pool: WeakOpt::none(),
182         })
183     }
184 
185     #[cfg(test)]
locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>>186     fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
187         self.inner.as_ref().expect("enabled").lock().expect("lock")
188     }
189 
190     /* Used in client/tests.rs...
191     #[cfg(feature = "runtime")]
192     #[cfg(test)]
193     pub(super) fn h1_key(&self, s: &str) -> Key {
194         Arc::new(s.to_string())
195     }
196 
197     #[cfg(feature = "runtime")]
198     #[cfg(test)]
199     pub(super) fn idle_count(&self, key: &Key) -> usize {
200         self
201             .locked()
202             .idle
203             .get(key)
204             .map(|list| list.len())
205             .unwrap_or(0)
206     }
207     */
208 
pooled( &self, #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>, value: T, ) -> Pooled<T>209     pub(super) fn pooled(
210         &self,
211         #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
212         value: T,
213     ) -> Pooled<T> {
214         let (value, pool_ref) = if let Some(ref enabled) = self.inner {
215             match value.reserve() {
216                 #[cfg(feature = "http2")]
217                 Reservation::Shared(to_insert, to_return) => {
218                     let mut inner = enabled.lock().unwrap();
219                     inner.put(connecting.key.clone(), to_insert, enabled);
220                     // Do this here instead of Drop for Connecting because we
221                     // already have a lock, no need to lock the mutex twice.
222                     inner.connected(&connecting.key);
223                     // prevent the Drop of Connecting from repeating inner.connected()
224                     connecting.pool = WeakOpt::none();
225 
226                     // Shared reservations don't need a reference to the pool,
227                     // since the pool always keeps a copy.
228                     (to_return, WeakOpt::none())
229                 }
230                 Reservation::Unique(value) => {
231                     // Unique reservations must take a reference to the pool
232                     // since they hope to reinsert once the reservation is
233                     // completed
234                     (value, WeakOpt::downgrade(enabled))
235                 }
236             }
237         } else {
238             // If pool is not enabled, skip all the things...
239 
240             // The Connecting should have had no pool ref
241             debug_assert!(connecting.pool.upgrade().is_none());
242 
243             (value, WeakOpt::none())
244         };
245         Pooled {
246             key: connecting.key.clone(),
247             is_reused: false,
248             pool: pool_ref,
249             value: Some(value),
250         }
251     }
252 
reuse(&self, key: &Key, value: T) -> Pooled<T>253     fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
254         debug!("reuse idle connection for {:?}", key);
255         // TODO: unhack this
256         // In Pool::pooled(), which is used for inserting brand new connections,
257         // there's some code that adjusts the pool reference taken depending
258         // on if the Reservation can be shared or is unique. By the time
259         // reuse() is called, the reservation has already been made, and
260         // we just have the final value, without knowledge of if this is
261         // unique or shared. So, the hack is to just assume Ver::Http2 means
262         // shared... :(
263         let mut pool_ref = WeakOpt::none();
264         if !value.can_share() {
265             if let Some(ref enabled) = self.inner {
266                 pool_ref = WeakOpt::downgrade(enabled);
267             }
268         }
269 
270         Pooled {
271             is_reused: true,
272             key: key.clone(),
273             pool: pool_ref,
274             value: Some(value),
275         }
276     }
277 }
278 
279 /// Pop off this list, looking for a usable connection that hasn't expired.
280 struct IdlePopper<'a, T> {
281     key: &'a Key,
282     list: &'a mut Vec<Idle<T>>,
283 }
284 
285 impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
pop(self, expiration: &Expiration) -> Option<Idle<T>>286     fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
287         while let Some(entry) = self.list.pop() {
288             // If the connection has been closed, or is older than our idle
289             // timeout, simply drop it and keep looking...
290             if !entry.value.is_open() {
291                 trace!("removing closed connection for {:?}", self.key);
292                 continue;
293             }
294             // TODO: Actually, since the `idle` list is pushed to the end always,
295             // that would imply that if *this* entry is expired, then anything
296             // "earlier" in the list would *have* to be expired also... Right?
297             //
298             // In that case, we could just break out of the loop and drop the
299             // whole list...
300             if expiration.expires(entry.idle_at) {
301                 trace!("removing expired connection for {:?}", self.key);
302                 continue;
303             }
304 
305             let value = match entry.value.reserve() {
306                 #[cfg(feature = "http2")]
307                 Reservation::Shared(to_reinsert, to_checkout) => {
308                     self.list.push(Idle {
309                         idle_at: Instant::now(),
310                         value: to_reinsert,
311                     });
312                     to_checkout
313                 }
314                 Reservation::Unique(unique) => unique,
315             };
316 
317             return Some(Idle {
318                 idle_at: entry.idle_at,
319                 value,
320             });
321         }
322 
323         None
324     }
325 }
326 
327 impl<T: Poolable> PoolInner<T> {
put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>)328     fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
329         if value.can_share() && self.idle.contains_key(&key) {
330             trace!("put; existing idle HTTP/2 connection for {:?}", key);
331             return;
332         }
333         trace!("put; add idle connection for {:?}", key);
334         let mut remove_waiters = false;
335         let mut value = Some(value);
336         if let Some(waiters) = self.waiters.get_mut(&key) {
337             while let Some(tx) = waiters.pop_front() {
338                 if !tx.is_canceled() {
339                     let reserved = value.take().expect("value already sent");
340                     let reserved = match reserved.reserve() {
341                         #[cfg(feature = "http2")]
342                         Reservation::Shared(to_keep, to_send) => {
343                             value = Some(to_keep);
344                             to_send
345                         }
346                         Reservation::Unique(uniq) => uniq,
347                     };
348                     match tx.send(reserved) {
349                         Ok(()) => {
350                             if value.is_none() {
351                                 break;
352                             } else {
353                                 continue;
354                             }
355                         }
356                         Err(e) => {
357                             value = Some(e);
358                         }
359                     }
360                 }
361 
362                 trace!("put; removing canceled waiter for {:?}", key);
363             }
364             remove_waiters = waiters.is_empty();
365         }
366         if remove_waiters {
367             self.waiters.remove(&key);
368         }
369 
370         match value {
371             Some(value) => {
372                 // borrow-check scope...
373                 {
374                     let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
375                     if self.max_idle_per_host <= idle_list.len() {
376                         trace!("max idle per host for {:?}, dropping connection", key);
377                         return;
378                     }
379 
380                     debug!("pooling idle connection for {:?}", key);
381                     idle_list.push(Idle {
382                         value,
383                         idle_at: Instant::now(),
384                     });
385                 }
386 
387                 #[cfg(feature = "runtime")]
388                 {
389                     self.spawn_idle_interval(__pool_ref);
390                 }
391             }
392             None => trace!("put; found waiter for {:?}", key),
393         }
394     }
395 
396     /// A `Connecting` task is complete. Not necessarily successfully,
397     /// but the lock is going away, so clean up.
connected(&mut self, key: &Key)398     fn connected(&mut self, key: &Key) {
399         let existed = self.connecting.remove(key);
400         debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
401         // cancel any waiters. if there are any, it's because
402         // this Connecting task didn't complete successfully.
403         // those waiters would never receive a connection.
404         self.waiters.remove(key);
405     }
406 
407     #[cfg(feature = "runtime")]
spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>)408     fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
409         let (dur, rx) = {
410             if self.idle_interval_ref.is_some() {
411                 return;
412             }
413 
414             if let Some(dur) = self.timeout {
415                 let (tx, rx) = oneshot::channel();
416                 self.idle_interval_ref = Some(tx);
417                 (dur, rx)
418             } else {
419                 return;
420             }
421         };
422 
423         let interval = IdleTask {
424             interval: tokio::time::interval(dur),
425             pool: WeakOpt::downgrade(pool_ref),
426             pool_drop_notifier: rx,
427         };
428 
429         self.exec.execute(interval);
430     }
431 }
432 
433 impl<T> PoolInner<T> {
434     /// Any `FutureResponse`s that were created will have made a `Checkout`,
435     /// and possibly inserted into the pool that it is waiting for an idle
436     /// connection. If a user ever dropped that future, we need to clean out
437     /// those parked senders.
clean_waiters(&mut self, key: &Key)438     fn clean_waiters(&mut self, key: &Key) {
439         let mut remove_waiters = false;
440         if let Some(waiters) = self.waiters.get_mut(key) {
441             waiters.retain(|tx| !tx.is_canceled());
442             remove_waiters = waiters.is_empty();
443         }
444         if remove_waiters {
445             self.waiters.remove(key);
446         }
447     }
448 }
449 
450 #[cfg(feature = "runtime")]
451 impl<T: Poolable> PoolInner<T> {
452     /// This should *only* be called by the IdleTask
clear_expired(&mut self)453     fn clear_expired(&mut self) {
454         let dur = self.timeout.expect("interval assumes timeout");
455 
456         let now = Instant::now();
457         //self.last_idle_check_at = now;
458 
459         self.idle.retain(|key, values| {
460             values.retain(|entry| {
461                 if !entry.value.is_open() {
462                     trace!("idle interval evicting closed for {:?}", key);
463                     return false;
464                 }
465 
466                 // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
467                 if now.saturating_duration_since(entry.idle_at) > dur {
468                     trace!("idle interval evicting expired for {:?}", key);
469                     return false;
470                 }
471 
472                 // Otherwise, keep this value...
473                 true
474             });
475 
476             // returning false evicts this key/val
477             !values.is_empty()
478         });
479     }
480 }
481 
482 impl<T> Clone for Pool<T> {
clone(&self) -> Pool<T>483     fn clone(&self) -> Pool<T> {
484         Pool {
485             inner: self.inner.clone(),
486         }
487     }
488 }
489 
490 /// A wrapped poolable value that tries to reinsert to the Pool on Drop.
491 // Note: The bounds `T: Poolable` is needed for the Drop impl.
492 pub(super) struct Pooled<T: Poolable> {
493     value: Option<T>,
494     is_reused: bool,
495     key: Key,
496     pool: WeakOpt<Mutex<PoolInner<T>>>,
497 }
498 
499 impl<T: Poolable> Pooled<T> {
is_reused(&self) -> bool500     pub(super) fn is_reused(&self) -> bool {
501         self.is_reused
502     }
503 
is_pool_enabled(&self) -> bool504     pub(super) fn is_pool_enabled(&self) -> bool {
505         self.pool.0.is_some()
506     }
507 
as_ref(&self) -> &T508     fn as_ref(&self) -> &T {
509         self.value.as_ref().expect("not dropped")
510     }
511 
as_mut(&mut self) -> &mut T512     fn as_mut(&mut self) -> &mut T {
513         self.value.as_mut().expect("not dropped")
514     }
515 }
516 
517 impl<T: Poolable> Deref for Pooled<T> {
518     type Target = T;
deref(&self) -> &T519     fn deref(&self) -> &T {
520         self.as_ref()
521     }
522 }
523 
524 impl<T: Poolable> DerefMut for Pooled<T> {
deref_mut(&mut self) -> &mut T525     fn deref_mut(&mut self) -> &mut T {
526         self.as_mut()
527     }
528 }
529 
530 impl<T: Poolable> Drop for Pooled<T> {
drop(&mut self)531     fn drop(&mut self) {
532         if let Some(value) = self.value.take() {
533             if !value.is_open() {
534                 // If we *already* know the connection is done here,
535                 // it shouldn't be re-inserted back into the pool.
536                 return;
537             }
538 
539             if let Some(pool) = self.pool.upgrade() {
540                 if let Ok(mut inner) = pool.lock() {
541                     inner.put(self.key.clone(), value, &pool);
542                 }
543             } else if !value.can_share() {
544                 trace!("pool dropped, dropping pooled ({:?})", self.key);
545             }
546             // Ver::Http2 is already in the Pool (or dead), so we wouldn't
547             // have an actual reference to the Pool.
548         }
549     }
550 }
551 
552 impl<T: Poolable> fmt::Debug for Pooled<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result553     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554         f.debug_struct("Pooled").field("key", &self.key).finish()
555     }
556 }
557 
558 struct Idle<T> {
559     idle_at: Instant,
560     value: T,
561 }
562 
563 // FIXME: allow() required due to `impl Trait` leaking types to this lint
564 #[allow(missing_debug_implementations)]
565 pub(super) struct Checkout<T> {
566     key: Key,
567     pool: Pool<T>,
568     waiter: Option<oneshot::Receiver<T>>,
569 }
570 
571 #[derive(Debug)]
572 pub(super) struct CheckoutIsClosedError;
573 
574 impl StdError for CheckoutIsClosedError {}
575 
576 impl fmt::Display for CheckoutIsClosedError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result577     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578         f.write_str("checked out connection was closed")
579     }
580 }
581 
582 impl<T: Poolable> Checkout<T> {
poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>>583     fn poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>> {
584         if let Some(mut rx) = self.waiter.take() {
585             match Pin::new(&mut rx).poll(cx) {
586                 Poll::Ready(Ok(value)) => {
587                     if value.is_open() {
588                         Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
589                     } else {
590                         Poll::Ready(Some(Err(
591                             crate::Error::new_canceled().with(CheckoutIsClosedError)
592                         )))
593                     }
594                 }
595                 Poll::Pending => {
596                     self.waiter = Some(rx);
597                     Poll::Pending
598                 }
599                 Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
600                     crate::Error::new_canceled().with("request has been canceled")
601                 ))),
602             }
603         } else {
604             Poll::Ready(None)
605         }
606     }
607 
checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>>608     fn checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>> {
609         let entry = {
610             let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
611             let expiration = Expiration::new(inner.timeout);
612             let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
613                 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
614                 // A block to end the mutable borrow on list,
615                 // so the map below can check is_empty()
616                 {
617                     let popper = IdlePopper {
618                         key: &self.key,
619                         list,
620                     };
621                     popper.pop(&expiration)
622                 }
623                 .map(|e| (e, list.is_empty()))
624             });
625 
626             let (entry, empty) = if let Some((e, empty)) = maybe_entry {
627                 (Some(e), empty)
628             } else {
629                 // No entry found means nuke the list for sure.
630                 (None, true)
631             };
632             if empty {
633                 //TODO: This could be done with the HashMap::entry API instead.
634                 inner.idle.remove(&self.key);
635             }
636 
637             if entry.is_none() && self.waiter.is_none() {
638                 let (tx, mut rx) = oneshot::channel();
639                 trace!("checkout waiting for idle connection: {:?}", self.key);
640                 inner
641                     .waiters
642                     .entry(self.key.clone())
643                     .or_insert_with(VecDeque::new)
644                     .push_back(tx);
645 
646                 // register the waker with this oneshot
647                 assert!(Pin::new(&mut rx).poll(cx).is_pending());
648                 self.waiter = Some(rx);
649             }
650 
651             entry
652         };
653 
654         entry.map(|e| self.pool.reuse(&self.key, e.value))
655     }
656 }
657 
658 impl<T: Poolable> Future for Checkout<T> {
659     type Output = crate::Result<Pooled<T>>;
660 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>661     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
662         if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
663             return Poll::Ready(Ok(pooled));
664         }
665 
666         if let Some(pooled) = self.checkout(cx) {
667             Poll::Ready(Ok(pooled))
668         } else if !self.pool.is_enabled() {
669             Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
670         } else {
671             // There's a new waiter, already registered in self.checkout()
672             debug_assert!(self.waiter.is_some());
673             Poll::Pending
674         }
675     }
676 }
677 
678 impl<T> Drop for Checkout<T> {
drop(&mut self)679     fn drop(&mut self) {
680         if self.waiter.take().is_some() {
681             trace!("checkout dropped for {:?}", self.key);
682             if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
683                 inner.clean_waiters(&self.key);
684             }
685         }
686     }
687 }
688 
689 // FIXME: allow() required due to `impl Trait` leaking types to this lint
690 #[allow(missing_debug_implementations)]
691 pub(super) struct Connecting<T: Poolable> {
692     key: Key,
693     pool: WeakOpt<Mutex<PoolInner<T>>>,
694 }
695 
696 impl<T: Poolable> Connecting<T> {
alpn_h2(self, pool: &Pool<T>) -> Option<Self>697     pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
698         debug_assert!(
699             self.pool.0.is_none(),
700             "Connecting::alpn_h2 but already Http2"
701         );
702 
703         pool.connecting(&self.key, Ver::Http2)
704     }
705 }
706 
707 impl<T: Poolable> Drop for Connecting<T> {
drop(&mut self)708     fn drop(&mut self) {
709         if let Some(pool) = self.pool.upgrade() {
710             // No need to panic on drop, that could abort!
711             if let Ok(mut inner) = pool.lock() {
712                 inner.connected(&self.key);
713             }
714         }
715     }
716 }
717 
718 struct Expiration(Option<Duration>);
719 
720 impl Expiration {
new(dur: Option<Duration>) -> Expiration721     fn new(dur: Option<Duration>) -> Expiration {
722         Expiration(dur)
723     }
724 
expires(&self, instant: Instant) -> bool725     fn expires(&self, instant: Instant) -> bool {
726         match self.0 {
727             // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
728             Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
729             None => false,
730         }
731     }
732 }
733 
734 #[cfg(feature = "runtime")]
735 pin_project_lite::pin_project! {
736     struct IdleTask<T> {
737         #[pin]
738         interval: Interval,
739         pool: WeakOpt<Mutex<PoolInner<T>>>,
740         // This allows the IdleTask to be notified as soon as the entire
741         // Pool is fully dropped, and shutdown. This channel is never sent on,
742         // but Err(Canceled) will be received when the Pool is dropped.
743         #[pin]
744         pool_drop_notifier: oneshot::Receiver<std::convert::Infallible>,
745     }
746 }
747 
748 #[cfg(feature = "runtime")]
749 impl<T: Poolable + 'static> Future for IdleTask<T> {
750     type Output = ();
751 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>752     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
753         let mut this = self.project();
754         loop {
755             match this.pool_drop_notifier.as_mut().poll(cx) {
756                 Poll::Ready(Ok(n)) => match n {},
757                 Poll::Pending => (),
758                 Poll::Ready(Err(_canceled)) => {
759                     trace!("pool closed, canceling idle interval");
760                     return Poll::Ready(());
761                 }
762             }
763 
764             ready!(this.interval.as_mut().poll_tick(cx));
765 
766             if let Some(inner) = this.pool.upgrade() {
767                 if let Ok(mut inner) = inner.lock() {
768                     trace!("idle interval checking for expired");
769                     inner.clear_expired();
770                     continue;
771                 }
772             }
773             return Poll::Ready(());
774         }
775     }
776 }
777 
778 impl<T> WeakOpt<T> {
none() -> Self779     fn none() -> Self {
780         WeakOpt(None)
781     }
782 
downgrade(arc: &Arc<T>) -> Self783     fn downgrade(arc: &Arc<T>) -> Self {
784         WeakOpt(Some(Arc::downgrade(arc)))
785     }
786 
upgrade(&self) -> Option<Arc<T>>787     fn upgrade(&self) -> Option<Arc<T>> {
788         self.0.as_ref().and_then(Weak::upgrade)
789     }
790 }
791 
792 #[cfg(test)]
793 mod tests {
794     use std::future::Future;
795     use std::pin::Pin;
796     use std::task::Context;
797     use std::task::Poll;
798     use std::time::Duration;
799 
800     use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
801     use crate::common::exec::Exec;
802 
803     /// Test unique reservations.
804     #[derive(Debug, PartialEq, Eq)]
805     struct Uniq<T>(T);
806 
807     impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
is_open(&self) -> bool808         fn is_open(&self) -> bool {
809             true
810         }
811 
reserve(self) -> Reservation<Self>812         fn reserve(self) -> Reservation<Self> {
813             Reservation::Unique(self)
814         }
815 
can_share(&self) -> bool816         fn can_share(&self) -> bool {
817             false
818         }
819     }
820 
c<T: Poolable>(key: Key) -> Connecting<T>821     fn c<T: Poolable>(key: Key) -> Connecting<T> {
822         Connecting {
823             key,
824             pool: WeakOpt::none(),
825         }
826     }
827 
host_key(s: &str) -> Key828     fn host_key(s: &str) -> Key {
829         (http::uri::Scheme::HTTP, s.parse().expect("host key"))
830     }
831 
pool_no_timer<T>() -> Pool<T>832     fn pool_no_timer<T>() -> Pool<T> {
833         pool_max_idle_no_timer(::std::usize::MAX)
834     }
835 
pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T>836     fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
837         let pool = Pool::new(
838             super::Config {
839                 idle_timeout: Some(Duration::from_millis(100)),
840                 max_idle_per_host: max_idle,
841             },
842             &Exec::Default,
843         );
844         pool.no_timer();
845         pool
846     }
847 
848     #[tokio::test]
test_pool_checkout_smoke()849     async fn test_pool_checkout_smoke() {
850         let pool = pool_no_timer();
851         let key = host_key("foo");
852         let pooled = pool.pooled(c(key.clone()), Uniq(41));
853 
854         drop(pooled);
855 
856         match pool.checkout(key).await {
857             Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
858             Err(_) => panic!("not ready"),
859         };
860     }
861 
862     /// Helper to check if the future is ready after polling once.
863     struct PollOnce<'a, F>(&'a mut F);
864 
865     impl<F, T, U> Future for PollOnce<'_, F>
866     where
867         F: Future<Output = Result<T, U>> + Unpin,
868     {
869         type Output = Option<()>;
870 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>871         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
872             match Pin::new(&mut self.0).poll(cx) {
873                 Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
874                 Poll::Ready(Err(_)) => Poll::Ready(Some(())),
875                 Poll::Pending => Poll::Ready(None),
876             }
877         }
878     }
879 
880     #[tokio::test]
test_pool_checkout_returns_none_if_expired()881     async fn test_pool_checkout_returns_none_if_expired() {
882         let pool = pool_no_timer();
883         let key = host_key("foo");
884         let pooled = pool.pooled(c(key.clone()), Uniq(41));
885 
886         drop(pooled);
887         tokio::time::sleep(pool.locked().timeout.unwrap()).await;
888         let mut checkout = pool.checkout(key);
889         let poll_once = PollOnce(&mut checkout);
890         let is_not_ready = poll_once.await.is_none();
891         assert!(is_not_ready);
892     }
893 
894     #[cfg(feature = "runtime")]
895     #[tokio::test]
test_pool_checkout_removes_expired()896     async fn test_pool_checkout_removes_expired() {
897         let pool = pool_no_timer();
898         let key = host_key("foo");
899 
900         pool.pooled(c(key.clone()), Uniq(41));
901         pool.pooled(c(key.clone()), Uniq(5));
902         pool.pooled(c(key.clone()), Uniq(99));
903 
904         assert_eq!(
905             pool.locked().idle.get(&key).map(|entries| entries.len()),
906             Some(3)
907         );
908         tokio::time::sleep(pool.locked().timeout.unwrap()).await;
909 
910         let mut checkout = pool.checkout(key.clone());
911         let poll_once = PollOnce(&mut checkout);
912         // checkout.await should clean out the expired
913         poll_once.await;
914         assert!(pool.locked().idle.get(&key).is_none());
915     }
916 
917     #[test]
test_pool_max_idle_per_host()918     fn test_pool_max_idle_per_host() {
919         let pool = pool_max_idle_no_timer(2);
920         let key = host_key("foo");
921 
922         pool.pooled(c(key.clone()), Uniq(41));
923         pool.pooled(c(key.clone()), Uniq(5));
924         pool.pooled(c(key.clone()), Uniq(99));
925 
926         // pooled and dropped 3, max_idle should only allow 2
927         assert_eq!(
928             pool.locked().idle.get(&key).map(|entries| entries.len()),
929             Some(2)
930         );
931     }
932 
933     #[cfg(feature = "runtime")]
934     #[tokio::test]
test_pool_timer_removes_expired()935     async fn test_pool_timer_removes_expired() {
936         let _ = pretty_env_logger::try_init();
937         tokio::time::pause();
938 
939         let pool = Pool::new(
940             super::Config {
941                 idle_timeout: Some(Duration::from_millis(10)),
942                 max_idle_per_host: std::usize::MAX,
943             },
944             &Exec::Default,
945         );
946 
947         let key = host_key("foo");
948 
949         pool.pooled(c(key.clone()), Uniq(41));
950         pool.pooled(c(key.clone()), Uniq(5));
951         pool.pooled(c(key.clone()), Uniq(99));
952 
953         assert_eq!(
954             pool.locked().idle.get(&key).map(|entries| entries.len()),
955             Some(3)
956         );
957 
958         // Let the timer tick passed the expiration...
959         tokio::time::advance(Duration::from_millis(30)).await;
960         // Yield so the Interval can reap...
961         tokio::task::yield_now().await;
962 
963         assert!(pool.locked().idle.get(&key).is_none());
964     }
965 
966     #[tokio::test]
test_pool_checkout_task_unparked()967     async fn test_pool_checkout_task_unparked() {
968         use futures_util::future::join;
969         use futures_util::FutureExt;
970 
971         let pool = pool_no_timer();
972         let key = host_key("foo");
973         let pooled = pool.pooled(c(key.clone()), Uniq(41));
974 
975         let checkout = join(pool.checkout(key), async {
976             // the checkout future will park first,
977             // and then this lazy future will be polled, which will insert
978             // the pooled back into the pool
979             //
980             // this test makes sure that doing so will unpark the checkout
981             drop(pooled);
982         })
983         .map(|(entry, _)| entry);
984 
985         assert_eq!(*checkout.await.unwrap(), Uniq(41));
986     }
987 
988     #[tokio::test]
test_pool_checkout_drop_cleans_up_waiters()989     async fn test_pool_checkout_drop_cleans_up_waiters() {
990         let pool = pool_no_timer::<Uniq<i32>>();
991         let key = host_key("foo");
992 
993         let mut checkout1 = pool.checkout(key.clone());
994         let mut checkout2 = pool.checkout(key.clone());
995 
996         let poll_once1 = PollOnce(&mut checkout1);
997         let poll_once2 = PollOnce(&mut checkout2);
998 
999         // first poll needed to get into Pool's parked
1000         poll_once1.await;
1001         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1002         poll_once2.await;
1003         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1004 
1005         // on drop, clean up Pool
1006         drop(checkout1);
1007         assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1008 
1009         drop(checkout2);
1010         assert!(pool.locked().waiters.get(&key).is_none());
1011     }
1012 
1013     #[derive(Debug)]
1014     struct CanClose {
1015         #[allow(unused)]
1016         val: i32,
1017         closed: bool,
1018     }
1019 
1020     impl Poolable for CanClose {
is_open(&self) -> bool1021         fn is_open(&self) -> bool {
1022             !self.closed
1023         }
1024 
reserve(self) -> Reservation<Self>1025         fn reserve(self) -> Reservation<Self> {
1026             Reservation::Unique(self)
1027         }
1028 
can_share(&self) -> bool1029         fn can_share(&self) -> bool {
1030             false
1031         }
1032     }
1033 
1034     #[test]
pooled_drop_if_closed_doesnt_reinsert()1035     fn pooled_drop_if_closed_doesnt_reinsert() {
1036         let pool = pool_no_timer();
1037         let key = host_key("foo");
1038         pool.pooled(
1039             c(key.clone()),
1040             CanClose {
1041                 val: 57,
1042                 closed: true,
1043             },
1044         );
1045 
1046         assert!(!pool.locked().idle.contains_key(&key));
1047     }
1048 }
1049