use super::store::Resolve;
use super::*;

use crate::frame::Reason;

use crate::codec::UserError;
use crate::codec::UserError::*;

use bytes::buf::Take;
use std::{
    cmp::{self, Ordering},
    fmt, io, mem,
    task::{Context, Poll, Waker},
};

/// # Warning
///
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues were not ordered by stream
/// IDs, some mechanism would be necessary to ensure that the lowest-numbered
/// idle stream is opened first.
#[derive(Debug)]
pub(super) struct Prioritize {
    /// Queue of streams waiting for socket capacity to send a frame.
    pending_send: store::Queue<stream::NextSend>,

    /// Queue of streams waiting for window capacity to produce data.
    pending_capacity: store::Queue<stream::NextSendCapacity>,

    /// Streams waiting for capacity due to max concurrency
    ///
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
    /// from many tasks. However, offering this capability while supporting
    /// backpressure at some level is tricky. If there are many `SendRequest`
    /// handles and a single stream becomes available, which handle gets
    /// assigned that stream? Maybe that handle is no longer ready to send a
    /// request.
    ///
    /// The strategy used is to allow each `SendRequest` handle one buffered
    /// request. A `SendRequest` handle is ready to send a request if it has no
    /// associated buffered requests. This is the same strategy as `mpsc` in the
    /// futures library.
    pending_open: store::Queue<stream::NextOpen>,

    /// Connection level flow control governing sent data
    flow: FlowControl,

    /// Stream ID of the last stream opened.
    last_opened_id: StreamId,

    /// What `DATA` frame is currently being sent in the codec.
    in_flight_data_frame: InFlightData,

    /// The maximum number of bytes a stream should buffer.
    max_buffer_size: usize,
}

#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
    /// There is no `DATA` frame in flight.
    Nothing,
    /// There is a `DATA` frame in flight belonging to the given stream.
    DataFrame(store::Key),
    /// There was a `DATA` frame, but the stream's queue was since cleared.
    Drop,
}

pub(crate) struct Prioritized<B> {
    // The buffer
    inner: Take<B>,

    end_of_stream: bool,

    // The stream that this is associated with
    stream: store::Key,
}

// ===== impl Prioritize =====

impl Prioritize {
    pub fn new(config: &Config) -> Prioritize {
        let mut flow = FlowControl::new();

        flow.inc_window(config.remote_init_window_sz)
            .expect("invalid initial window size");

        // TODO: proper error handling
        let _res = flow.assign_capacity(config.remote_init_window_sz);
        debug_assert!(_res.is_ok());

        tracing::trace!("Prioritize::new; flow={:?}", flow);

        Prioritize {
            pending_send: store::Queue::new(),
            pending_capacity: store::Queue::new(),
            pending_open: store::Queue::new(),
            flow,
            last_opened_id: StreamId::ZERO,
            in_flight_data_frame: InFlightData::Nothing,
            max_buffer_size: config.local_max_buffer_size,
        }
    }

    pub(crate) fn max_buffer_size(&self) -> usize {
        self.max_buffer_size
    }

    /// Queue a frame to be sent to the remote
    pub fn queue_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) {
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
        let _e = span.enter();
        // Queue the frame in the buffer
        stream.pending_send.push_back(buffer, frame);
        self.schedule_send(stream, task);
    }

    pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        // If the stream is waiting to be opened, nothing more to do.
        if stream.is_send_ready() {
            tracing::trace!(?stream.id, "schedule_send");
            // Queue the stream
            self.pending_send.push(stream);

            // Notify the connection.
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
        self.pending_open.push(stream);
    }

    /// Send a data frame
    pub fn send_data<B>(
        &mut self,
        frame: frame::Data<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError>
    where
        B: Buf,
    {
        let sz = frame.payload().remaining();

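        // A single data frame may never exceed the maximum flow-control
        // window size, so reject oversized payloads up front.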
        if sz > MAX_WINDOW_SIZE as usize {
            return Err(UserError::PayloadTooBig);
        }

        let sz = sz as WindowSize;

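        // Data may only be sent while the send half of the stream is open
        // ("streaming"); otherwise report the appropriate user error.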
        if !stream.state.is_send_streaming() {
            if stream.state.is_closed() {
                return Err(InactiveStreamId);
            } else {
                return Err(UnexpectedFrameType);
            }
        }

        // Update the buffered data counter
        stream.buffered_send_data += sz as usize;

        let span =
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
        let _e = span.enter();
        tracing::trace!(buffered = stream.buffered_send_data);

        // Implicitly request more send capacity if not enough has been
        // requested yet.
        if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
            // Update the target requested capacity
            stream.requested_send_capacity =
                cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;

            // `try_assign_capacity` will queue the stream to `pending_capacity` if the
            // capacity cannot be assigned at the time it is called.
            //
            // Streams over the max concurrent count will still call `send_data`, so we
            // should be careful not to put them into `pending_capacity`, as that would
            // starve the connection capacity for other streams.
            if !stream.is_pending_open {
                self.try_assign_capacity(stream);
            }
        }

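        // Closing the send half means no more data will follow, so shrink the
        // capacity reservation down to what is already buffered.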
        if frame.is_end_stream() {
            stream.state.send_close();
            self.reserve_capacity(0, stream, counts);
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            buffered = stream.buffered_send_data,
        );

        // The `stream.buffered_send_data == 0` check is here so that, if a zero
        // length data frame is queued to the front (there is no previously
        // queued data), it gets sent out immediately even if there is no
        // available send window.
        //
        // Sending out zero length data frames can be done to signal
        // end-of-stream.
        //
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
            // The stream currently has capacity to send the data frame, so
            // queue it up and notify the connection task.
            self.queue_frame(frame.into(), buffer, stream, task);
        } else {
            // The stream has no capacity to send the frame now, save it but
            // don't notify the connection task. Once additional capacity
            // becomes available, the frame will be flushed.
            stream.pending_send.push_back(buffer, frame.into());
        }

        Ok(())
    }

    /// Request capacity to send data
    pub fn reserve_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) {
        let span = tracing::trace_span!(
            "reserve_capacity",
            ?stream.id,
            requested = capacity,
            effective = (capacity as usize) + stream.buffered_send_data,
            curr = stream.requested_send_capacity
        );
        let _e = span.enter();

        // Actual capacity is `capacity` + the current amount of buffered data.
        // If it were less, then we could never send out the buffered data.
        let capacity = (capacity as usize) + stream.buffered_send_data;

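        // Compare the new effective request against the current reservation:
        // equal means nothing to do, less means excess capacity can be
        // returned to the connection, greater means more must be acquired.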
        match capacity.cmp(&(stream.requested_send_capacity as usize)) {
            Ordering::Equal => {
                // Nothing to do
            }
            Ordering::Less => {
                // Update the target requested capacity
                stream.requested_send_capacity = capacity as WindowSize;

                // Currently available capacity assigned to the stream
                let available = stream.send_flow.available().as_size();

                // If the stream has more assigned capacity than requested, reclaim
                // some for the connection
                if available as usize > capacity {
                    let diff = available - capacity as WindowSize;

                    // TODO: proper error handling
                    let _res = stream.send_flow.claim_capacity(diff);
                    debug_assert!(_res.is_ok());

                    self.assign_connection_capacity(diff, stream, counts);
                }
            }
            Ordering::Greater => {
                // If trying to *add* capacity, but the stream send side is closed,
                // there's nothing to be done.
                if stream.state.is_send_closed() {
                    return;
                }

                // Update the target requested capacity
                stream.requested_send_capacity =
                    cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;

                // Try to assign additional capacity to the stream. If none is
                // currently available, the stream will be queued to receive some
                // when more becomes available.
                self.try_assign_capacity(stream);
            }
        }
    }

    pub fn recv_stream_window_update(
        &mut self,
        inc: WindowSize,
        stream: &mut store::Ptr,
    ) -> Result<(), Reason> {
        let span = tracing::trace_span!(
            "recv_stream_window_update",
            ?stream.id,
            ?stream.state,
            inc,
            flow = ?stream.send_flow
        );
        let _e = span.enter();

        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
            // We can't send any data, so don't bother doing anything else.
            return Ok(());
        }

        // Update the stream level flow control.
        stream.send_flow.inc_window(inc)?;

        // If the stream is waiting on additional capacity, then this will
        // assign it (if available on the connection) and notify the producer
        self.try_assign_capacity(stream);

        Ok(())
    }

    pub fn recv_connection_window_update(
        &mut self,
        inc: WindowSize,
        store: &mut Store,
        counts: &mut Counts,
    ) -> Result<(), Reason> {
        // Update the connection's window
        self.flow.inc_window(inc)?;

        self.assign_connection_capacity(inc, store, counts);
        Ok(())
    }

    /// Reclaim all capacity assigned to the stream and re-assign it to the
    /// connection
    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        let available = stream.send_flow.available().as_size();
        if available > 0 {
            // TODO: proper error handling
            let _res = stream.send_flow.claim_capacity(available);
            debug_assert!(_res.is_ok());
            // Re-assign all capacity to the connection
            self.assign_connection_capacity(available, stream, counts);
        }
    }

    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
    /// it to the connection
    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        // only reclaim requested capacity that isn't already buffered
        if stream.requested_send_capacity as usize > stream.buffered_send_data {
            let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;

            // TODO: proper error handling
            let _res = stream.send_flow.claim_capacity(reserved);
            debug_assert!(_res.is_ok());
            self.assign_connection_capacity(reserved, stream, counts);
        }
    }

    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
        let span = tracing::trace_span!("clear_pending_capacity");
        let _e = span.enter();
        while let Some(stream) = self.pending_capacity.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!(?stream.id, "clear_pending_capacity");
            })
        }
    }

    pub fn assign_connection_capacity<R>(
        &mut self,
        inc: WindowSize,
        store: &mut R,
        counts: &mut Counts,
    ) where
        R: Resolve,
    {
        let span = tracing::trace_span!("assign_connection_capacity", inc);
        let _e = span.enter();

        // TODO: proper error handling
        let _res = self.flow.assign_capacity(inc);
        debug_assert!(_res.is_ok());

        // Assign newly acquired capacity to streams pending capacity.
        while self.flow.available() > 0 {
            let stream = match self.pending_capacity.pop(store) {
                Some(stream) => stream,
                None => return,
            };

            // Streams pending capacity may have been reset before capacity
            // became available. In that case, the stream won't want any
            // capacity, and so we shouldn't "transition" on it, but just evict
            // it and continue the loop.
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
                continue;
            }

            counts.transition(stream, |_, stream| {
                // Try to assign capacity to the stream. This will also re-queue the
                // stream if there isn't enough connection level capacity to fulfill
                // the capacity request.
                self.try_assign_capacity(stream);
            })
        }
    }

    /// Request capacity to send data
    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
        let total_requested = stream.requested_send_capacity;

        // Total requested should never go below actual assigned
        // (Note: the window size can go lower than assigned)
        debug_assert!(stream.send_flow.available() <= total_requested as usize);

        // The amount of additional capacity that the stream requests.
        // Don't assign more than the window has available!
        let additional = cmp::min(
            total_requested - stream.send_flow.available().as_size(),
            // Can't assign more than what is available
            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
        );
        let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
        let _e = span.enter();
        tracing::trace!(
            requested = total_requested,
            additional,
            buffered = stream.buffered_send_data,
            window = stream.send_flow.window_size(),
            conn = %self.flow.available()
        );

        if additional == 0 {
            // Nothing more to do
            return;
        }

        // If the stream has requested capacity, then it must be in the
        // streaming state (more data could be sent) or there is buffered data
        // waiting to be sent.
        debug_assert!(
            stream.state.is_send_streaming() || stream.buffered_send_data > 0,
            "state={:?}",
            stream.state
        );

        // The amount of currently available capacity on the connection
        let conn_available = self.flow.available().as_size();

        // First check if capacity is immediately available
        if conn_available > 0 {
            // The amount of capacity to assign to the stream
            // TODO: Should prioritization factor into this?
            let assign = cmp::min(conn_available, additional);

            tracing::trace!(capacity = assign, "assigning");

            // Assign the capacity to the stream
            stream.assign_capacity(assign, self.max_buffer_size);

            // Claim the capacity from the connection
            // TODO: proper error handling
            let _res = self.flow.claim_capacity(assign);
            debug_assert!(_res.is_ok());
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            requested = stream.requested_send_capacity,
            buffered = stream.buffered_send_data,
            has_unavailable = %stream.send_flow.has_unavailable()
        );

        if stream.send_flow.available() < stream.requested_send_capacity as usize
            && stream.send_flow.has_unavailable()
        {
            // The stream requires additional capacity and the stream's
            // window has available capacity, but the connection window
            // does not.
            //
            // In this case, the stream needs to be queued up for when the
            // connection has more capacity.
            self.pending_capacity.push(stream);
        }

        // If data is buffered and the stream is send ready, then
        // schedule the stream for execution
        if stream.buffered_send_data > 0 && stream.is_send_ready() {
            // TODO: This assertion isn't *exactly* correct. There can still be
            // buffered send data while the stream's pending send queue is
            // empty. This can happen when a large data frame is in the process
            // of being **partially** sent. Once the window has been sent, the
            // data frame will be returned to the prioritization layer to be
            // re-scheduled.
            //
            // That said, it would be nice to figure out how to express this
            // assertion correctly.
            //
            // debug_assert!(!stream.pending_send.is_empty());

            self.pending_send.push(stream);
        }
    }

    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Ensure codec is ready
        ready!(dst.poll_ready(cx))?;

        // Reclaim any frame that has previously been written
        self.reclaim_frame(buffer, store, dst);

        // The max frame length
        let max_frame_len = dst.max_send_frame_size();

        tracing::trace!("poll_complete");

        loop {
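            // If the concurrency limit allows, promote one pending-open stream.
            // It is placed at the front of the send queue so its frames are
            // written next, and we try to assign it some connection capacity.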
            if let Some(mut stream) = self.pop_pending_open(store, counts) {
                self.pending_send.push_front(&mut stream);
                self.try_assign_capacity(&mut stream);
            }

            match self.pop_frame(buffer, store, max_frame_len, counts) {
                Some(frame) => {
                    tracing::trace!(?frame, "writing");

                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
                    if let Frame::Data(ref frame) = frame {
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
                    }
                    dst.buffer(frame).expect("invalid frame");

                    // Ensure the codec is ready to try the loop again.
                    ready!(dst.poll_ready(cx))?;

                    // Always try to reclaim; the frame just buffered may have
                    // been only partially written by the codec.
                    self.reclaim_frame(buffer, store, dst);
                }
                None => {
                    // Try to flush the codec.
                    ready!(dst.flush(cx))?;

                    // This might release a data frame...
                    if !self.reclaim_frame(buffer, store, dst) {
                        return Poll::Ready(Ok(()));
                    }

                    // No need to poll ready as poll_complete() does this for
                    // us...
                }
            }
        }
    }

    /// Tries to reclaim a pending data frame from the codec.
    ///
    /// Returns true if a frame was reclaimed.
    ///
    /// When a data frame is written to the codec, it may not be written in its
    /// entirety (large chunks are split up into potentially many data frames).
    /// In this case, the stream needs to be reprioritized.
    fn reclaim_frame<T, B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        let span = tracing::trace_span!("try_reclaim_frame");
        let _e = span.enter();

        // First check if there are any data chunks to take back
        if let Some(frame) = dst.take_last_data_frame() {
            self.reclaim_frame_inner(buffer, store, frame)
        } else {
            false
        }
    }

    fn reclaim_frame_inner<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        frame: frame::Data<Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        tracing::trace!(
            ?frame,
            sz = frame.payload().inner.get_ref().remaining(),
            "reclaimed"
        );

        let mut eos = false;
        let key = frame.payload().stream;

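        // Verify that the reclaimed frame is the one we recorded as in flight.
        // A `Drop` marker means the owning stream was cleared in the meantime,
        // so the frame must not be requeued.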
        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
            InFlightData::Drop => {
                tracing::trace!("not reclaiming frame for cancelled stream");
                return false;
            }
            InFlightData::DataFrame(k) => {
                debug_assert_eq!(k, key);
            }
        }

        let mut frame = frame.map(|prioritized| {
            // TODO: Ensure fully written
            eos = prioritized.end_of_stream;
            prioritized.inner.into_inner()
        });

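        // If any bytes remain unwritten, push the remainder back onto the
        // front of the stream's queue so it is sent next.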
        if frame.payload().has_remaining() {
            let mut stream = store.resolve(key);

            if eos {
                frame.set_end_stream(true);
            }

            self.push_back_frame(frame.into(), buffer, &mut stream);

            return true;
        }

        false
    }

    /// Push the frame to the front of the stream's deque, scheduling the
    /// stream if needed.
    fn push_back_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
    ) {
        // Push the frame to the front of the stream's deque
        stream.pending_send.push_front(buffer, frame);

        // If needed, schedule the sender
        if stream.send_flow.available() > 0 {
            debug_assert!(!stream.pending_send.is_empty());
            self.pending_send.push(stream);
        }
    }

    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
        let span = tracing::trace_span!("clear_queue", ?stream.id);
        let _e = span.enter();

        // TODO: make this more efficient?
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
            tracing::trace!(?frame, "dropping");
        }

        stream.buffered_send_data = 0;
        stream.requested_send_capacity = 0;
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
            if stream.key() == key {
                // This stream could get cleaned up now - don't allow the
                // buffered frame to get reclaimed.
                self.in_flight_data_frame = InFlightData::Drop;
            }
        }
    }

    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_send.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            counts.transition_after(stream, is_pending_reset);
        }
    }

    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_open.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            counts.transition_after(stream, is_pending_reset);
        }
    }

    fn pop_frame<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        max_len: usize,
        counts: &mut Counts,
    ) -> Option<Frame<Prioritized<B>>>
    where
        B: Buf,
    {
        let span = tracing::trace_span!("pop_frame");
        let _e = span.enter();

        loop {
            match self.pending_send.pop(store) {
                Some(mut stream) => {
                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
                    let _e = span.enter();

                    // It's possible that this stream, besides having data to send,
                    // is also queued to send a reset, and thus is already in the queue
                    // to wait for "some time" after a reset.
                    //
                    // To be safe, we just always ask the stream.
                    let is_pending_reset = stream.is_pending_reset_expiration();

                    tracing::trace!(is_pending_reset);

                    let frame = match stream.pending_send.pop_front(buffer) {
                        Some(Frame::Data(mut frame)) => {
                            // Get the amount of capacity remaining for stream's
                            // window.
                            let stream_capacity = stream.send_flow.available();
                            let sz = frame.payload().remaining();

                            tracing::trace!(
                                sz,
                                eos = frame.is_end_stream(),
                                window = %stream_capacity,
                                available = %stream.send_flow.available(),
                                requested = stream.requested_send_capacity,
                                buffered = stream.buffered_send_data,
                                "data frame"
                            );

                            // Zero length data frames always have capacity to
                            // be sent.
                            if sz > 0 && stream_capacity == 0 {
                                tracing::trace!("stream capacity is 0");

                                // Ensure that the stream is waiting for
                                // connection level capacity
                                //
                                // TODO: uncomment
                                // debug_assert!(stream.is_pending_send_capacity);

                                // The stream has no more capacity, this can
                                // happen if the remote reduced the stream
                                // window. In this case, we need to buffer the
                                // frame and wait for a window update...
                                stream.pending_send.push_front(buffer, frame.into());

                                continue;
                            }

                            // Only send up to the max frame length
                            let len = cmp::min(sz, max_len);

                            // Only send up to the stream's window capacity
                            let len =
                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;

                            // There *must* be enough connection level
                            // capacity at this point.
                            debug_assert!(len <= self.flow.window_size());

                            // Check the stream-level window as the peer sees it. The
                            // capacity we have assigned locally may exceed the window
                            // the peer currently allows (for example, after the peer
                            // shrank the window), in which case the frame must wait.
                            if len > 0 && len > stream.send_flow.window_size() {
                                stream.pending_send.push_front(buffer, frame.into());
                                continue;
                            }

                            tracing::trace!(len, "sending data frame");

                            // Update the flow control
                            tracing::trace_span!("updating stream flow").in_scope(|| {
                                stream.send_data(len, self.max_buffer_size);

                                // Assign back to the connection the capacity that was
                                // just consumed from the stream on the previous line.
                                // TODO: proper error handling
                                let _res = self.flow.assign_capacity(len);
                                debug_assert!(_res.is_ok());
                            });

                            let (eos, len) = tracing::trace_span!("updating connection flow")
                                .in_scope(|| {
                                    // TODO: proper error handling
                                    let _res = self.flow.send_data(len);
                                    debug_assert!(_res.is_ok());

                                    // Wrap the frame's data payload to ensure that the
                                    // correct amount of data gets written.

                                    let eos = frame.is_end_stream();
                                    let len = len as usize;

                                    if frame.payload().remaining() > len {
                                        frame.set_end_stream(false);
                                    }
                                    (eos, len)
                                });

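                            // Wrap the payload in `Take` so the codec writes at most
                            // `len` bytes; any remainder is recovered later by
                            // `reclaim_frame` and requeued.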
                            Frame::Data(frame.map(|buf| Prioritized {
                                inner: buf.take(len),
                                end_of_stream: eos,
                                stream: stream.key(),
                            }))
                        }
                        Some(Frame::PushPromise(pp)) => {
                            let mut pushed =
                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
                            pushed.is_pending_push = false;
                            // Transition stream from pending_push to pending_open
                            // if possible
                            if !pushed.pending_send.is_empty() {
                                if counts.can_inc_num_send_streams() {
                                    counts.inc_num_send_streams(&mut pushed);
                                    self.pending_send.push(&mut pushed);
                                } else {
                                    self.queue_open(&mut pushed);
                                }
                            }
                            Frame::PushPromise(pp)
                        }
                        Some(frame) => frame.map(|_| {
                            unreachable!(
                                "Frame::map closure will only be called \
                                 on DATA frames."
                            )
                        }),
                        None => {
                            if let Some(reason) = stream.state.get_scheduled_reset() {
                                let stream_id = stream.id;
                                stream
                                    .state
                                    .set_reset(stream_id, reason, Initiator::Library);

                                let frame = frame::Reset::new(stream.id, reason);
                                Frame::Reset(frame)
                            } else {
                                // If the stream receives a RESET from the peer, it may have
                                // had data buffered to be sent, but all the frames are cleared
                                // in clear_queue(). Instead of doing an O(N) traversal through
                                // the queue to remove it, let's just ignore the stream here.
                                tracing::trace!("removing dangling stream from pending_send");
                                // Since this should only happen as a consequence of `clear_queue`,
                                // we must be in a closed state of some kind.
                                debug_assert!(stream.state.is_closed());
                                counts.transition_after(stream, is_pending_reset);
                                continue;
                            }
                        }
                    };

                    tracing::trace!("pop_frame; frame={:?}", frame);

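                    // In debug builds, check that streams are opened in increasing
                    // stream-ID order (see the ordering note on `Prioritize`).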
                    if cfg!(debug_assertions) && stream.state.is_idle() {
                        debug_assert!(stream.id > self.last_opened_id);
                        self.last_opened_id = stream.id;
                    }

                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
                        // TODO: Only requeue the sender IF it is ready to send
                        // the next frame. i.e. don't requeue it if the next
                        // frame is a data frame and the stream does not have
                        // any more capacity.
                        self.pending_send.push(&mut stream);
                    }

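                    // Let `counts` account for any state transition that occurred
                    // while popping the frame (e.g. the stream closing).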
                    counts.transition_after(stream, is_pending_reset);

                    return Some(frame);
                }
                None => return None,
            }
        }
    }

    fn pop_pending_open<'s>(
        &mut self,
        store: &'s mut Store,
        counts: &mut Counts,
    ) -> Option<store::Ptr<'s>> {
        tracing::trace!("schedule_pending_open");
        // check for any pending open streams
        if counts.can_inc_num_send_streams() {
            if let Some(mut stream) = self.pending_open.pop(store) {
                tracing::trace!("schedule_pending_open; stream={:?}", stream.id);

                counts.inc_num_send_streams(&mut stream);
                stream.notify_send();
                return Some(stream);
            }
        }

        None
    }
}

// ===== impl Prioritized =====

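// `Prioritized` simply delegates to the wrapped `Take<B>`, which limits how
// much of the original buffer the codec is allowed to consume.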
impl<B> Buf for Prioritized<B>
where
    B: Buf,
{
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
        self.inner.chunks_vectored(dst)
    }

    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt)
    }
}

impl<B: Buf> fmt::Debug for Prioritized<B> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Prioritized")
            .field("remaining", &self.inner.get_ref().remaining())
            .field("end_of_stream", &self.end_of_stream)
            .field("stream", &self.stream)
            .finish()
    }
}