1 //! This module defines a load-balanced pool of services that adds new services when load is high. 2 //! 3 //! The pool uses `poll_ready` as a signal indicating whether additional services should be spawned 4 //! to handle the current level of load. Specifically, every time `poll_ready` on the inner service 5 //! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `Pending`, [`Pool`] 6 //! considers it a 1. [`Pool`] then maintains an [exponential moving 7 //! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those 8 //! samples, which gives an estimate of how often the underlying service has been ready when it was 9 //! needed "recently" (see [`Builder::urgency`]). If the service is loaded (see 10 //! [`Builder::loaded_above`]), a new service is created and added to the underlying [`Balance`]. 11 //! If the service is underutilized (see [`Builder::underutilized_below`]) and there are two or 12 //! more services, then the latest added service is removed. In either case, the load estimate is 13 //! reset to its initial value (see [`Builder::initial`] to prevent services from being rapidly 14 //! added or removed. 15 #![deny(missing_docs)] 16 17 use super::p2c::Balance; 18 use crate::discover::Change; 19 use crate::load::Load; 20 use crate::make::MakeService; 21 use futures_core::{ready, Stream}; 22 use pin_project_lite::pin_project; 23 use slab::Slab; 24 use std::{ 25 fmt, 26 future::Future, 27 pin::Pin, 28 task::{Context, Poll}, 29 }; 30 use tower_service::Service; 31 32 #[cfg(test)] 33 mod test; 34 35 #[derive(Debug, Clone, Copy, Eq, PartialEq)] 36 enum Level { 37 /// Load is low -- remove a service instance. 38 Low, 39 /// Load is normal -- keep the service set as it is. 40 Normal, 41 /// Load is high -- add another service instance. 42 High, 43 } 44 45 pin_project! { 46 /// A wrapper around `MakeService` that discovers a new service when load is high, and removes a 47 /// service when load is low. See [`Pool`]. 48 pub struct PoolDiscoverer<MS, Target, Request> 49 where 50 MS: MakeService<Target, Request>, 51 { 52 maker: MS, 53 #[pin] 54 making: Option<MS::Future>, 55 target: Target, 56 load: Level, 57 services: Slab<()>, 58 died_tx: tokio::sync::mpsc::UnboundedSender<usize>, 59 #[pin] 60 died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>, 61 limit: Option<usize>, 62 } 63 } 64 65 impl<MS, Target, Request> fmt::Debug for PoolDiscoverer<MS, Target, Request> 66 where 67 MS: MakeService<Target, Request> + fmt::Debug, 68 Target: fmt::Debug, 69 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 71 f.debug_struct("PoolDiscoverer") 72 .field("maker", &self.maker) 73 .field("making", &self.making.is_some()) 74 .field("target", &self.target) 75 .field("load", &self.load) 76 .field("services", &self.services) 77 .field("limit", &self.limit) 78 .finish() 79 } 80 } 81 82 impl<MS, Target, Request> Stream for PoolDiscoverer<MS, Target, Request> 83 where 84 MS: MakeService<Target, Request>, 85 MS::MakeError: Into<crate::BoxError>, 86 MS::Error: Into<crate::BoxError>, 87 Target: Clone, 88 { 89 type Item = Result<Change<usize, DropNotifyService<MS::Service>>, MS::MakeError>; 90 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>91 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 92 let mut this = self.project(); 93 94 while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) { 95 this.services.remove(sid); 96 tracing::trace!( 97 pool.services = this.services.len(), 98 message = "removing dropped service" 99 ); 100 } 101 102 if this.services.is_empty() && this.making.is_none() { 103 let _ = ready!(this.maker.poll_ready(cx))?; 104 tracing::trace!("construct initial pool connection"); 105 this.making 106 .set(Some(this.maker.make_service(this.target.clone()))); 107 } 108 109 if let Level::High = this.load { 110 if this.making.is_none() { 111 if this 112 .limit 113 .map(|limit| this.services.len() >= limit) 114 .unwrap_or(false) 115 { 116 return Poll::Pending; 117 } 118 119 tracing::trace!( 120 pool.services = this.services.len(), 121 message = "decided to add service to loaded pool" 122 ); 123 ready!(this.maker.poll_ready(cx))?; 124 tracing::trace!("making new service"); 125 // TODO: it'd be great if we could avoid the clone here and use, say, &Target 126 this.making 127 .set(Some(this.maker.make_service(this.target.clone()))); 128 } 129 } 130 131 if let Some(fut) = this.making.as_mut().as_pin_mut() { 132 let svc = ready!(fut.poll(cx))?; 133 this.making.set(None); 134 135 let id = this.services.insert(()); 136 let svc = DropNotifyService { 137 svc, 138 id, 139 notify: this.died_tx.clone(), 140 }; 141 tracing::trace!( 142 pool.services = this.services.len(), 143 message = "finished creating new service" 144 ); 145 *this.load = Level::Normal; 146 return Poll::Ready(Some(Ok(Change::Insert(id, svc)))); 147 } 148 149 match this.load { 150 Level::High => { 151 unreachable!("found high load but no Service being made"); 152 } 153 Level::Normal => Poll::Pending, 154 Level::Low if this.services.len() == 1 => Poll::Pending, 155 Level::Low => { 156 *this.load = Level::Normal; 157 // NOTE: this is a little sad -- we'd prefer to kill short-living services 158 let rm = this.services.iter().next().unwrap().0; 159 // note that we _don't_ remove from self.services here 160 // that'll happen automatically on drop 161 tracing::trace!( 162 pool.services = this.services.len(), 163 message = "removing service for over-provisioned pool" 164 ); 165 Poll::Ready(Some(Ok(Change::Remove(rm)))) 166 } 167 } 168 } 169 } 170 171 /// A [builder] that lets you configure how a [`Pool`] determines whether the underlying service is 172 /// loaded or not. See the [module-level documentation](self) and the builder's methods for 173 /// details. 174 /// 175 /// [builder]: https://rust-lang-nursery.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder 176 #[derive(Copy, Clone, Debug)] 177 pub struct Builder { 178 low: f64, 179 high: f64, 180 init: f64, 181 alpha: f64, 182 limit: Option<usize>, 183 } 184 185 impl Default for Builder { default() -> Self186 fn default() -> Self { 187 Builder { 188 init: 0.1, 189 low: 0.00001, 190 high: 0.2, 191 alpha: 0.03, 192 limit: None, 193 } 194 } 195 } 196 197 impl Builder { 198 /// Create a new builder with default values for all load settings. 199 /// 200 /// If you just want to use the defaults, you can just use [`Pool::new`]. new() -> Self201 pub fn new() -> Self { 202 Self::default() 203 } 204 205 /// When the estimated load (see the [module-level docs](self)) drops below this 206 /// threshold, and there are at least two services active, a service is removed. 207 /// 208 /// The default value is 0.01. That is, when one in every 100 `poll_ready` calls return 209 /// `Pending`, then the underlying service is considered underutilized. underutilized_below(&mut self, low: f64) -> &mut Self210 pub fn underutilized_below(&mut self, low: f64) -> &mut Self { 211 self.low = low; 212 self 213 } 214 215 /// When the estimated load (see the [module-level docs](self)) exceeds this 216 /// threshold, and no service is currently in the process of being added, a new service is 217 /// scheduled to be added to the underlying [`Balance`]. 218 /// 219 /// The default value is 0.5. That is, when every other call to `poll_ready` returns 220 /// `Pending`, then the underlying service is considered highly loaded. loaded_above(&mut self, high: f64) -> &mut Self221 pub fn loaded_above(&mut self, high: f64) -> &mut Self { 222 self.high = high; 223 self 224 } 225 226 /// The initial estimated load average. 227 /// 228 /// This is also the value that the estimated load will be reset to whenever a service is added 229 /// or removed. 230 /// 231 /// The default value is 0.1. initial(&mut self, init: f64) -> &mut Self232 pub fn initial(&mut self, init: f64) -> &mut Self { 233 self.init = init; 234 self 235 } 236 237 /// How aggressively the estimated load average is updated. 238 /// 239 /// This is the α parameter of the formula for the [exponential moving 240 /// average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average), and 241 /// dictates how quickly new samples of the current load affect the estimated load. If the 242 /// value is closer to 1, newer samples affect the load average a lot (when α is 1, the load 243 /// average is immediately set to the current load). If the value is closer to 0, newer samples 244 /// affect the load average very little at a time. 245 /// 246 /// The given value is clamped to `[0,1]`. 247 /// 248 /// The default value is 0.05, meaning, in very approximate terms, that each new load sample 249 /// affects the estimated load by 5%. urgency(&mut self, alpha: f64) -> &mut Self250 pub fn urgency(&mut self, alpha: f64) -> &mut Self { 251 self.alpha = alpha.max(0.0).min(1.0); 252 self 253 } 254 255 /// The maximum number of backing `Service` instances to maintain. 256 /// 257 /// When the limit is reached, the load estimate is clamped to the high load threshhold, and no 258 /// new service is spawned. 259 /// 260 /// No maximum limit is imposed by default. max_services(&mut self, limit: Option<usize>) -> &mut Self261 pub fn max_services(&mut self, limit: Option<usize>) -> &mut Self { 262 self.limit = limit; 263 self 264 } 265 266 /// See [`Pool::new`]. build<MS, Target, Request>( &self, make_service: MS, target: Target, ) -> Pool<MS, Target, Request> where MS: MakeService<Target, Request>, MS::Service: Load, <MS::Service as Load>::Metric: std::fmt::Debug, MS::MakeError: Into<crate::BoxError>, MS::Error: Into<crate::BoxError>, Target: Clone,267 pub fn build<MS, Target, Request>( 268 &self, 269 make_service: MS, 270 target: Target, 271 ) -> Pool<MS, Target, Request> 272 where 273 MS: MakeService<Target, Request>, 274 MS::Service: Load, 275 <MS::Service as Load>::Metric: std::fmt::Debug, 276 MS::MakeError: Into<crate::BoxError>, 277 MS::Error: Into<crate::BoxError>, 278 Target: Clone, 279 { 280 let (died_tx, died_rx) = tokio::sync::mpsc::unbounded_channel(); 281 let d = PoolDiscoverer { 282 maker: make_service, 283 making: None, 284 target, 285 load: Level::Normal, 286 services: Slab::new(), 287 died_tx, 288 died_rx, 289 limit: self.limit, 290 }; 291 292 Pool { 293 balance: Balance::new(Box::pin(d)), 294 options: *self, 295 ewma: self.init, 296 } 297 } 298 } 299 300 /// A dynamically sized, load-balanced pool of `Service` instances. 301 pub struct Pool<MS, Target, Request> 302 where 303 MS: MakeService<Target, Request>, 304 MS::MakeError: Into<crate::BoxError>, 305 MS::Error: Into<crate::BoxError>, 306 Target: Clone, 307 { 308 // the Pin<Box<_>> here is needed since Balance requires the Service to be Unpin 309 balance: Balance<Pin<Box<PoolDiscoverer<MS, Target, Request>>>, Request>, 310 options: Builder, 311 ewma: f64, 312 } 313 314 impl<MS, Target, Request> fmt::Debug for Pool<MS, Target, Request> 315 where 316 MS: MakeService<Target, Request> + fmt::Debug, 317 MS::MakeError: Into<crate::BoxError>, 318 MS::Error: Into<crate::BoxError>, 319 Target: Clone + fmt::Debug, 320 MS::Service: fmt::Debug, 321 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 323 f.debug_struct("Pool") 324 .field("balance", &self.balance) 325 .field("options", &self.options) 326 .field("ewma", &self.ewma) 327 .finish() 328 } 329 } 330 331 impl<MS, Target, Request> Pool<MS, Target, Request> 332 where 333 MS: MakeService<Target, Request>, 334 MS::Service: Load, 335 <MS::Service as Load>::Metric: std::fmt::Debug, 336 MS::MakeError: Into<crate::BoxError>, 337 MS::Error: Into<crate::BoxError>, 338 Target: Clone, 339 { 340 /// Construct a new dynamically sized `Pool`. 341 /// 342 /// If many calls to `poll_ready` return `Pending`, `new_service` is used to 343 /// construct another `Service` that is then added to the load-balanced pool. 344 /// If many calls to `poll_ready` succeed, the most recently added `Service` 345 /// is dropped from the pool. new(make_service: MS, target: Target) -> Self346 pub fn new(make_service: MS, target: Target) -> Self { 347 Builder::new().build(make_service, target) 348 } 349 } 350 351 type PinBalance<S, Request> = Balance<Pin<Box<S>>, Request>; 352 353 impl<MS, Target, Req> Service<Req> for Pool<MS, Target, Req> 354 where 355 MS: MakeService<Target, Req>, 356 MS::Service: Load, 357 <MS::Service as Load>::Metric: std::fmt::Debug, 358 MS::MakeError: Into<crate::BoxError>, 359 MS::Error: Into<crate::BoxError>, 360 Target: Clone, 361 { 362 type Response = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response; 363 type Error = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Error; 364 type Future = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Future; 365 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>366 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 367 if let Poll::Ready(()) = self.balance.poll_ready(cx)? { 368 // services was ready -- there are enough services 369 // update ewma with a 0 sample 370 self.ewma *= 1.0 - self.options.alpha; 371 372 let discover = self.balance.discover_mut().as_mut().project(); 373 if self.ewma < self.options.low { 374 if *discover.load != Level::Low { 375 tracing::trace!({ ewma = %self.ewma }, "pool is over-provisioned"); 376 } 377 *discover.load = Level::Low; 378 379 if discover.services.len() > 1 { 380 // reset EWMA so we don't immediately try to remove another service 381 self.ewma = self.options.init; 382 } 383 } else { 384 if *discover.load != Level::Normal { 385 tracing::trace!({ ewma = %self.ewma }, "pool is appropriately provisioned"); 386 } 387 *discover.load = Level::Normal; 388 } 389 390 return Poll::Ready(Ok(())); 391 } 392 393 let discover = self.balance.discover_mut().as_mut().project(); 394 if discover.making.is_none() { 395 // no services are ready -- we're overloaded 396 // update ewma with a 1 sample 397 self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma; 398 399 if self.ewma > self.options.high { 400 if *discover.load != Level::High { 401 tracing::trace!({ ewma = %self.ewma }, "pool is under-provisioned"); 402 } 403 *discover.load = Level::High; 404 405 // don't reset the EWMA -- in theory, poll_ready should now start returning 406 // `Ready`, so we won't try to launch another service immediately. 407 // we clamp it to high though in case the # of services is limited. 408 self.ewma = self.options.high; 409 410 // we need to call balance again for PoolDiscover to realize 411 // it can make a new service 412 return self.balance.poll_ready(cx); 413 } else { 414 *discover.load = Level::Normal; 415 } 416 } 417 418 Poll::Pending 419 } 420 call(&mut self, req: Req) -> Self::Future421 fn call(&mut self, req: Req) -> Self::Future { 422 self.balance.call(req) 423 } 424 } 425 426 #[doc(hidden)] 427 #[derive(Debug)] 428 pub struct DropNotifyService<Svc> { 429 svc: Svc, 430 id: usize, 431 notify: tokio::sync::mpsc::UnboundedSender<usize>, 432 } 433 434 impl<Svc> Drop for DropNotifyService<Svc> { drop(&mut self)435 fn drop(&mut self) { 436 let _ = self.notify.send(self.id).is_ok(); 437 } 438 } 439 440 impl<Svc: Load> Load for DropNotifyService<Svc> { 441 type Metric = Svc::Metric; load(&self) -> Self::Metric442 fn load(&self) -> Self::Metric { 443 self.svc.load() 444 } 445 } 446 447 impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc> { 448 type Response = Svc::Response; 449 type Future = Svc::Future; 450 type Error = Svc::Error; 451 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>452 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 453 self.svc.poll_ready(cx) 454 } 455 call(&mut self, req: Request) -> Self::Future456 fn call(&mut self, req: Request) -> Self::Future { 457 self.svc.call(req) 458 } 459 } 460