1 use super::*;
2 
3 use std::task::{Context, Waker};
4 use std::time::Instant;
5 use std::usize;
6 
/// Tracks Stream related state
///
/// # Reference counting
///
/// There can be a number of outstanding handles to a single Stream. These are
/// tracked using reference counting. The `ref_count` field represents the
/// number of outstanding userspace handles that can reach this stream.
///
/// It's important to note that when the stream is placed in an internal queue
/// (such as an accept queue), this is **not** tracked by a reference count.
/// Thus, `ref_count` can be zero and the stream still has to be kept around.
///
/// # Intrusive linked lists
///
/// Each `next_*` field is the link for one internal queue and is paired with a
/// flag recording whether the stream is currently in that queue. The pairs are
/// exposed to the store through the `store::Next` impls at the bottom of this
/// file.
#[derive(Debug)]
pub(super) struct Stream {
    /// The h2 stream identifier
    pub id: StreamId,

    /// Current state of the stream
    pub state: State,

    /// Set to `true` when the stream is counted against the connection's max
    /// concurrent streams.
    pub is_counted: bool,

    /// Number of outstanding handles pointing to this stream.
    ///
    /// Adjusted through `ref_inc` / `ref_dec`.
    pub ref_count: usize,

    // ===== Fields related to sending =====
    /// Next node in the pending send linked list
    pub next_pending_send: Option<store::Key>,

    /// Set to true when the stream is queued in the pending send list
    pub is_pending_send: bool,

    /// Send data flow control
    pub send_flow: FlowControl,

    /// Amount of send capacity that has been requested, but not yet allocated.
    pub requested_send_capacity: WindowSize,

    /// Amount of data buffered at the prioritization layer.
    /// TODO: Technically this could be greater than the window size...
    pub buffered_send_data: usize,

    /// Task tracking additional send capacity (i.e. window updates).
    ///
    /// Registered by `wait_send`; taken and woken by `notify_send`.
    send_task: Option<Waker>,

    /// Frames pending for this stream being sent to the socket
    pub pending_send: buffer::Deque,

    /// Next node in the linked list of streams waiting for additional
    /// connection level capacity.
    pub next_pending_send_capacity: Option<store::Key>,

    /// True if the stream is waiting for outbound connection capacity
    pub is_pending_send_capacity: bool,

    /// Set to true when the send capacity has been incremented
    /// (see `notify_capacity`).
    pub send_capacity_inc: bool,

    /// Next node in the open linked list
    pub next_open: Option<store::Key>,

    /// Set to true when the stream is pending to be opened
    pub is_pending_open: bool,

    /// Set to true when a push is pending for this stream
    pub is_pending_push: bool,

    // ===== Fields related to receiving =====
    /// Next node in the accept linked list
    pub next_pending_accept: Option<store::Key>,

    /// Set to true when the stream is pending accept
    pub is_pending_accept: bool,

    /// Receive data flow control
    pub recv_flow: FlowControl,

    /// NOTE(review): undocumented in the original. Presumably the amount of
    /// received data that has not yet been released back to the peer's
    /// window -- confirm against the receive path. Starts at 0.
    pub in_flight_recv_data: WindowSize,

    /// Next node in the linked list of streams waiting to send window updates.
    pub next_window_update: Option<store::Key>,

    /// True if the stream is waiting to send a window update
    pub is_pending_window_update: bool,

    /// The time when this stream may have been locally reset.
    ///
    /// This also acts as the "is queued" flag for the reset-expire list: it is
    /// set to the current time when the stream is marked queued there and
    /// cleared when it is dequeued.
    pub reset_at: Option<Instant>,

    /// Next node in list of reset streams that should expire eventually
    pub next_reset_expire: Option<store::Key>,

    /// Frames pending for this stream to read
    pub pending_recv: buffer::Deque,

    /// When the RecvStream drop occurs, no data should be received.
    ///
    /// Initialized to `true` by `Stream::new`.
    pub is_recv: bool,

    /// Task tracking receiving frames; taken and woken by `notify_recv`.
    pub recv_task: Option<Waker>,

    /// The stream's pending push promises
    pub pending_push_promises: store::Queue<NextAccept>,

    /// Validate content-length headers
    pub content_length: ContentLength,
}
114 
/// State related to validating a stream's content-length
#[derive(Debug)]
pub enum ContentLength {
    /// No content-length is tracked: decrements and the final zero check
    /// always succeed. This is the value a new `Stream` starts with.
    Omitted,
    /// No body bytes are allowed: any non-zero decrement is an error.
    /// NOTE(review): presumably used for responses to HEAD requests -- confirm
    /// against the code that sets this variant.
    Head,
    /// Number of body bytes still expected. Reduced by
    /// `Stream::dec_content_length` and required to be zero by
    /// `Stream::ensure_content_length_zero`.
    Remaining(u64),
}
122 
/// Marker type for the accept list: its `store::Next` impl links streams
/// through `next_pending_accept` / `is_pending_accept`. Also used as the link
/// type of `Stream::pending_push_promises`.
#[derive(Debug)]
pub(super) struct NextAccept;

/// Marker type for the pending send list: its `store::Next` impl links streams
/// through `next_pending_send` / `is_pending_send`.
#[derive(Debug)]
pub(super) struct NextSend;

/// Marker type for the list of streams waiting on connection-level send
/// capacity: links through `next_pending_send_capacity` /
/// `is_pending_send_capacity`.
#[derive(Debug)]
pub(super) struct NextSendCapacity;

/// Marker type for the list of streams waiting to send a window update: links
/// through `next_window_update` / `is_pending_window_update`.
#[derive(Debug)]
pub(super) struct NextWindowUpdate;

/// Marker type for the list of streams waiting to be opened: links through
/// `next_open` / `is_pending_open`.
#[derive(Debug)]
pub(super) struct NextOpen;

/// Marker type for the list of reset streams awaiting expiration: links
/// through `next_reset_expire`, with `reset_at` acting as the queued flag.
#[derive(Debug)]
pub(super) struct NextResetExpire;
140 
141 impl Stream {
new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream142     pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
143         let mut send_flow = FlowControl::new();
144         let mut recv_flow = FlowControl::new();
145 
146         recv_flow
147             .inc_window(init_recv_window)
148             .expect("invalid initial receive window");
149         // TODO: proper error handling?
150         let _res = recv_flow.assign_capacity(init_recv_window);
151         debug_assert!(_res.is_ok());
152 
153         send_flow
154             .inc_window(init_send_window)
155             .expect("invalid initial send window size");
156 
157         Stream {
158             id,
159             state: State::default(),
160             ref_count: 0,
161             is_counted: false,
162 
163             // ===== Fields related to sending =====
164             next_pending_send: None,
165             is_pending_send: false,
166             send_flow,
167             requested_send_capacity: 0,
168             buffered_send_data: 0,
169             send_task: None,
170             pending_send: buffer::Deque::new(),
171             is_pending_send_capacity: false,
172             next_pending_send_capacity: None,
173             send_capacity_inc: false,
174             is_pending_open: false,
175             next_open: None,
176             is_pending_push: false,
177 
178             // ===== Fields related to receiving =====
179             next_pending_accept: None,
180             is_pending_accept: false,
181             recv_flow,
182             in_flight_recv_data: 0,
183             next_window_update: None,
184             is_pending_window_update: false,
185             reset_at: None,
186             next_reset_expire: None,
187             pending_recv: buffer::Deque::new(),
188             is_recv: true,
189             recv_task: None,
190             pending_push_promises: store::Queue::new(),
191             content_length: ContentLength::Omitted,
192         }
193     }
194 
    /// Increment the stream's ref count
    ///
    /// # Panics
    ///
    /// Panics if `ref_count` is already `usize::MAX`.
    pub fn ref_inc(&mut self) {
        // Checked up front so the increment below can never overflow.
        assert!(self.ref_count < usize::MAX);
        self.ref_count += 1;
    }
200 
    /// Decrements the stream's ref count
    ///
    /// # Panics
    ///
    /// Panics if `ref_count` is already zero, i.e. on an unbalanced release.
    pub fn ref_dec(&mut self) {
        // Checked up front so the decrement below can never underflow.
        assert!(self.ref_count > 0);
        self.ref_count -= 1;
    }
206 
    /// Returns true if stream is currently being held for some time because of
    /// a local reset.
    ///
    /// This is exactly `reset_at.is_some()`; `reset_at` is the same field the
    /// `NextResetExpire` list marker sets when queueing the stream and clears
    /// when dequeueing it.
    pub fn is_pending_reset_expiration(&self) -> bool {
        self.reset_at.is_some()
    }
212 
213     /// Returns true if frames for this stream are ready to be sent over the wire
is_send_ready(&self) -> bool214     pub fn is_send_ready(&self) -> bool {
215         // Why do we check pending_open?
216         //
217         // We allow users to call send_request() which schedules a stream to be pending_open
218         // if there is no room according to the concurrency limit (max_send_streams), and we
219         // also allow data to be buffered for send with send_data() if there is no capacity for
220         // the stream to send the data, which attempts to place the stream in pending_send.
221         // If the stream is not open, we don't want the stream to be scheduled for
222         // execution (pending_send). Note that if the stream is in pending_open, it will be
223         // pushed to pending_send when there is room for an open stream.
224         //
225         // In pending_push we track whether a PushPromise still needs to be sent
226         // from a different stream before we can start sending frames on this one.
227         // This is different from the "open" check because reserved streams don't count
228         // toward the concurrency limit.
229         // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
230         !self.is_pending_open && !self.is_pending_push
231     }
232 
233     /// Returns true if the stream is closed
is_closed(&self) -> bool234     pub fn is_closed(&self) -> bool {
235         // The state has fully transitioned to closed.
236         self.state.is_closed() &&
237             // Because outbound frames transition the stream state before being
238             // buffered, we have to ensure that all frames have been flushed.
239             self.pending_send.is_empty() &&
240             // Sometimes large data frames are sent out in chunks. After a chunk
241             // of the frame is sent, the remainder is pushed back onto the send
242             // queue to be rescheduled.
243             //
244             // Checking for additional buffered data lets us catch this case.
245             self.buffered_send_data == 0
246     }
247 
248     /// Returns true if the stream is no longer in use
is_released(&self) -> bool249     pub fn is_released(&self) -> bool {
250         // The stream is closed and fully flushed
251         self.is_closed() &&
252             // There are no more outstanding references to the stream
253             self.ref_count == 0 &&
254             // The stream is not in any queue
255             !self.is_pending_send && !self.is_pending_send_capacity &&
256             !self.is_pending_accept && !self.is_pending_window_update &&
257             !self.is_pending_open && self.reset_at.is_none()
258     }
259 
260     /// Returns true when the consumer of the stream has dropped all handles
261     /// (indicating no further interest in the stream) and the stream state is
262     /// not actually closed.
263     ///
264     /// In this case, a reset should be sent.
is_canceled_interest(&self) -> bool265     pub fn is_canceled_interest(&self) -> bool {
266         self.ref_count == 0 && !self.state.is_closed()
267     }
268 
269     /// Current available stream send capacity
capacity(&self, max_buffer_size: usize) -> WindowSize270     pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
271         let available = self.send_flow.available().as_size() as usize;
272         let buffered = self.buffered_send_data;
273 
274         available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
275     }
276 
assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize)277     pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
278         let prev_capacity = self.capacity(max_buffer_size);
279         debug_assert!(capacity > 0);
280         // TODO: proper error handling
281         let _res = self.send_flow.assign_capacity(capacity);
282         debug_assert!(_res.is_ok());
283 
284         tracing::trace!(
285             "  assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
286             self.send_flow.available(),
287             self.buffered_send_data,
288             self.id,
289             max_buffer_size,
290             prev_capacity,
291         );
292 
293         if prev_capacity < self.capacity(max_buffer_size) {
294             self.notify_capacity();
295         }
296     }
297 
send_data(&mut self, len: WindowSize, max_buffer_size: usize)298     pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
299         let prev_capacity = self.capacity(max_buffer_size);
300 
301         // TODO: proper error handling
302         let _res = self.send_flow.send_data(len);
303         debug_assert!(_res.is_ok());
304 
305         // Decrement the stream's buffered data counter
306         debug_assert!(self.buffered_send_data >= len as usize);
307         self.buffered_send_data -= len as usize;
308         self.requested_send_capacity -= len;
309 
310         tracing::trace!(
311             "  sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
312             self.send_flow.available(),
313             self.buffered_send_data,
314             self.id,
315             max_buffer_size,
316             prev_capacity,
317         );
318 
319         if prev_capacity < self.capacity(max_buffer_size) {
320             self.notify_capacity();
321         }
322     }
323 
324     /// If the capacity was limited because of the max_send_buffer_size,
325     /// then consider waking the send task again...
notify_capacity(&mut self)326     pub fn notify_capacity(&mut self) {
327         self.send_capacity_inc = true;
328         tracing::trace!("  notifying task");
329         self.notify_send();
330     }
331 
332     /// Returns `Err` when the decrement cannot be completed due to overflow.
dec_content_length(&mut self, len: usize) -> Result<(), ()>333     pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
334         match self.content_length {
335             ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
336                 Some(val) => *rem = val,
337                 None => return Err(()),
338             },
339             ContentLength::Head => {
340                 if len != 0 {
341                     return Err(());
342                 }
343             }
344             _ => {}
345         }
346 
347         Ok(())
348     }
349 
ensure_content_length_zero(&self) -> Result<(), ()>350     pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
351         match self.content_length {
352             ContentLength::Remaining(0) => Ok(()),
353             ContentLength::Remaining(_) => Err(()),
354             _ => Ok(()),
355         }
356     }
357 
notify_send(&mut self)358     pub fn notify_send(&mut self) {
359         if let Some(task) = self.send_task.take() {
360             task.wake();
361         }
362     }
363 
wait_send(&mut self, cx: &Context)364     pub fn wait_send(&mut self, cx: &Context) {
365         self.send_task = Some(cx.waker().clone());
366     }
367 
notify_recv(&mut self)368     pub fn notify_recv(&mut self) {
369         if let Some(task) = self.recv_task.take() {
370             task.wake();
371         }
372     }
373 }
374 
/// Accept list linkage: the link lives in `next_pending_accept` and list
/// membership is recorded in `is_pending_accept`.
impl store::Next for NextAccept {
    /// Returns the key of the following stream in the accept list, if any.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_pending_accept
    }

    /// Overwrites the accept list link.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_pending_accept = key;
    }

    /// Removes and returns the accept list link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_pending_accept.take()
    }

    /// Returns true if the stream is flagged as pending accept.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_accept
    }

    /// Sets or clears the pending accept flag.
    fn set_queued(stream: &mut Stream, val: bool) {
        stream.is_pending_accept = val;
    }
}
396 
/// Pending send list linkage: the link lives in `next_pending_send` and list
/// membership is recorded in `is_pending_send`.
impl store::Next for NextSend {
    /// Returns the key of the following stream in the pending send list.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_pending_send
    }

    /// Overwrites the pending send list link.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_pending_send = key;
    }

    /// Removes and returns the pending send list link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_pending_send.take()
    }

    /// Returns true if the stream is flagged as pending send.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_send
    }

    /// Sets or clears the pending send flag.
    ///
    /// In debug builds, marking the stream queued asserts that it is not also
    /// waiting to be opened.
    fn set_queued(stream: &mut Stream, val: bool) {
        if val {
            // ensure that stream is not queued for being opened
            // if it's being put into queue for sending data
            debug_assert!(!stream.is_pending_open);
        }
        stream.is_pending_send = val;
    }
}
423 
/// Send-capacity wait list linkage: the link lives in
/// `next_pending_send_capacity` and list membership is recorded in
/// `is_pending_send_capacity`.
impl store::Next for NextSendCapacity {
    /// Returns the key of the following stream waiting for send capacity.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_pending_send_capacity
    }

    /// Overwrites the send-capacity list link.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_pending_send_capacity = key;
    }

    /// Removes and returns the send-capacity list link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_pending_send_capacity.take()
    }

    /// Returns true if the stream is flagged as waiting for send capacity.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_send_capacity
    }

    /// Sets or clears the waiting-for-send-capacity flag.
    fn set_queued(stream: &mut Stream, val: bool) {
        stream.is_pending_send_capacity = val;
    }
}
445 
/// Window update list linkage: the link lives in `next_window_update` and list
/// membership is recorded in `is_pending_window_update`.
impl store::Next for NextWindowUpdate {
    /// Returns the key of the following stream waiting to send a window update.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_window_update
    }

    /// Overwrites the window update list link.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_window_update = key;
    }

    /// Removes and returns the window update list link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_window_update.take()
    }

    /// Returns true if the stream is flagged as pending a window update.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_window_update
    }

    /// Sets or clears the pending window update flag.
    fn set_queued(stream: &mut Stream, val: bool) {
        stream.is_pending_window_update = val;
    }
}
467 
/// Pending open list linkage: the link lives in `next_open` and list
/// membership is recorded in `is_pending_open`.
impl store::Next for NextOpen {
    /// Returns the key of the following stream waiting to be opened.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_open
    }

    /// Overwrites the pending open list link.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_open = key;
    }

    /// Removes and returns the pending open list link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_open.take()
    }

    /// Returns true if the stream is flagged as pending open.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_open
    }

    /// Sets or clears the pending open flag.
    ///
    /// In debug builds, marking the stream queued asserts that it is not also
    /// in the pending send list.
    fn set_queued(stream: &mut Stream, val: bool) {
        if val {
            // ensure that stream is not queued for being sent
            // if it's being put into queue for opening the stream
            debug_assert!(!stream.is_pending_send);
        }
        stream.is_pending_open = val;
    }
}
494 
495 impl store::Next for NextResetExpire {
next(stream: &Stream) -> Option<store::Key>496     fn next(stream: &Stream) -> Option<store::Key> {
497         stream.next_reset_expire
498     }
499 
set_next(stream: &mut Stream, key: Option<store::Key>)500     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
501         stream.next_reset_expire = key;
502     }
503 
take_next(stream: &mut Stream) -> Option<store::Key>504     fn take_next(stream: &mut Stream) -> Option<store::Key> {
505         stream.next_reset_expire.take()
506     }
507 
is_queued(stream: &Stream) -> bool508     fn is_queued(stream: &Stream) -> bool {
509         stream.reset_at.is_some()
510     }
511 
set_queued(stream: &mut Stream, val: bool)512     fn set_queued(stream: &mut Stream, val: bool) {
513         if val {
514             stream.reset_at = Some(Instant::now());
515         } else {
516             stream.reset_at = None;
517         }
518     }
519 }
520 
521 // ===== impl ContentLength =====
522 
523 impl ContentLength {
is_head(&self) -> bool524     pub fn is_head(&self) -> bool {
525         matches!(*self, Self::Head)
526     }
527 }
528