1 /// HTTP2 Ping usage
2 ///
3 /// hyper uses HTTP2 pings for two purposes:
4 ///
5 /// 1. Adaptive flow control using BDP
6 /// 2. Connection keep-alive
7 ///
8 /// Both cases are optional.
9 ///
10 /// # BDP Algorithm
11 ///
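/// 1. When receiving a DATA frame, once the delay since the previous sample
///    has elapsed and if a BDP ping isn't outstanding:
///   1a. Record current time.
///   1b. Send a BDP ping.
/// 2. Increment the number of received bytes.
/// 3. When the BDP ping ack is received:
///   3a. Record duration from sent time.
///   3b. Merge RTT with a running average.
///   3c. Calculate bandwidth as bytes / (1.5 * rtt) and track the largest
///       bandwidth seen so far.
///   3d. If the bandwidth is a new maximum and the sampled bytes are at least
///       2/3 of the current bdp, set bdp to twice the sample (capped) and
///       update windows.
///   3e. Adjust the delay before the next sample: shorten it when bdp grows,
///       lengthen it while bdp stays stable.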
21 
22 #[cfg(feature = "runtime")]
23 use std::fmt;
24 #[cfg(feature = "runtime")]
25 use std::future::Future;
26 #[cfg(feature = "runtime")]
27 use std::pin::Pin;
28 use std::sync::{Arc, Mutex};
29 use std::task::{self, Poll};
30 use std::time::Duration;
31 #[cfg(not(feature = "runtime"))]
32 use std::time::Instant;
33 
34 use h2::{Ping, PingPong};
35 #[cfg(feature = "runtime")]
36 use tokio::time::{Instant, Sleep};
37 use tracing::{debug, trace};
38 
/// Flow-control window size in bytes, as produced by BDP window updates.
type WindowSize = u32;
40 
disabled() -> Recorder41 pub(super) fn disabled() -> Recorder {
42     Recorder { shared: None }
43 }
44 
channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)45 pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) {
46     debug_assert!(
47         config.is_enabled(),
48         "ping channel requires bdp or keep-alive config",
49     );
50 
51     let bdp = config.bdp_initial_window.map(|wnd| Bdp {
52         bdp: wnd,
53         max_bandwidth: 0.0,
54         rtt: 0.0,
55         ping_delay: Duration::from_millis(100),
56         stable_count: 0,
57     });
58 
59     let (bytes, next_bdp_at) = if bdp.is_some() {
60         (Some(0), Some(Instant::now()))
61     } else {
62         (None, None)
63     };
64 
65     #[cfg(feature = "runtime")]
66     let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
67         interval,
68         timeout: config.keep_alive_timeout,
69         while_idle: config.keep_alive_while_idle,
70         timer: Box::pin(tokio::time::sleep(interval)),
71         state: KeepAliveState::Init,
72     });
73 
74     #[cfg(feature = "runtime")]
75     let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
76 
77     let shared = Arc::new(Mutex::new(Shared {
78         bytes,
79         #[cfg(feature = "runtime")]
80         last_read_at,
81         #[cfg(feature = "runtime")]
82         is_keep_alive_timed_out: false,
83         ping_pong,
84         ping_sent_at: None,
85         next_bdp_at,
86     }));
87 
88     (
89         Recorder {
90             shared: Some(shared.clone()),
91         },
92         Ponger {
93             bdp,
94             #[cfg(feature = "runtime")]
95             keep_alive,
96             shared,
97         },
98     )
99 }
100 
/// Settings for the ping machinery: BDP adaptive flow control and/or
/// connection keep-alive. `channel` requires at least one of them to be
/// enabled (see `Config::is_enabled`).
#[derive(Clone)]
pub(super) struct Config {
    /// If `Some`, BDP adaptive flow control is enabled and the BDP estimate
    /// starts from this window size (in bytes). `None` disables BDP.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// If no frames are received in this amount of time, a PING frame is sent.
    /// `None` disables keep-alive.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_interval: Option<Duration>,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_while_idle: bool,
}
115 
/// Cloneable handle used to report frame reads into the shared ping state.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// `None` for a disabled recorder, which makes every method a no-op.
    shared: Option<Arc<Mutex<Shared>>>,
}
120 
/// Polled side of the ping channel.
///
/// It processes pongs, runs the keep-alive timer, and reports BDP window
/// updates or keep-alive timeouts.
pub(super) struct Ponger {
    /// BDP estimator; `None` when adaptive flow control is disabled.
    bdp: Option<Bdp>,
    /// Keep-alive state; `None` when keep-alive is disabled or has timed out.
    #[cfg(feature = "runtime")]
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` clone.
    shared: Arc<Mutex<Shared>>,
}
127 
/// Ping state shared (behind `Arc<Mutex<_>>`) between the `Recorder`
/// handles and the `Ponger`.
struct Shared {
    /// The h2 handle used to send pings and to poll for pongs.
    ping_pong: PingPong,
    /// When the currently outstanding ping (BDP or keep-alive) was sent;
    /// `None` when no ping is awaiting its pong.
    ping_sent_at: Option<Instant>,

    // bdp
    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
    /// read during the current sample.
    bytes: Option<usize>,
    /// We delay a variable amount of time between BDP pings. This allows us
    /// to send fewer pings as the bandwidth stabilizes. While this is `Some`
    /// and still in the future, incoming DATA is neither counted nor pinged.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// If `Some`, keep-alive is enabled, and the Instant records when the
    /// connection last read a frame.
    #[cfg(feature = "runtime")]
    last_read_at: Option<Instant>,

    /// Set once the keep-alive timeout has fired; reported to readers by
    /// `Recorder::ensure_not_timed_out`.
    #[cfg(feature = "runtime")]
    is_keep_alive_timed_out: bool,
}
149 
/// Running bandwidth-delay-product estimator (see the algorithm at the top
/// of this file).
struct Bdp {
    /// Current BDP in bytes
    bdp: u32,
    /// Largest bandwidth we've seen so far, in bytes per second.
    max_bandwidth: f64,
    /// Round trip time in seconds (moving average; 0.0 until the first sample).
    rtt: f64,
    /// Delay the next ping by this amount.
    ///
    /// This will change depending on how stable the current bandwidth is.
    ping_delay: Duration,
    /// The count of ping round trips where BDP has stayed the same.
    stable_count: u32,
}
164 
/// Keep-alive configuration plus the state machine driving it.
#[cfg(feature = "runtime")]
struct KeepAlive {
    /// If no frames are received in this amount of time, a PING frame is sent.
    interval: Duration,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    while_idle: bool,

    /// Current phase of the keep-alive cycle.
    state: KeepAliveState,
    /// A single timer reused for both phases. In `Scheduled` it is armed for
    /// the next ping (last read + `interval`). In `PingSent` it is armed for
    /// the pong deadline (send time + `timeout`).
    timer: Pin<Box<Sleep>>,
}
178 
/// Phases of the keep-alive cycle.
#[cfg(feature = "runtime")]
enum KeepAliveState {
    /// Not armed. This is the initial state, the state while idle with
    /// `while_idle == false`, and the state after a read pushed back a
    /// scheduled ping.
    Init,
    /// Timer armed to send a ping at last read + `interval`.
    Scheduled,
    /// Ping sent; timer armed for the pong timeout.
    PingSent,
}
185 
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// The BDP estimate grew; carries the new window size in bytes
    /// (presumably applied to the connection/stream windows by the caller).
    SizeUpdate(WindowSize),
    /// A keep-alive ping was not answered within the configured timeout.
    #[cfg(feature = "runtime")]
    KeepAliveTimedOut,
}
191 
/// Error cause used when a keep-alive ping goes unanswered past its timeout.
#[cfg(feature = "runtime")]
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
195 
196 // ===== impl Config =====
197 
198 impl Config {
is_enabled(&self) -> bool199     pub(super) fn is_enabled(&self) -> bool {
200         #[cfg(feature = "runtime")]
201         {
202             self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
203         }
204 
205         #[cfg(not(feature = "runtime"))]
206         {
207             self.bdp_initial_window.is_some()
208         }
209     }
210 }
211 
212 // ===== impl Recorder =====
213 
214 impl Recorder {
record_data(&self, len: usize)215     pub(crate) fn record_data(&self, len: usize) {
216         let shared = if let Some(ref shared) = self.shared {
217             shared
218         } else {
219             return;
220         };
221 
222         let mut locked = shared.lock().unwrap();
223 
224         #[cfg(feature = "runtime")]
225         locked.update_last_read_at();
226 
227         // are we ready to send another bdp ping?
228         // if not, we don't need to record bytes either
229 
230         if let Some(ref next_bdp_at) = locked.next_bdp_at {
231             if Instant::now() < *next_bdp_at {
232                 return;
233             } else {
234                 locked.next_bdp_at = None;
235             }
236         }
237 
238         if let Some(ref mut bytes) = locked.bytes {
239             *bytes += len;
240         } else {
241             // no need to send bdp ping if bdp is disabled
242             return;
243         }
244 
245         if !locked.is_ping_sent() {
246             locked.send_ping();
247         }
248     }
249 
record_non_data(&self)250     pub(crate) fn record_non_data(&self) {
251         #[cfg(feature = "runtime")]
252         {
253             let shared = if let Some(ref shared) = self.shared {
254                 shared
255             } else {
256                 return;
257             };
258 
259             let mut locked = shared.lock().unwrap();
260 
261             locked.update_last_read_at();
262         }
263     }
264 
265     /// If the incoming stream is already closed, convert self into
266     /// a disabled reporter.
267     #[cfg(feature = "client")]
for_stream(self, stream: &h2::RecvStream) -> Self268     pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
269         if stream.is_end_stream() {
270             disabled()
271         } else {
272             self
273         }
274     }
275 
ensure_not_timed_out(&self) -> crate::Result<()>276     pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
277         #[cfg(feature = "runtime")]
278         {
279             if let Some(ref shared) = self.shared {
280                 let locked = shared.lock().unwrap();
281                 if locked.is_keep_alive_timed_out {
282                     return Err(KeepAliveTimedOut.crate_error());
283                 }
284             }
285         }
286 
287         // else
288         Ok(())
289     }
290 }
291 
292 // ===== impl Ponger =====
293 
impl Ponger {
    /// Drives the ping state machine.
    ///
    /// When keep-alive is enabled, it is scheduled and its ping sent first if
    /// due. Then an outstanding ping (BDP or keep-alive; they share
    /// `ping_sent_at`) is checked for its pong. Returns:
    ///
    /// - `Poll::Ready(Ponged::SizeUpdate(wnd))` when a pong grew the BDP
    ///   estimate to `wnd`;
    /// - `Poll::Ready(Ponged::KeepAliveTimedOut)` when a keep-alive ping went
    ///   unanswered past its timeout. Keep-alive is then dropped and the
    ///   shared state is flagged as timed out;
    /// - `Poll::Pending` otherwise. This includes a pong that did not change
    ///   the window and a pong error, which is only logged.
    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
        // Taken before any keep-alive ping below is sent; used as the pong
        // receive time for the RTT and for scheduling the next BDP sample.
        let now = Instant::now();
        let mut locked = self.shared.lock().unwrap();
        #[cfg(feature = "runtime")]
        let is_idle = self.is_idle();

        #[cfg(feature = "runtime")]
        {
            if let Some(ref mut ka) = self.keep_alive {
                ka.schedule(is_idle, &locked);
                ka.maybe_ping(cx, &mut locked);
            }
        }

        if !locked.is_ping_sent() {
            // XXX: this doesn't register a waker...?
            // NOTE(review): apart from the keep-alive timer polled above,
            // nothing here arranges a wake-up; presumably the caller is
            // re-polled for other reasons (e.g. connection I/O) — confirm.
            return Poll::Pending;
        }

        match locked.ping_pong.poll_pong(cx) {
            Poll::Ready(Ok(_pong)) => {
                let start = locked
                    .ping_sent_at
                    .expect("pong received implies ping_sent_at");
                // The ping is answered; a new one may now be sent.
                locked.ping_sent_at = None;
                let rtt = now - start;
                trace!("recv pong");

                #[cfg(feature = "runtime")]
                {
                    if let Some(ref mut ka) = self.keep_alive {
                        // A pong counts as a read; re-arm the keep-alive
                        // timer (moves PingSent -> Scheduled).
                        locked.update_last_read_at();
                        ka.schedule(is_idle, &locked);
                    }
                }

                if let Some(ref mut bdp) = self.bdp {
                    // Any pong closes the current BDP sample.
                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
                    locked.bytes = Some(0); // reset
                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);

                    let update = bdp.calculate(bytes, rtt);
                    // Pause sampling for the (possibly adjusted) ping delay.
                    locked.next_bdp_at = Some(now + bdp.ping_delay);
                    if let Some(update) = update {
                        return Poll::Ready(Ponged::SizeUpdate(update));
                    }
                }
            }
            Poll::Ready(Err(e)) => {
                // NOTE(review): `ping_sent_at` is left set here, so the state
                // keeps treating a ping as outstanding; confirm intended.
                debug!("pong error: {}", e);
            }
            Poll::Pending => {
                #[cfg(feature = "runtime")]
                {
                    if let Some(ref mut ka) = self.keep_alive {
                        if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
                            // Stop keep-alive and let readers observe the
                            // timeout via `Recorder::ensure_not_timed_out`.
                            self.keep_alive = None;
                            locked.is_keep_alive_timed_out = true;
                            return Poll::Ready(Ponged::KeepAliveTimedOut);
                        }
                    }
                }
            }
        }

        // XXX: this doesn't register a waker...?
        Poll::Pending
    }

    /// Treats the connection as idle when at most two handles share the
    /// state: this `Ponger` plus one `Recorder`. NOTE(review): presumably
    /// that remaining `Recorder` is the connection-level one, so any extra
    /// clone means an active stream — confirm against callers.
    #[cfg(feature = "runtime")]
    fn is_idle(&self) -> bool {
        Arc::strong_count(&self.shared) <= 2
    }
}
369 
370 // ===== impl Shared =====
371 
372 impl Shared {
send_ping(&mut self)373     fn send_ping(&mut self) {
374         match self.ping_pong.send_ping(Ping::opaque()) {
375             Ok(()) => {
376                 self.ping_sent_at = Some(Instant::now());
377                 trace!("sent ping");
378             }
379             Err(err) => {
380                 debug!("error sending ping: {}", err);
381             }
382         }
383     }
384 
is_ping_sent(&self) -> bool385     fn is_ping_sent(&self) -> bool {
386         self.ping_sent_at.is_some()
387     }
388 
389     #[cfg(feature = "runtime")]
update_last_read_at(&mut self)390     fn update_last_read_at(&mut self) {
391         if self.last_read_at.is_some() {
392             self.last_read_at = Some(Instant::now());
393         }
394     }
395 
396     #[cfg(feature = "runtime")]
last_read_at(&self) -> Instant397     fn last_read_at(&self) -> Instant {
398         self.last_read_at.expect("keep_alive expects last_read_at")
399     }
400 }
401 
402 // ===== impl Bdp =====
403 
404 /// Any higher than this likely will be hitting the TCP flow control.
405 const BDP_LIMIT: usize = 1024 * 1024 * 16;
406 
407 impl Bdp {
calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize>408     fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
409         // No need to do any math if we're at the limit.
410         if self.bdp as usize == BDP_LIMIT {
411             self.stabilize_delay();
412             return None;
413         }
414 
415         // average the rtt
416         let rtt = seconds(rtt);
417         if self.rtt == 0.0 {
418             // First sample means rtt is first rtt.
419             self.rtt = rtt;
420         } else {
421             // Weigh this rtt as 1/8 for a moving average.
422             self.rtt += (rtt - self.rtt) * 0.125;
423         }
424 
425         // calculate the current bandwidth
426         let bw = (bytes as f64) / (self.rtt * 1.5);
427         trace!("current bandwidth = {:.1}B/s", bw);
428 
429         if bw < self.max_bandwidth {
430             // not a faster bandwidth, so don't update
431             self.stabilize_delay();
432             return None;
433         } else {
434             self.max_bandwidth = bw;
435         }
436 
437         // if the current `bytes` sample is at least 2/3 the previous
438         // bdp, increase to double the current sample.
439         if bytes >= self.bdp as usize * 2 / 3 {
440             self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
441             trace!("BDP increased to {}", self.bdp);
442 
443             self.stable_count = 0;
444             self.ping_delay /= 2;
445             Some(self.bdp)
446         } else {
447             self.stabilize_delay();
448             None
449         }
450     }
451 
stabilize_delay(&mut self)452     fn stabilize_delay(&mut self) {
453         if self.ping_delay < Duration::from_secs(10) {
454             self.stable_count += 1;
455 
456             if self.stable_count >= 2 {
457                 self.ping_delay *= 4;
458                 self.stable_count = 0;
459             }
460         }
461     }
462 }
463 
seconds(dur: Duration) -> f64464 fn seconds(dur: Duration) -> f64 {
465     const NANOS_PER_SEC: f64 = 1_000_000_000.0;
466     let secs = dur.as_secs() as f64;
467     secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
468 }
469 
470 // ===== impl KeepAlive =====
471 
#[cfg(feature = "runtime")]
impl KeepAlive {
    /// Arms the ping timer (last read + `interval`) when the state allows it.
    ///
    /// From `Init` this requires that the connection is not idle or that
    /// `while_idle` is set. From `PingSent` it requires that no ping is
    /// outstanding any more. `Scheduled` is left alone.
    fn schedule(&mut self, is_idle: bool, shared: &Shared) {
        let should_schedule = match self.state {
            KeepAliveState::Init => self.while_idle || !is_idle,
            KeepAliveState::PingSent => !shared.is_ping_sent(),
            KeepAliveState::Scheduled => false,
        };

        if should_schedule {
            self.state = KeepAliveState::Scheduled;
            let deadline = shared.last_read_at() + self.interval;
            self.timer.as_mut().reset(deadline);
        }
    }

    /// When scheduled and the timer has fired, sends the keep-alive ping and
    /// re-arms the timer for the pong timeout.
    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, shared: &mut Shared) {
        let is_scheduled = matches!(self.state, KeepAliveState::Scheduled);
        if !is_scheduled || self.timer.as_mut().poll(cx).is_pending() {
            return;
        }

        // The timer fired, but a frame read while we waited moves the real
        // deadline later: fall back to `Init` and ask to be polled again so
        // `schedule` re-arms from the newer read time.
        let pushed_back_deadline = shared.last_read_at() + self.interval;
        if pushed_back_deadline > self.timer.deadline() {
            self.state = KeepAliveState::Init;
            cx.waker().wake_by_ref(); // schedule us again
            return;
        }

        trace!("keep-alive interval ({:?}) reached", self.interval);
        shared.send_ping();
        self.state = KeepAliveState::PingSent;
        let pong_deadline = Instant::now() + self.timeout;
        self.timer.as_mut().reset(pong_deadline);
    }

    /// Returns `Err(KeepAliveTimedOut)` when a keep-alive ping is outstanding
    /// and its timeout timer has fired; `Ok(())` otherwise.
    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
        let awaiting_pong = matches!(self.state, KeepAliveState::PingSent);
        if awaiting_pong && self.timer.as_mut().poll(cx).is_ready() {
            trace!("keep-alive timeout ({:?}) reached", self.timeout);
            return Err(KeepAliveTimedOut);
        }
        Ok(())
    }
}
533 
534 // ===== impl KeepAliveTimedOut =====
535 
#[cfg(feature = "runtime")]
impl KeepAliveTimedOut {
    /// Converts this timeout into a `crate::Error` of kind `Http2`, attaching
    /// `self` via `with`.
    pub(super) fn crate_error(self) -> crate::Error {
        let kind = crate::error::Kind::Http2;
        crate::Error::new(kind).with(self)
    }
}
542 
#[cfg(feature = "runtime")]
impl fmt::Display for KeepAliveTimedOut {
    /// Writes the fixed message `keep-alive timed out`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "keep-alive timed out")
    }
}
549 
#[cfg(feature = "runtime")]
impl std::error::Error for KeepAliveTimedOut {
    /// The underlying cause is always the crate's generic `TimedOut` error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &crate::error::TimedOut;
        Some(cause)
    }
}
556