1 use std::collections::{HashMap, HashSet, VecDeque}; 2 use std::error::Error as StdError; 3 use std::fmt; 4 use std::future::Future; 5 use std::marker::Unpin; 6 use std::ops::{Deref, DerefMut}; 7 use std::pin::Pin; 8 use std::sync::{Arc, Mutex, Weak}; 9 use std::task::{Context, Poll}; 10 11 #[cfg(not(feature = "runtime"))] 12 use std::time::{Duration, Instant}; 13 14 use futures_channel::oneshot; 15 #[cfg(feature = "runtime")] 16 use tokio::time::{Duration, Instant, Interval}; 17 use tracing::{debug, trace}; 18 19 use super::client::Ver; 20 use crate::common::exec::Exec; 21 22 // FIXME: allow() required due to `impl Trait` leaking types to this lint 23 #[allow(missing_debug_implementations)] 24 pub(super) struct Pool<T> { 25 // If the pool is disabled, this is None. 26 inner: Option<Arc<Mutex<PoolInner<T>>>>, 27 } 28 29 // Before using a pooled connection, make sure the sender is not dead. 30 // 31 // This is a trait to allow the `client::pool::tests` to work for `i32`. 32 // 33 // See https://github.com/hyperium/hyper/issues/1429 34 pub(super) trait Poolable: Unpin + Send + Sized + 'static { is_open(&self) -> bool35 fn is_open(&self) -> bool; 36 /// Reserve this connection. 37 /// 38 /// Allows for HTTP/2 to return a shared reservation. reserve(self) -> Reservation<Self>39 fn reserve(self) -> Reservation<Self>; can_share(&self) -> bool40 fn can_share(&self) -> bool; 41 } 42 43 /// When checking out a pooled connection, it might be that the connection 44 /// only supports a single reservation, or it might be usable for many. 45 /// 46 /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be 47 /// used for multiple requests. 48 // FIXME: allow() required due to `impl Trait` leaking types to this lint 49 #[allow(missing_debug_implementations)] 50 pub(super) enum Reservation<T> { 51 /// This connection could be used multiple times, the first one will be 52 /// reinserted into the `idle` pool, and the second will be given to 53 /// the `Checkout`. 54 #[cfg(feature = "http2")] 55 Shared(T, T), 56 /// This connection requires unique access. It will be returned after 57 /// use is complete. 58 Unique(T), 59 } 60 61 /// Simple type alias in case the key type needs to be adjusted. 62 pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>; 63 64 struct PoolInner<T> { 65 // A flag that a connection is being established, and the connection 66 // should be shared. This prevents making multiple HTTP/2 connections 67 // to the same host. 68 connecting: HashSet<Key>, 69 // These are internal Conns sitting in the event loop in the KeepAlive 70 // state, waiting to receive a new Request to send on the socket. 71 idle: HashMap<Key, Vec<Idle<T>>>, 72 max_idle_per_host: usize, 73 // These are outstanding Checkouts that are waiting for a socket to be 74 // able to send a Request one. This is used when "racing" for a new 75 // connection. 76 // 77 // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait 78 // for the Pool to receive an idle Conn. When a Conn becomes idle, 79 // this list is checked for any parked Checkouts, and tries to notify 80 // them that the Conn could be used instead of waiting for a brand new 81 // connection. 82 waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>, 83 // A oneshot channel is used to allow the interval to be notified when 84 // the Pool completely drops. That way, the interval can cancel immediately. 85 #[cfg(feature = "runtime")] 86 idle_interval_ref: Option<oneshot::Sender<std::convert::Infallible>>, 87 #[cfg(feature = "runtime")] 88 exec: Exec, 89 timeout: Option<Duration>, 90 } 91 92 // This is because `Weak::new()` *allocates* space for `T`, even if it 93 // doesn't need it! 94 struct WeakOpt<T>(Option<Weak<T>>); 95 96 #[derive(Clone, Copy, Debug)] 97 pub(super) struct Config { 98 pub(super) idle_timeout: Option<Duration>, 99 pub(super) max_idle_per_host: usize, 100 } 101 102 impl Config { is_enabled(&self) -> bool103 pub(super) fn is_enabled(&self) -> bool { 104 self.max_idle_per_host > 0 105 } 106 } 107 108 impl<T> Pool<T> { new(config: Config, __exec: &Exec) -> Pool<T>109 pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> { 110 let inner = if config.is_enabled() { 111 Some(Arc::new(Mutex::new(PoolInner { 112 connecting: HashSet::new(), 113 idle: HashMap::new(), 114 #[cfg(feature = "runtime")] 115 idle_interval_ref: None, 116 max_idle_per_host: config.max_idle_per_host, 117 waiters: HashMap::new(), 118 #[cfg(feature = "runtime")] 119 exec: __exec.clone(), 120 timeout: config.idle_timeout.filter(|&t| t > Duration::ZERO), 121 }))) 122 } else { 123 None 124 }; 125 126 Pool { inner } 127 } 128 is_enabled(&self) -> bool129 fn is_enabled(&self) -> bool { 130 self.inner.is_some() 131 } 132 133 #[cfg(test)] no_timer(&self)134 pub(super) fn no_timer(&self) { 135 // Prevent an actual interval from being created for this pool... 136 #[cfg(feature = "runtime")] 137 { 138 let mut inner = self.inner.as_ref().unwrap().lock().unwrap(); 139 assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); 140 let (tx, _) = oneshot::channel(); 141 inner.idle_interval_ref = Some(tx); 142 } 143 } 144 } 145 146 impl<T: Poolable> Pool<T> { 147 /// Returns a `Checkout` which is a future that resolves if an idle 148 /// connection becomes available. checkout(&self, key: Key) -> Checkout<T>149 pub(super) fn checkout(&self, key: Key) -> Checkout<T> { 150 Checkout { 151 key, 152 pool: self.clone(), 153 waiter: None, 154 } 155 } 156 157 /// Ensure that there is only ever 1 connecting task for HTTP/2 158 /// connections. This does nothing for HTTP/1. connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>>159 pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> { 160 if ver == Ver::Http2 { 161 if let Some(ref enabled) = self.inner { 162 let mut inner = enabled.lock().unwrap(); 163 return if inner.connecting.insert(key.clone()) { 164 let connecting = Connecting { 165 key: key.clone(), 166 pool: WeakOpt::downgrade(enabled), 167 }; 168 Some(connecting) 169 } else { 170 trace!("HTTP/2 connecting already in progress for {:?}", key); 171 None 172 }; 173 } 174 } 175 176 // else 177 Some(Connecting { 178 key: key.clone(), 179 // in HTTP/1's case, there is never a lock, so we don't 180 // need to do anything in Drop. 181 pool: WeakOpt::none(), 182 }) 183 } 184 185 #[cfg(test)] locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>>186 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> { 187 self.inner.as_ref().expect("enabled").lock().expect("lock") 188 } 189 190 /* Used in client/tests.rs... 191 #[cfg(feature = "runtime")] 192 #[cfg(test)] 193 pub(super) fn h1_key(&self, s: &str) -> Key { 194 Arc::new(s.to_string()) 195 } 196 197 #[cfg(feature = "runtime")] 198 #[cfg(test)] 199 pub(super) fn idle_count(&self, key: &Key) -> usize { 200 self 201 .locked() 202 .idle 203 .get(key) 204 .map(|list| list.len()) 205 .unwrap_or(0) 206 } 207 */ 208 pooled( &self, #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>, value: T, ) -> Pooled<T>209 pub(super) fn pooled( 210 &self, 211 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>, 212 value: T, 213 ) -> Pooled<T> { 214 let (value, pool_ref) = if let Some(ref enabled) = self.inner { 215 match value.reserve() { 216 #[cfg(feature = "http2")] 217 Reservation::Shared(to_insert, to_return) => { 218 let mut inner = enabled.lock().unwrap(); 219 inner.put(connecting.key.clone(), to_insert, enabled); 220 // Do this here instead of Drop for Connecting because we 221 // already have a lock, no need to lock the mutex twice. 222 inner.connected(&connecting.key); 223 // prevent the Drop of Connecting from repeating inner.connected() 224 connecting.pool = WeakOpt::none(); 225 226 // Shared reservations don't need a reference to the pool, 227 // since the pool always keeps a copy. 228 (to_return, WeakOpt::none()) 229 } 230 Reservation::Unique(value) => { 231 // Unique reservations must take a reference to the pool 232 // since they hope to reinsert once the reservation is 233 // completed 234 (value, WeakOpt::downgrade(enabled)) 235 } 236 } 237 } else { 238 // If pool is not enabled, skip all the things... 239 240 // The Connecting should have had no pool ref 241 debug_assert!(connecting.pool.upgrade().is_none()); 242 243 (value, WeakOpt::none()) 244 }; 245 Pooled { 246 key: connecting.key.clone(), 247 is_reused: false, 248 pool: pool_ref, 249 value: Some(value), 250 } 251 } 252 reuse(&self, key: &Key, value: T) -> Pooled<T>253 fn reuse(&self, key: &Key, value: T) -> Pooled<T> { 254 debug!("reuse idle connection for {:?}", key); 255 // TODO: unhack this 256 // In Pool::pooled(), which is used for inserting brand new connections, 257 // there's some code that adjusts the pool reference taken depending 258 // on if the Reservation can be shared or is unique. By the time 259 // reuse() is called, the reservation has already been made, and 260 // we just have the final value, without knowledge of if this is 261 // unique or shared. So, the hack is to just assume Ver::Http2 means 262 // shared... :( 263 let mut pool_ref = WeakOpt::none(); 264 if !value.can_share() { 265 if let Some(ref enabled) = self.inner { 266 pool_ref = WeakOpt::downgrade(enabled); 267 } 268 } 269 270 Pooled { 271 is_reused: true, 272 key: key.clone(), 273 pool: pool_ref, 274 value: Some(value), 275 } 276 } 277 } 278 279 /// Pop off this list, looking for a usable connection that hasn't expired. 280 struct IdlePopper<'a, T> { 281 key: &'a Key, 282 list: &'a mut Vec<Idle<T>>, 283 } 284 285 impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { pop(self, expiration: &Expiration) -> Option<Idle<T>>286 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> { 287 while let Some(entry) = self.list.pop() { 288 // If the connection has been closed, or is older than our idle 289 // timeout, simply drop it and keep looking... 290 if !entry.value.is_open() { 291 trace!("removing closed connection for {:?}", self.key); 292 continue; 293 } 294 // TODO: Actually, since the `idle` list is pushed to the end always, 295 // that would imply that if *this* entry is expired, then anything 296 // "earlier" in the list would *have* to be expired also... Right? 297 // 298 // In that case, we could just break out of the loop and drop the 299 // whole list... 300 if expiration.expires(entry.idle_at) { 301 trace!("removing expired connection for {:?}", self.key); 302 continue; 303 } 304 305 let value = match entry.value.reserve() { 306 #[cfg(feature = "http2")] 307 Reservation::Shared(to_reinsert, to_checkout) => { 308 self.list.push(Idle { 309 idle_at: Instant::now(), 310 value: to_reinsert, 311 }); 312 to_checkout 313 } 314 Reservation::Unique(unique) => unique, 315 }; 316 317 return Some(Idle { 318 idle_at: entry.idle_at, 319 value, 320 }); 321 } 322 323 None 324 } 325 } 326 327 impl<T: Poolable> PoolInner<T> { put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>)328 fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) { 329 if value.can_share() && self.idle.contains_key(&key) { 330 trace!("put; existing idle HTTP/2 connection for {:?}", key); 331 return; 332 } 333 trace!("put; add idle connection for {:?}", key); 334 let mut remove_waiters = false; 335 let mut value = Some(value); 336 if let Some(waiters) = self.waiters.get_mut(&key) { 337 while let Some(tx) = waiters.pop_front() { 338 if !tx.is_canceled() { 339 let reserved = value.take().expect("value already sent"); 340 let reserved = match reserved.reserve() { 341 #[cfg(feature = "http2")] 342 Reservation::Shared(to_keep, to_send) => { 343 value = Some(to_keep); 344 to_send 345 } 346 Reservation::Unique(uniq) => uniq, 347 }; 348 match tx.send(reserved) { 349 Ok(()) => { 350 if value.is_none() { 351 break; 352 } else { 353 continue; 354 } 355 } 356 Err(e) => { 357 value = Some(e); 358 } 359 } 360 } 361 362 trace!("put; removing canceled waiter for {:?}", key); 363 } 364 remove_waiters = waiters.is_empty(); 365 } 366 if remove_waiters { 367 self.waiters.remove(&key); 368 } 369 370 match value { 371 Some(value) => { 372 // borrow-check scope... 373 { 374 let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new); 375 if self.max_idle_per_host <= idle_list.len() { 376 trace!("max idle per host for {:?}, dropping connection", key); 377 return; 378 } 379 380 debug!("pooling idle connection for {:?}", key); 381 idle_list.push(Idle { 382 value, 383 idle_at: Instant::now(), 384 }); 385 } 386 387 #[cfg(feature = "runtime")] 388 { 389 self.spawn_idle_interval(__pool_ref); 390 } 391 } 392 None => trace!("put; found waiter for {:?}", key), 393 } 394 } 395 396 /// A `Connecting` task is complete. Not necessarily successfully, 397 /// but the lock is going away, so clean up. connected(&mut self, key: &Key)398 fn connected(&mut self, key: &Key) { 399 let existed = self.connecting.remove(key); 400 debug_assert!(existed, "Connecting dropped, key not in pool.connecting"); 401 // cancel any waiters. if there are any, it's because 402 // this Connecting task didn't complete successfully. 403 // those waiters would never receive a connection. 404 self.waiters.remove(key); 405 } 406 407 #[cfg(feature = "runtime")] spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>)408 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) { 409 let (dur, rx) = { 410 if self.idle_interval_ref.is_some() { 411 return; 412 } 413 414 if let Some(dur) = self.timeout { 415 let (tx, rx) = oneshot::channel(); 416 self.idle_interval_ref = Some(tx); 417 (dur, rx) 418 } else { 419 return; 420 } 421 }; 422 423 let interval = IdleTask { 424 interval: tokio::time::interval(dur), 425 pool: WeakOpt::downgrade(pool_ref), 426 pool_drop_notifier: rx, 427 }; 428 429 self.exec.execute(interval); 430 } 431 } 432 433 impl<T> PoolInner<T> { 434 /// Any `FutureResponse`s that were created will have made a `Checkout`, 435 /// and possibly inserted into the pool that it is waiting for an idle 436 /// connection. If a user ever dropped that future, we need to clean out 437 /// those parked senders. clean_waiters(&mut self, key: &Key)438 fn clean_waiters(&mut self, key: &Key) { 439 let mut remove_waiters = false; 440 if let Some(waiters) = self.waiters.get_mut(key) { 441 waiters.retain(|tx| !tx.is_canceled()); 442 remove_waiters = waiters.is_empty(); 443 } 444 if remove_waiters { 445 self.waiters.remove(key); 446 } 447 } 448 } 449 450 #[cfg(feature = "runtime")] 451 impl<T: Poolable> PoolInner<T> { 452 /// This should *only* be called by the IdleTask clear_expired(&mut self)453 fn clear_expired(&mut self) { 454 let dur = self.timeout.expect("interval assumes timeout"); 455 456 let now = Instant::now(); 457 //self.last_idle_check_at = now; 458 459 self.idle.retain(|key, values| { 460 values.retain(|entry| { 461 if !entry.value.is_open() { 462 trace!("idle interval evicting closed for {:?}", key); 463 return false; 464 } 465 466 // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470. 467 if now.saturating_duration_since(entry.idle_at) > dur { 468 trace!("idle interval evicting expired for {:?}", key); 469 return false; 470 } 471 472 // Otherwise, keep this value... 473 true 474 }); 475 476 // returning false evicts this key/val 477 !values.is_empty() 478 }); 479 } 480 } 481 482 impl<T> Clone for Pool<T> { clone(&self) -> Pool<T>483 fn clone(&self) -> Pool<T> { 484 Pool { 485 inner: self.inner.clone(), 486 } 487 } 488 } 489 490 /// A wrapped poolable value that tries to reinsert to the Pool on Drop. 491 // Note: The bounds `T: Poolable` is needed for the Drop impl. 492 pub(super) struct Pooled<T: Poolable> { 493 value: Option<T>, 494 is_reused: bool, 495 key: Key, 496 pool: WeakOpt<Mutex<PoolInner<T>>>, 497 } 498 499 impl<T: Poolable> Pooled<T> { is_reused(&self) -> bool500 pub(super) fn is_reused(&self) -> bool { 501 self.is_reused 502 } 503 is_pool_enabled(&self) -> bool504 pub(super) fn is_pool_enabled(&self) -> bool { 505 self.pool.0.is_some() 506 } 507 as_ref(&self) -> &T508 fn as_ref(&self) -> &T { 509 self.value.as_ref().expect("not dropped") 510 } 511 as_mut(&mut self) -> &mut T512 fn as_mut(&mut self) -> &mut T { 513 self.value.as_mut().expect("not dropped") 514 } 515 } 516 517 impl<T: Poolable> Deref for Pooled<T> { 518 type Target = T; deref(&self) -> &T519 fn deref(&self) -> &T { 520 self.as_ref() 521 } 522 } 523 524 impl<T: Poolable> DerefMut for Pooled<T> { deref_mut(&mut self) -> &mut T525 fn deref_mut(&mut self) -> &mut T { 526 self.as_mut() 527 } 528 } 529 530 impl<T: Poolable> Drop for Pooled<T> { drop(&mut self)531 fn drop(&mut self) { 532 if let Some(value) = self.value.take() { 533 if !value.is_open() { 534 // If we *already* know the connection is done here, 535 // it shouldn't be re-inserted back into the pool. 536 return; 537 } 538 539 if let Some(pool) = self.pool.upgrade() { 540 if let Ok(mut inner) = pool.lock() { 541 inner.put(self.key.clone(), value, &pool); 542 } 543 } else if !value.can_share() { 544 trace!("pool dropped, dropping pooled ({:?})", self.key); 545 } 546 // Ver::Http2 is already in the Pool (or dead), so we wouldn't 547 // have an actual reference to the Pool. 548 } 549 } 550 } 551 552 impl<T: Poolable> fmt::Debug for Pooled<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result553 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 554 f.debug_struct("Pooled").field("key", &self.key).finish() 555 } 556 } 557 558 struct Idle<T> { 559 idle_at: Instant, 560 value: T, 561 } 562 563 // FIXME: allow() required due to `impl Trait` leaking types to this lint 564 #[allow(missing_debug_implementations)] 565 pub(super) struct Checkout<T> { 566 key: Key, 567 pool: Pool<T>, 568 waiter: Option<oneshot::Receiver<T>>, 569 } 570 571 #[derive(Debug)] 572 pub(super) struct CheckoutIsClosedError; 573 574 impl StdError for CheckoutIsClosedError {} 575 576 impl fmt::Display for CheckoutIsClosedError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 578 f.write_str("checked out connection was closed") 579 } 580 } 581 582 impl<T: Poolable> Checkout<T> { poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>>583 fn poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>> { 584 if let Some(mut rx) = self.waiter.take() { 585 match Pin::new(&mut rx).poll(cx) { 586 Poll::Ready(Ok(value)) => { 587 if value.is_open() { 588 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) 589 } else { 590 Poll::Ready(Some(Err( 591 crate::Error::new_canceled().with(CheckoutIsClosedError) 592 ))) 593 } 594 } 595 Poll::Pending => { 596 self.waiter = Some(rx); 597 Poll::Pending 598 } 599 Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err( 600 crate::Error::new_canceled().with("request has been canceled") 601 ))), 602 } 603 } else { 604 Poll::Ready(None) 605 } 606 } 607 checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>>608 fn checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>> { 609 let entry = { 610 let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); 611 let expiration = Expiration::new(inner.timeout); 612 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| { 613 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); 614 // A block to end the mutable borrow on list, 615 // so the map below can check is_empty() 616 { 617 let popper = IdlePopper { 618 key: &self.key, 619 list, 620 }; 621 popper.pop(&expiration) 622 } 623 .map(|e| (e, list.is_empty())) 624 }); 625 626 let (entry, empty) = if let Some((e, empty)) = maybe_entry { 627 (Some(e), empty) 628 } else { 629 // No entry found means nuke the list for sure. 630 (None, true) 631 }; 632 if empty { 633 //TODO: This could be done with the HashMap::entry API instead. 634 inner.idle.remove(&self.key); 635 } 636 637 if entry.is_none() && self.waiter.is_none() { 638 let (tx, mut rx) = oneshot::channel(); 639 trace!("checkout waiting for idle connection: {:?}", self.key); 640 inner 641 .waiters 642 .entry(self.key.clone()) 643 .or_insert_with(VecDeque::new) 644 .push_back(tx); 645 646 // register the waker with this oneshot 647 assert!(Pin::new(&mut rx).poll(cx).is_pending()); 648 self.waiter = Some(rx); 649 } 650 651 entry 652 }; 653 654 entry.map(|e| self.pool.reuse(&self.key, e.value)) 655 } 656 } 657 658 impl<T: Poolable> Future for Checkout<T> { 659 type Output = crate::Result<Pooled<T>>; 660 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>661 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 662 if let Some(pooled) = ready!(self.poll_waiter(cx)?) { 663 return Poll::Ready(Ok(pooled)); 664 } 665 666 if let Some(pooled) = self.checkout(cx) { 667 Poll::Ready(Ok(pooled)) 668 } else if !self.pool.is_enabled() { 669 Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled"))) 670 } else { 671 // There's a new waiter, already registered in self.checkout() 672 debug_assert!(self.waiter.is_some()); 673 Poll::Pending 674 } 675 } 676 } 677 678 impl<T> Drop for Checkout<T> { drop(&mut self)679 fn drop(&mut self) { 680 if self.waiter.take().is_some() { 681 trace!("checkout dropped for {:?}", self.key); 682 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) { 683 inner.clean_waiters(&self.key); 684 } 685 } 686 } 687 } 688 689 // FIXME: allow() required due to `impl Trait` leaking types to this lint 690 #[allow(missing_debug_implementations)] 691 pub(super) struct Connecting<T: Poolable> { 692 key: Key, 693 pool: WeakOpt<Mutex<PoolInner<T>>>, 694 } 695 696 impl<T: Poolable> Connecting<T> { alpn_h2(self, pool: &Pool<T>) -> Option<Self>697 pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> { 698 debug_assert!( 699 self.pool.0.is_none(), 700 "Connecting::alpn_h2 but already Http2" 701 ); 702 703 pool.connecting(&self.key, Ver::Http2) 704 } 705 } 706 707 impl<T: Poolable> Drop for Connecting<T> { drop(&mut self)708 fn drop(&mut self) { 709 if let Some(pool) = self.pool.upgrade() { 710 // No need to panic on drop, that could abort! 711 if let Ok(mut inner) = pool.lock() { 712 inner.connected(&self.key); 713 } 714 } 715 } 716 } 717 718 struct Expiration(Option<Duration>); 719 720 impl Expiration { new(dur: Option<Duration>) -> Expiration721 fn new(dur: Option<Duration>) -> Expiration { 722 Expiration(dur) 723 } 724 expires(&self, instant: Instant) -> bool725 fn expires(&self, instant: Instant) -> bool { 726 match self.0 { 727 // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470. 728 Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout, 729 None => false, 730 } 731 } 732 } 733 734 #[cfg(feature = "runtime")] 735 pin_project_lite::pin_project! { 736 struct IdleTask<T> { 737 #[pin] 738 interval: Interval, 739 pool: WeakOpt<Mutex<PoolInner<T>>>, 740 // This allows the IdleTask to be notified as soon as the entire 741 // Pool is fully dropped, and shutdown. This channel is never sent on, 742 // but Err(Canceled) will be received when the Pool is dropped. 743 #[pin] 744 pool_drop_notifier: oneshot::Receiver<std::convert::Infallible>, 745 } 746 } 747 748 #[cfg(feature = "runtime")] 749 impl<T: Poolable + 'static> Future for IdleTask<T> { 750 type Output = (); 751 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>752 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 753 let mut this = self.project(); 754 loop { 755 match this.pool_drop_notifier.as_mut().poll(cx) { 756 Poll::Ready(Ok(n)) => match n {}, 757 Poll::Pending => (), 758 Poll::Ready(Err(_canceled)) => { 759 trace!("pool closed, canceling idle interval"); 760 return Poll::Ready(()); 761 } 762 } 763 764 ready!(this.interval.as_mut().poll_tick(cx)); 765 766 if let Some(inner) = this.pool.upgrade() { 767 if let Ok(mut inner) = inner.lock() { 768 trace!("idle interval checking for expired"); 769 inner.clear_expired(); 770 continue; 771 } 772 } 773 return Poll::Ready(()); 774 } 775 } 776 } 777 778 impl<T> WeakOpt<T> { none() -> Self779 fn none() -> Self { 780 WeakOpt(None) 781 } 782 downgrade(arc: &Arc<T>) -> Self783 fn downgrade(arc: &Arc<T>) -> Self { 784 WeakOpt(Some(Arc::downgrade(arc))) 785 } 786 upgrade(&self) -> Option<Arc<T>>787 fn upgrade(&self) -> Option<Arc<T>> { 788 self.0.as_ref().and_then(Weak::upgrade) 789 } 790 } 791 792 #[cfg(test)] 793 mod tests { 794 use std::future::Future; 795 use std::pin::Pin; 796 use std::task::Context; 797 use std::task::Poll; 798 use std::time::Duration; 799 800 use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; 801 use crate::common::exec::Exec; 802 803 /// Test unique reservations. 804 #[derive(Debug, PartialEq, Eq)] 805 struct Uniq<T>(T); 806 807 impl<T: Send + 'static + Unpin> Poolable for Uniq<T> { is_open(&self) -> bool808 fn is_open(&self) -> bool { 809 true 810 } 811 reserve(self) -> Reservation<Self>812 fn reserve(self) -> Reservation<Self> { 813 Reservation::Unique(self) 814 } 815 can_share(&self) -> bool816 fn can_share(&self) -> bool { 817 false 818 } 819 } 820 c<T: Poolable>(key: Key) -> Connecting<T>821 fn c<T: Poolable>(key: Key) -> Connecting<T> { 822 Connecting { 823 key, 824 pool: WeakOpt::none(), 825 } 826 } 827 host_key(s: &str) -> Key828 fn host_key(s: &str) -> Key { 829 (http::uri::Scheme::HTTP, s.parse().expect("host key")) 830 } 831 pool_no_timer<T>() -> Pool<T>832 fn pool_no_timer<T>() -> Pool<T> { 833 pool_max_idle_no_timer(::std::usize::MAX) 834 } 835 pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T>836 fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> { 837 let pool = Pool::new( 838 super::Config { 839 idle_timeout: Some(Duration::from_millis(100)), 840 max_idle_per_host: max_idle, 841 }, 842 &Exec::Default, 843 ); 844 pool.no_timer(); 845 pool 846 } 847 848 #[tokio::test] test_pool_checkout_smoke()849 async fn test_pool_checkout_smoke() { 850 let pool = pool_no_timer(); 851 let key = host_key("foo"); 852 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 853 854 drop(pooled); 855 856 match pool.checkout(key).await { 857 Ok(pooled) => assert_eq!(*pooled, Uniq(41)), 858 Err(_) => panic!("not ready"), 859 }; 860 } 861 862 /// Helper to check if the future is ready after polling once. 863 struct PollOnce<'a, F>(&'a mut F); 864 865 impl<F, T, U> Future for PollOnce<'_, F> 866 where 867 F: Future<Output = Result<T, U>> + Unpin, 868 { 869 type Output = Option<()>; 870 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>871 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 872 match Pin::new(&mut self.0).poll(cx) { 873 Poll::Ready(Ok(_)) => Poll::Ready(Some(())), 874 Poll::Ready(Err(_)) => Poll::Ready(Some(())), 875 Poll::Pending => Poll::Ready(None), 876 } 877 } 878 } 879 880 #[tokio::test] test_pool_checkout_returns_none_if_expired()881 async fn test_pool_checkout_returns_none_if_expired() { 882 let pool = pool_no_timer(); 883 let key = host_key("foo"); 884 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 885 886 drop(pooled); 887 tokio::time::sleep(pool.locked().timeout.unwrap()).await; 888 let mut checkout = pool.checkout(key); 889 let poll_once = PollOnce(&mut checkout); 890 let is_not_ready = poll_once.await.is_none(); 891 assert!(is_not_ready); 892 } 893 894 #[cfg(feature = "runtime")] 895 #[tokio::test] test_pool_checkout_removes_expired()896 async fn test_pool_checkout_removes_expired() { 897 let pool = pool_no_timer(); 898 let key = host_key("foo"); 899 900 pool.pooled(c(key.clone()), Uniq(41)); 901 pool.pooled(c(key.clone()), Uniq(5)); 902 pool.pooled(c(key.clone()), Uniq(99)); 903 904 assert_eq!( 905 pool.locked().idle.get(&key).map(|entries| entries.len()), 906 Some(3) 907 ); 908 tokio::time::sleep(pool.locked().timeout.unwrap()).await; 909 910 let mut checkout = pool.checkout(key.clone()); 911 let poll_once = PollOnce(&mut checkout); 912 // checkout.await should clean out the expired 913 poll_once.await; 914 assert!(pool.locked().idle.get(&key).is_none()); 915 } 916 917 #[test] test_pool_max_idle_per_host()918 fn test_pool_max_idle_per_host() { 919 let pool = pool_max_idle_no_timer(2); 920 let key = host_key("foo"); 921 922 pool.pooled(c(key.clone()), Uniq(41)); 923 pool.pooled(c(key.clone()), Uniq(5)); 924 pool.pooled(c(key.clone()), Uniq(99)); 925 926 // pooled and dropped 3, max_idle should only allow 2 927 assert_eq!( 928 pool.locked().idle.get(&key).map(|entries| entries.len()), 929 Some(2) 930 ); 931 } 932 933 #[cfg(feature = "runtime")] 934 #[tokio::test] test_pool_timer_removes_expired()935 async fn test_pool_timer_removes_expired() { 936 let _ = pretty_env_logger::try_init(); 937 tokio::time::pause(); 938 939 let pool = Pool::new( 940 super::Config { 941 idle_timeout: Some(Duration::from_millis(10)), 942 max_idle_per_host: std::usize::MAX, 943 }, 944 &Exec::Default, 945 ); 946 947 let key = host_key("foo"); 948 949 pool.pooled(c(key.clone()), Uniq(41)); 950 pool.pooled(c(key.clone()), Uniq(5)); 951 pool.pooled(c(key.clone()), Uniq(99)); 952 953 assert_eq!( 954 pool.locked().idle.get(&key).map(|entries| entries.len()), 955 Some(3) 956 ); 957 958 // Let the timer tick passed the expiration... 959 tokio::time::advance(Duration::from_millis(30)).await; 960 // Yield so the Interval can reap... 961 tokio::task::yield_now().await; 962 963 assert!(pool.locked().idle.get(&key).is_none()); 964 } 965 966 #[tokio::test] test_pool_checkout_task_unparked()967 async fn test_pool_checkout_task_unparked() { 968 use futures_util::future::join; 969 use futures_util::FutureExt; 970 971 let pool = pool_no_timer(); 972 let key = host_key("foo"); 973 let pooled = pool.pooled(c(key.clone()), Uniq(41)); 974 975 let checkout = join(pool.checkout(key), async { 976 // the checkout future will park first, 977 // and then this lazy future will be polled, which will insert 978 // the pooled back into the pool 979 // 980 // this test makes sure that doing so will unpark the checkout 981 drop(pooled); 982 }) 983 .map(|(entry, _)| entry); 984 985 assert_eq!(*checkout.await.unwrap(), Uniq(41)); 986 } 987 988 #[tokio::test] test_pool_checkout_drop_cleans_up_waiters()989 async fn test_pool_checkout_drop_cleans_up_waiters() { 990 let pool = pool_no_timer::<Uniq<i32>>(); 991 let key = host_key("foo"); 992 993 let mut checkout1 = pool.checkout(key.clone()); 994 let mut checkout2 = pool.checkout(key.clone()); 995 996 let poll_once1 = PollOnce(&mut checkout1); 997 let poll_once2 = PollOnce(&mut checkout2); 998 999 // first poll needed to get into Pool's parked 1000 poll_once1.await; 1001 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); 1002 poll_once2.await; 1003 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); 1004 1005 // on drop, clean up Pool 1006 drop(checkout1); 1007 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); 1008 1009 drop(checkout2); 1010 assert!(pool.locked().waiters.get(&key).is_none()); 1011 } 1012 1013 #[derive(Debug)] 1014 struct CanClose { 1015 #[allow(unused)] 1016 val: i32, 1017 closed: bool, 1018 } 1019 1020 impl Poolable for CanClose { is_open(&self) -> bool1021 fn is_open(&self) -> bool { 1022 !self.closed 1023 } 1024 reserve(self) -> Reservation<Self>1025 fn reserve(self) -> Reservation<Self> { 1026 Reservation::Unique(self) 1027 } 1028 can_share(&self) -> bool1029 fn can_share(&self) -> bool { 1030 false 1031 } 1032 } 1033 1034 #[test] pooled_drop_if_closed_doesnt_reinsert()1035 fn pooled_drop_if_closed_doesnt_reinsert() { 1036 let pool = pool_no_timer(); 1037 let key = host_key("foo"); 1038 pool.pooled( 1039 c(key.clone()), 1040 CanClose { 1041 val: 57, 1042 closed: true, 1043 }, 1044 ); 1045 1046 assert!(!pool.locked().idle.contains_key(&key)); 1047 } 1048 } 1049