1 //! This module defines a load-balanced pool of services that adds new services when load is high.
2 //!
3 //! The pool uses `poll_ready` as a signal indicating whether additional services should be spawned
//! to handle the current level of load. Specifically, every time `poll_ready` on the inner service
//! returns `Ready`, [`Pool`] considers that a 0, and every time it returns `Pending`, [`Pool`]
//! considers it a 1. [`Pool`] then maintains an [exponential moving
//! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those
//! samples, which gives an estimate of how often the underlying service has *not* been ready when
//! it was needed "recently" (see [`Builder::urgency`]). If the service is loaded (see
10 //! [`Builder::loaded_above`]), a new service is created and added to the underlying [`Balance`].
//! If the service is underutilized (see [`Builder::underutilized_below`]) and there are two or
//! more services, then one of the services is removed. When the pool decides to remove a service,
//! the load estimate is reset to its initial value (see [`Builder::initial`]) to prevent services
//! from being rapidly removed; when it decides to add a service, the estimate is instead clamped
//! to the [`Builder::loaded_above`] threshold.
15 #![deny(missing_docs)]
16 
17 use super::p2c::Balance;
18 use crate::discover::Change;
19 use crate::load::Load;
20 use crate::make::MakeService;
21 use futures_core::{ready, Stream};
22 use pin_project_lite::pin_project;
23 use slab::Slab;
24 use std::{
25     fmt,
26     future::Future,
27     pin::Pin,
28     task::{Context, Poll},
29 };
30 use tower_service::Service;
31 
32 #[cfg(test)]
33 mod test;
34 
/// Coarse load classification that [`Pool`] hands to its discoverer.
///
/// The discoverer reacts to `High` by making another service and to `Low` by shedding one.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Level {
    /// Over-provisioned: one service instance should be removed.
    Low,
    /// Adequately provisioned: leave the set of services untouched.
    Normal,
    /// Under-provisioned: one more service instance should be added.
    High,
}
44 
pin_project! {
    /// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
    /// service when load is low. See [`Pool`].
    ///
    /// It is consumed as a [`Stream`] of [`Change`]s. Every service it yields is wrapped in a
    /// `DropNotifyService` that sends its slot id back over `died_tx` when it is dropped.
    pub struct PoolDiscoverer<MS, Target, Request>
    where
        MS: MakeService<Target, Request>,
    {
        // Factory used to create each new service.
        maker: MS,
        // In-flight `make_service` future, if a new service is currently being made.
        #[pin]
        making: Option<MS::Future>,
        // Passed (by clone) to every `make_service` call.
        target: Target,
        // Current load level: written by `Pool::poll_ready`, and reset to `Normal` by
        // `poll_next` after it inserts or removes a service.
        load: Level,
        // One slot per live service; the slot key is the id used in `Change::Insert`/`Remove`.
        services: Slab<()>,
        // Cloned into every `DropNotifyService` so it can report its id when dropped.
        died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
        // Receives the ids of services that have been dropped.
        #[pin]
        died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
        // Optional cap on the number of services (see `Builder::max_services`).
        limit: Option<usize>,
    }
}
64 
65 impl<MS, Target, Request> fmt::Debug for PoolDiscoverer<MS, Target, Request>
66 where
67     MS: MakeService<Target, Request> + fmt::Debug,
68     Target: fmt::Debug,
69 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result70     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71         f.debug_struct("PoolDiscoverer")
72             .field("maker", &self.maker)
73             .field("making", &self.making.is_some())
74             .field("target", &self.target)
75             .field("load", &self.load)
76             .field("services", &self.services)
77             .field("limit", &self.limit)
78             .finish()
79     }
80 }
81 
82 impl<MS, Target, Request> Stream for PoolDiscoverer<MS, Target, Request>
83 where
84     MS: MakeService<Target, Request>,
85     MS::MakeError: Into<crate::BoxError>,
86     MS::Error: Into<crate::BoxError>,
87     Target: Clone,
88 {
89     type Item = Result<Change<usize, DropNotifyService<MS::Service>>, MS::MakeError>;
90 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>91     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92         let mut this = self.project();
93 
94         while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) {
95             this.services.remove(sid);
96             tracing::trace!(
97                 pool.services = this.services.len(),
98                 message = "removing dropped service"
99             );
100         }
101 
102         if this.services.is_empty() && this.making.is_none() {
103             let _ = ready!(this.maker.poll_ready(cx))?;
104             tracing::trace!("construct initial pool connection");
105             this.making
106                 .set(Some(this.maker.make_service(this.target.clone())));
107         }
108 
109         if let Level::High = this.load {
110             if this.making.is_none() {
111                 if this
112                     .limit
113                     .map(|limit| this.services.len() >= limit)
114                     .unwrap_or(false)
115                 {
116                     return Poll::Pending;
117                 }
118 
119                 tracing::trace!(
120                     pool.services = this.services.len(),
121                     message = "decided to add service to loaded pool"
122                 );
123                 ready!(this.maker.poll_ready(cx))?;
124                 tracing::trace!("making new service");
125                 // TODO: it'd be great if we could avoid the clone here and use, say, &Target
126                 this.making
127                     .set(Some(this.maker.make_service(this.target.clone())));
128             }
129         }
130 
131         if let Some(fut) = this.making.as_mut().as_pin_mut() {
132             let svc = ready!(fut.poll(cx))?;
133             this.making.set(None);
134 
135             let id = this.services.insert(());
136             let svc = DropNotifyService {
137                 svc,
138                 id,
139                 notify: this.died_tx.clone(),
140             };
141             tracing::trace!(
142                 pool.services = this.services.len(),
143                 message = "finished creating new service"
144             );
145             *this.load = Level::Normal;
146             return Poll::Ready(Some(Ok(Change::Insert(id, svc))));
147         }
148 
149         match this.load {
150             Level::High => {
151                 unreachable!("found high load but no Service being made");
152             }
153             Level::Normal => Poll::Pending,
154             Level::Low if this.services.len() == 1 => Poll::Pending,
155             Level::Low => {
156                 *this.load = Level::Normal;
157                 // NOTE: this is a little sad -- we'd prefer to kill short-living services
158                 let rm = this.services.iter().next().unwrap().0;
159                 // note that we _don't_ remove from self.services here
160                 // that'll happen automatically on drop
161                 tracing::trace!(
162                     pool.services = this.services.len(),
163                     message = "removing service for over-provisioned pool"
164                 );
165                 Poll::Ready(Some(Ok(Change::Remove(rm))))
166             }
167         }
168     }
169 }
170 
/// A [builder] for configuring how a [`Pool`] decides whether its underlying service is
/// overloaded or underutilized. See the [module-level documentation](self) and the individual
/// builder methods for details.
///
///  [builder]: https://rust-lang.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder
#[derive(Clone, Copy, Debug)]
pub struct Builder {
    /// Load estimate below which the pool is considered over-provisioned.
    low: f64,
    /// Load estimate above which the pool is considered under-provisioned.
    high: f64,
    /// Starting value of the load estimate, and the value it is reset to before shedding a service.
    init: f64,
    /// EWMA smoothing factor (clamped to `[0, 1]` when set through `urgency`).
    alpha: f64,
    /// Optional cap on how many services the pool will grow to.
    limit: Option<usize>,
}
184 
185 impl Default for Builder {
default() -> Self186     fn default() -> Self {
187         Builder {
188             init: 0.1,
189             low: 0.00001,
190             high: 0.2,
191             alpha: 0.03,
192             limit: None,
193         }
194     }
195 }
196 
197 impl Builder {
198     /// Create a new builder with default values for all load settings.
199     ///
200     /// If you just want to use the defaults, you can just use [`Pool::new`].
new() -> Self201     pub fn new() -> Self {
202         Self::default()
203     }
204 
205     /// When the estimated load (see the [module-level docs](self)) drops below this
206     /// threshold, and there are at least two services active, a service is removed.
207     ///
208     /// The default value is 0.01. That is, when one in every 100 `poll_ready` calls return
209     /// `Pending`, then the underlying service is considered underutilized.
underutilized_below(&mut self, low: f64) -> &mut Self210     pub fn underutilized_below(&mut self, low: f64) -> &mut Self {
211         self.low = low;
212         self
213     }
214 
215     /// When the estimated load (see the [module-level docs](self)) exceeds this
216     /// threshold, and no service is currently in the process of being added, a new service is
217     /// scheduled to be added to the underlying [`Balance`].
218     ///
219     /// The default value is 0.5. That is, when every other call to `poll_ready` returns
220     /// `Pending`, then the underlying service is considered highly loaded.
loaded_above(&mut self, high: f64) -> &mut Self221     pub fn loaded_above(&mut self, high: f64) -> &mut Self {
222         self.high = high;
223         self
224     }
225 
226     /// The initial estimated load average.
227     ///
228     /// This is also the value that the estimated load will be reset to whenever a service is added
229     /// or removed.
230     ///
231     /// The default value is 0.1.
initial(&mut self, init: f64) -> &mut Self232     pub fn initial(&mut self, init: f64) -> &mut Self {
233         self.init = init;
234         self
235     }
236 
237     /// How aggressively the estimated load average is updated.
238     ///
239     /// This is the α parameter of the formula for the [exponential moving
240     /// average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average), and
241     /// dictates how quickly new samples of the current load affect the estimated load. If the
242     /// value is closer to 1, newer samples affect the load average a lot (when α is 1, the load
243     /// average is immediately set to the current load). If the value is closer to 0, newer samples
244     /// affect the load average very little at a time.
245     ///
246     /// The given value is clamped to `[0,1]`.
247     ///
248     /// The default value is 0.05, meaning, in very approximate terms, that each new load sample
249     /// affects the estimated load by 5%.
urgency(&mut self, alpha: f64) -> &mut Self250     pub fn urgency(&mut self, alpha: f64) -> &mut Self {
251         self.alpha = alpha.max(0.0).min(1.0);
252         self
253     }
254 
255     /// The maximum number of backing `Service` instances to maintain.
256     ///
257     /// When the limit is reached, the load estimate is clamped to the high load threshhold, and no
258     /// new service is spawned.
259     ///
260     /// No maximum limit is imposed by default.
max_services(&mut self, limit: Option<usize>) -> &mut Self261     pub fn max_services(&mut self, limit: Option<usize>) -> &mut Self {
262         self.limit = limit;
263         self
264     }
265 
266     /// See [`Pool::new`].
build<MS, Target, Request>( &self, make_service: MS, target: Target, ) -> Pool<MS, Target, Request> where MS: MakeService<Target, Request>, MS::Service: Load, <MS::Service as Load>::Metric: std::fmt::Debug, MS::MakeError: Into<crate::BoxError>, MS::Error: Into<crate::BoxError>, Target: Clone,267     pub fn build<MS, Target, Request>(
268         &self,
269         make_service: MS,
270         target: Target,
271     ) -> Pool<MS, Target, Request>
272     where
273         MS: MakeService<Target, Request>,
274         MS::Service: Load,
275         <MS::Service as Load>::Metric: std::fmt::Debug,
276         MS::MakeError: Into<crate::BoxError>,
277         MS::Error: Into<crate::BoxError>,
278         Target: Clone,
279     {
280         let (died_tx, died_rx) = tokio::sync::mpsc::unbounded_channel();
281         let d = PoolDiscoverer {
282             maker: make_service,
283             making: None,
284             target,
285             load: Level::Normal,
286             services: Slab::new(),
287             died_tx,
288             died_rx,
289             limit: self.limit,
290         };
291 
292         Pool {
293             balance: Balance::new(Box::pin(d)),
294             options: *self,
295             ewma: self.init,
296         }
297     }
298 }
299 
/// A dynamically sized, load-balanced pool of `Service` instances.
///
/// Construct one with [`Pool::new`] or [`Builder::build`].
pub struct Pool<MS, Target, Request>
where
    MS: MakeService<Target, Request>,
    MS::MakeError: Into<crate::BoxError>,
    MS::Error: Into<crate::BoxError>,
    Target: Clone,
{
    // the Pin<Box<_>> here is needed since Balance requires the Service to be Unpin
    // NOTE(review): what is boxed here is the `PoolDiscoverer` (which has `#[pin]` fields), so
    // presumably the `Unpin` requirement is on Balance's discover type — confirm against Balance.
    balance: Balance<Pin<Box<PoolDiscoverer<MS, Target, Request>>>, Request>,
    // Thresholds and tuning copied from the `Builder` that created this pool.
    options: Builder,
    // Moving average of readiness samples: 0 for each `Ready`, 1 for each counted `Pending`.
    ewma: f64,
}
313 
314 impl<MS, Target, Request> fmt::Debug for Pool<MS, Target, Request>
315 where
316     MS: MakeService<Target, Request> + fmt::Debug,
317     MS::MakeError: Into<crate::BoxError>,
318     MS::Error: Into<crate::BoxError>,
319     Target: Clone + fmt::Debug,
320     MS::Service: fmt::Debug,
321 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result322     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323         f.debug_struct("Pool")
324             .field("balance", &self.balance)
325             .field("options", &self.options)
326             .field("ewma", &self.ewma)
327             .finish()
328     }
329 }
330 
331 impl<MS, Target, Request> Pool<MS, Target, Request>
332 where
333     MS: MakeService<Target, Request>,
334     MS::Service: Load,
335     <MS::Service as Load>::Metric: std::fmt::Debug,
336     MS::MakeError: Into<crate::BoxError>,
337     MS::Error: Into<crate::BoxError>,
338     Target: Clone,
339 {
340     /// Construct a new dynamically sized `Pool`.
341     ///
342     /// If many calls to `poll_ready` return `Pending`, `new_service` is used to
343     /// construct another `Service` that is then added to the load-balanced pool.
344     /// If many calls to `poll_ready` succeed, the most recently added `Service`
345     /// is dropped from the pool.
new(make_service: MS, target: Target) -> Self346     pub fn new(make_service: MS, target: Target) -> Self {
347         Builder::new().build(make_service, target)
348     }
349 }
350 
351 type PinBalance<S, Request> = Balance<Pin<Box<S>>, Request>;
352 
impl<MS, Target, Req> Service<Req> for Pool<MS, Target, Req>
where
    MS: MakeService<Target, Req>,
    MS::Service: Load,
    <MS::Service as Load>::Metric: std::fmt::Debug,
    MS::MakeError: Into<crate::BoxError>,
    MS::Error: Into<crate::BoxError>,
    Target: Clone,
{
    type Response = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
    type Error = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Error;
    type Future = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Future;

    /// Polls the underlying balancer and feeds the outcome into the load estimate.
    ///
    /// A `Ready` result counts as a 0 sample and may set the discoverer's level to `Low`. A
    /// `Pending` result counts as a 1 sample, but only while no new service is being made, and
    /// may set the level to `High`. The discoverer reads this level when the balancer next polls
    /// it for changes.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if let Poll::Ready(()) = self.balance.poll_ready(cx)? {
            // services was ready -- there are enough services
            // update ewma with a 0 sample
            self.ewma *= 1.0 - self.options.alpha;

            let discover = self.balance.discover_mut().as_mut().project();
            if self.ewma < self.options.low {
                if *discover.load != Level::Low {
                    tracing::trace!({ ewma = %self.ewma }, "pool is over-provisioned");
                }
                *discover.load = Level::Low;

                if discover.services.len() > 1 {
                    // reset EWMA so we don't immediately try to remove another service
                    self.ewma = self.options.init;
                }
            } else {
                if *discover.load != Level::Normal {
                    tracing::trace!({ ewma = %self.ewma }, "pool is appropriately provisioned");
                }
                *discover.load = Level::Normal;
            }

            return Poll::Ready(Ok(()));
        }

        let discover = self.balance.discover_mut().as_mut().project();
        // Samples taken while a service is already being made are not counted, so a slow
        // `make_service` does not keep pushing the estimate upwards.
        if discover.making.is_none() {
            // no services are ready -- we're overloaded
            // update ewma with a 1 sample
            self.ewma = self.options.alpha + (1.0 - self.options.alpha) * self.ewma;

            if self.ewma > self.options.high {
                if *discover.load != Level::High {
                    tracing::trace!({ ewma = %self.ewma }, "pool is under-provisioned");
                }
                *discover.load = Level::High;

                // don't reset the EWMA -- in theory, poll_ready should now start returning
                // `Ready`, so we won't try to launch another service immediately.
                // we clamp it to high though in case the # of services is limited.
                self.ewma = self.options.high;

                // we need to call balance again for PoolDiscover to realize
                // it can make a new service
                return self.balance.poll_ready(cx);
            } else {
                *discover.load = Level::Normal;
            }
        }

        Poll::Pending
    }

    /// Dispatches `req` to the underlying balancer.
    fn call(&mut self, req: Req) -> Self::Future {
        self.balance.call(req)
    }
}
425 
/// A service wrapper that reports its pool slot id over `notify` when it is dropped.
///
/// `PoolDiscoverer` creates these; the id it receives back lets it free the slot.
#[doc(hidden)]
#[derive(Debug)]
pub struct DropNotifyService<Svc> {
    // The wrapped service; `poll_ready`, `call` and `load` are forwarded to it.
    svc: Svc,
    // This service's key in the discoverer's `services` slab (also the key in `Change`s).
    id: usize,
    // Sending half of the discoverer's drop-notification channel.
    notify: tokio::sync::mpsc::UnboundedSender<usize>,
}
433 
434 impl<Svc> Drop for DropNotifyService<Svc> {
drop(&mut self)435     fn drop(&mut self) {
436         let _ = self.notify.send(self.id).is_ok();
437     }
438 }
439 
440 impl<Svc: Load> Load for DropNotifyService<Svc> {
441     type Metric = Svc::Metric;
load(&self) -> Self::Metric442     fn load(&self) -> Self::Metric {
443         self.svc.load()
444     }
445 }
446 
447 impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc> {
448     type Response = Svc::Response;
449     type Future = Svc::Future;
450     type Error = Svc::Error;
451 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>452     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
453         self.svc.poll_ready(cx)
454     }
455 
call(&mut self, req: Request) -> Self::Future456     fn call(&mut self, req: Request) -> Self::Future {
457         self.svc.call(req)
458     }
459 }
460