1 use crate::frame::Reason;
2 use crate::proto::{WindowSize, MAX_WINDOW_SIZE};
3 
4 use std::fmt;
5 
// WINDOW_UPDATE frames are not sent for every tiny change; changes are
// aggregated until they are significant. Many implementations do this by
// keeping a "ratio" of the pending update versus the allowed window size.
//
// Rather than expressing that ratio as a floating-point percentage, it is
// kept as an integer fraction made of two i32s, which avoids float math
// entirely. A 50% ratio, for example, is simply 1/2.
//
// Applying the ratio: if a stream has an allowed window size of 100 bytes,
// a WINDOW_UPDATE frame is scheduled once the unclaimed change reaches 1/2
// of it, i.e. 50 bytes.

/// Numerator of the unclaimed-capacity ratio.
const UNCLAIMED_NUMERATOR: i32 = 1;
/// Denominator of the unclaimed-capacity ratio.
const UNCLAIMED_DENOMINATOR: i32 = 2;
20 
/// Guards the ratio constants: the denominator must be positive and the
/// numerator must be non-negative and strictly smaller than the denominator.
#[test]
// The assertions are on constants on purpose; they exist to catch bad edits.
#[allow(clippy::assertions_on_constants)]
fn sanity_unclaimed_ratio() {
    assert!(UNCLAIMED_DENOMINATOR > 0);
    assert!(UNCLAIMED_NUMERATOR >= 0);
    assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
}
28 
29 #[derive(Copy, Clone, Debug)]
30 pub struct FlowControl {
31     /// Window the peer knows about.
32     ///
33     /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received.
34     ///
35     /// For example, say the peer sends a request and uses 32kb of the window.
36     /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust
37     /// its understanding of the capacity of the window, and that would be:
38     ///
39     /// ```notrust
40     /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb
41     /// ```
42     window_size: Window,
43 
44     /// Window that we know about.
45     ///
46     /// This can go negative if a user declares a smaller target window than
47     /// the peer knows about.
48     available: Window,
49 }
50 
51 impl FlowControl {
new() -> FlowControl52     pub fn new() -> FlowControl {
53         FlowControl {
54             window_size: Window(0),
55             available: Window(0),
56         }
57     }
58 
59     /// Returns the window size as known by the peer
window_size(&self) -> WindowSize60     pub fn window_size(&self) -> WindowSize {
61         self.window_size.as_size()
62     }
63 
64     /// Returns the window size available to the consumer
available(&self) -> Window65     pub fn available(&self) -> Window {
66         self.available
67     }
68 
69     /// Returns true if there is unavailable window capacity
has_unavailable(&self) -> bool70     pub fn has_unavailable(&self) -> bool {
71         if self.window_size < 0 {
72             return false;
73         }
74 
75         self.window_size > self.available
76     }
77 
claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason>78     pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
79         self.available.decrease_by(capacity)
80     }
81 
assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason>82     pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
83         self.available.increase_by(capacity)
84     }
85 
86     /// If a WINDOW_UPDATE frame should be sent, returns a positive number
87     /// representing the increment to be used.
88     ///
89     /// If there is no available bytes to be reclaimed, or the number of
90     /// available bytes does not reach the threshold, this returns `None`.
91     ///
92     /// This represents pending outbound WINDOW_UPDATE frames.
unclaimed_capacity(&self) -> Option<WindowSize>93     pub fn unclaimed_capacity(&self) -> Option<WindowSize> {
94         let available = self.available;
95 
96         if self.window_size >= available {
97             return None;
98         }
99 
100         let unclaimed = available.0 - self.window_size.0;
101         let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
102 
103         if unclaimed < threshold {
104             None
105         } else {
106             Some(unclaimed as WindowSize)
107         }
108     }
109 
110     /// Increase the window size.
111     ///
112     /// This is called after receiving a WINDOW_UPDATE frame
inc_window(&mut self, sz: WindowSize) -> Result<(), Reason>113     pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
114         let (val, overflow) = self.window_size.0.overflowing_add(sz as i32);
115 
116         if overflow {
117             return Err(Reason::FLOW_CONTROL_ERROR);
118         }
119 
120         if val > MAX_WINDOW_SIZE as i32 {
121             return Err(Reason::FLOW_CONTROL_ERROR);
122         }
123 
124         tracing::trace!(
125             "inc_window; sz={}; old={}; new={}",
126             sz,
127             self.window_size,
128             val
129         );
130 
131         self.window_size = Window(val);
132         Ok(())
133     }
134 
135     /// Decrement the send-side window size.
136     ///
137     /// This is called after receiving a SETTINGS frame with a lower
138     /// INITIAL_WINDOW_SIZE value.
dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason>139     pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
140         tracing::trace!(
141             "dec_window; sz={}; window={}, available={}",
142             sz,
143             self.window_size,
144             self.available
145         );
146         // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
147         self.window_size.decrease_by(sz)?;
148         Ok(())
149     }
150 
151     /// Decrement the recv-side window size.
152     ///
153     /// This is called after receiving a SETTINGS ACK frame with a lower
154     /// INITIAL_WINDOW_SIZE value.
dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason>155     pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
156         tracing::trace!(
157             "dec_recv_window; sz={}; window={}, available={}",
158             sz,
159             self.window_size,
160             self.available
161         );
162         // This should not be able to overflow `window_size` from the bottom.
163         self.window_size.decrease_by(sz)?;
164         self.available.decrease_by(sz)?;
165         Ok(())
166     }
167 
168     /// Decrements the window reflecting data has actually been sent. The caller
169     /// must ensure that the window has capacity.
send_data(&mut self, sz: WindowSize) -> Result<(), Reason>170     pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
171         tracing::trace!(
172             "send_data; sz={}; window={}; available={}",
173             sz,
174             self.window_size,
175             self.available
176         );
177 
178         // If send size is zero it's meaningless to update flow control window
179         if sz > 0 {
180             // Ensure that the argument is correct
181             assert!(self.window_size.0 >= sz as i32);
182 
183             // Update values
184             self.window_size.decrease_by(sz)?;
185             self.available.decrease_by(sz)?;
186         }
187         Ok(())
188     }
189 }
190 
/// A flow-controlled window's current capacity.
///
/// The value may drop below zero: that happens when one side has already
/// used some capacity and the other side then advertises a smaller size.
///
/// Additions to and subtractions from the capacity are meant to go through
/// this type, so that integer casts are not spread throughout the source.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub struct Window(i32);
200 
201 impl Window {
as_size(&self) -> WindowSize202     pub fn as_size(&self) -> WindowSize {
203         if self.0 < 0 {
204             0
205         } else {
206             self.0 as WindowSize
207         }
208     }
209 
checked_size(&self) -> WindowSize210     pub fn checked_size(&self) -> WindowSize {
211         assert!(self.0 >= 0, "negative Window");
212         self.0 as WindowSize
213     }
214 
decrease_by(&mut self, other: WindowSize) -> Result<(), Reason>215     pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
216         if let Some(v) = self.0.checked_sub(other as i32) {
217             self.0 = v;
218             Ok(())
219         } else {
220             Err(Reason::FLOW_CONTROL_ERROR)
221         }
222     }
223 
increase_by(&mut self, other: WindowSize) -> Result<(), Reason>224     pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
225         let other = self.add(other)?;
226         self.0 = other.0;
227         Ok(())
228     }
229 
add(&self, other: WindowSize) -> Result<Self, Reason>230     pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
231         if let Some(v) = self.0.checked_add(other as i32) {
232             Ok(Self(v))
233         } else {
234             Err(Reason::FLOW_CONTROL_ERROR)
235         }
236     }
237 }
238 
239 impl PartialEq<usize> for Window {
eq(&self, other: &usize) -> bool240     fn eq(&self, other: &usize) -> bool {
241         if self.0 < 0 {
242             false
243         } else {
244             (self.0 as usize).eq(other)
245         }
246     }
247 }
248 
249 impl PartialOrd<usize> for Window {
partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering>250     fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> {
251         if self.0 < 0 {
252             Some(::std::cmp::Ordering::Less)
253         } else {
254             (self.0 as usize).partial_cmp(other)
255         }
256     }
257 }
258 
259 impl fmt::Display for Window {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result260     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261         fmt::Display::fmt(&self.0, f)
262     }
263 }
264 
265 impl From<Window> for isize {
from(w: Window) -> isize266     fn from(w: Window) -> isize {
267         w.0 as isize
268     }
269 }
270