1 use super::store::Resolve; 2 use super::*; 3 4 use crate::frame::Reason; 5 6 use crate::codec::UserError; 7 use crate::codec::UserError::*; 8 9 use bytes::buf::Take; 10 use std::{ 11 cmp::{self, Ordering}, 12 fmt, io, mem, 13 task::{Context, Poll, Waker}, 14 }; 15 16 /// # Warning 17 /// 18 /// Queued streams are ordered by stream ID, as we need to ensure that 19 /// lower-numbered streams are sent headers before higher-numbered ones. 20 /// This is because "idle" stream IDs – those which have been initiated but 21 /// have yet to receive frames – will be implicitly closed on receipt of a 22 /// frame on a higher stream ID. If these queues was not ordered by stream 23 /// IDs, some mechanism would be necessary to ensure that the lowest-numbered] 24 /// idle stream is opened first. 25 #[derive(Debug)] 26 pub(super) struct Prioritize { 27 /// Queue of streams waiting for socket capacity to send a frame. 28 pending_send: store::Queue<stream::NextSend>, 29 30 /// Queue of streams waiting for window capacity to produce data. 31 pending_capacity: store::Queue<stream::NextSendCapacity>, 32 33 /// Streams waiting for capacity due to max concurrency 34 /// 35 /// The `SendRequest` handle is `Clone`. This enables initiating requests 36 /// from many tasks. However, offering this capability while supporting 37 /// backpressure at some level is tricky. If there are many `SendRequest` 38 /// handles and a single stream becomes available, which handle gets 39 /// assigned that stream? Maybe that handle is no longer ready to send a 40 /// request. 41 /// 42 /// The strategy used is to allow each `SendRequest` handle one buffered 43 /// request. A `SendRequest` handle is ready to send a request if it has no 44 /// associated buffered requests. This is the same strategy as `mpsc` in the 45 /// futures library. 46 pending_open: store::Queue<stream::NextOpen>, 47 48 /// Connection level flow control governing sent data 49 flow: FlowControl, 50 51 /// Stream ID of the last stream opened. 52 last_opened_id: StreamId, 53 54 /// What `DATA` frame is currently being sent in the codec. 55 in_flight_data_frame: InFlightData, 56 57 /// The maximum amount of bytes a stream should buffer. 58 max_buffer_size: usize, 59 } 60 61 #[derive(Debug, Eq, PartialEq)] 62 enum InFlightData { 63 /// There is no `DATA` frame in flight. 64 Nothing, 65 /// There is a `DATA` frame in flight belonging to the given stream. 66 DataFrame(store::Key), 67 /// There was a `DATA` frame, but the stream's queue was since cleared. 68 Drop, 69 } 70 71 pub(crate) struct Prioritized<B> { 72 // The buffer 73 inner: Take<B>, 74 75 end_of_stream: bool, 76 77 // The stream that this is associated with 78 stream: store::Key, 79 } 80 81 // ===== impl Prioritize ===== 82 83 impl Prioritize { new(config: &Config) -> Prioritize84 pub fn new(config: &Config) -> Prioritize { 85 let mut flow = FlowControl::new(); 86 87 flow.inc_window(config.remote_init_window_sz) 88 .expect("invalid initial window size"); 89 90 // TODO: proper error handling 91 let _res = flow.assign_capacity(config.remote_init_window_sz); 92 debug_assert!(_res.is_ok()); 93 94 tracing::trace!("Prioritize::new; flow={:?}", flow); 95 96 Prioritize { 97 pending_send: store::Queue::new(), 98 pending_capacity: store::Queue::new(), 99 pending_open: store::Queue::new(), 100 flow, 101 last_opened_id: StreamId::ZERO, 102 in_flight_data_frame: InFlightData::Nothing, 103 max_buffer_size: config.local_max_buffer_size, 104 } 105 } 106 max_buffer_size(&self) -> usize107 pub(crate) fn max_buffer_size(&self) -> usize { 108 self.max_buffer_size 109 } 110 111 /// Queue a frame to be sent to the remote queue_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, task: &mut Option<Waker>, )112 pub fn queue_frame<B>( 113 &mut self, 114 frame: Frame<B>, 115 buffer: &mut Buffer<Frame<B>>, 116 stream: &mut store::Ptr, 117 task: &mut Option<Waker>, 118 ) { 119 let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); 120 let _e = span.enter(); 121 // Queue the frame in the buffer 122 stream.pending_send.push_back(buffer, frame); 123 self.schedule_send(stream, task); 124 } 125 schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)126 pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { 127 // If the stream is waiting to be opened, nothing more to do. 128 if stream.is_send_ready() { 129 tracing::trace!(?stream.id, "schedule_send"); 130 // Queue the stream 131 self.pending_send.push(stream); 132 133 // Notify the connection. 134 if let Some(task) = task.take() { 135 task.wake(); 136 } 137 } 138 } 139 queue_open(&mut self, stream: &mut store::Ptr)140 pub fn queue_open(&mut self, stream: &mut store::Ptr) { 141 self.pending_open.push(stream); 142 } 143 144 /// Send a data frame send_data<B>( &mut self, frame: frame::Data<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option<Waker>, ) -> Result<(), UserError> where B: Buf,145 pub fn send_data<B>( 146 &mut self, 147 frame: frame::Data<B>, 148 buffer: &mut Buffer<Frame<B>>, 149 stream: &mut store::Ptr, 150 counts: &mut Counts, 151 task: &mut Option<Waker>, 152 ) -> Result<(), UserError> 153 where 154 B: Buf, 155 { 156 let sz = frame.payload().remaining(); 157 158 if sz > MAX_WINDOW_SIZE as usize { 159 return Err(UserError::PayloadTooBig); 160 } 161 162 let sz = sz as WindowSize; 163 164 if !stream.state.is_send_streaming() { 165 if stream.state.is_closed() { 166 return Err(InactiveStreamId); 167 } else { 168 return Err(UnexpectedFrameType); 169 } 170 } 171 172 // Update the buffered data counter 173 stream.buffered_send_data += sz as usize; 174 175 let span = 176 tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); 177 let _e = span.enter(); 178 tracing::trace!(buffered = stream.buffered_send_data); 179 180 // Implicitly request more send capacity if not enough has been 181 // requested yet. 182 if (stream.requested_send_capacity as usize) < stream.buffered_send_data { 183 // Update the target requested capacity 184 stream.requested_send_capacity = 185 cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; 186 187 // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity 188 // cannot be assigned at the time it is called. 189 // 190 // Streams over the max concurrent count will still call `send_data` so we should be 191 // careful not to put it into `pending_capacity` as it will starve the connection 192 // capacity for other streams 193 if !stream.is_pending_open { 194 self.try_assign_capacity(stream); 195 } 196 } 197 198 if frame.is_end_stream() { 199 stream.state.send_close(); 200 self.reserve_capacity(0, stream, counts); 201 } 202 203 tracing::trace!( 204 available = %stream.send_flow.available(), 205 buffered = stream.buffered_send_data, 206 ); 207 208 // The `stream.buffered_send_data == 0` check is here so that, if a zero 209 // length data frame is queued to the front (there is no previously 210 // queued data), it gets sent out immediately even if there is no 211 // available send window. 212 // 213 // Sending out zero length data frames can be done to signal 214 // end-of-stream. 215 // 216 if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { 217 // The stream currently has capacity to send the data frame, so 218 // queue it up and notify the connection task. 219 self.queue_frame(frame.into(), buffer, stream, task); 220 } else { 221 // The stream has no capacity to send the frame now, save it but 222 // don't notify the connection task. Once additional capacity 223 // becomes available, the frame will be flushed. 224 stream.pending_send.push_back(buffer, frame.into()); 225 } 226 227 Ok(()) 228 } 229 230 /// Request capacity to send data reserve_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, counts: &mut Counts, )231 pub fn reserve_capacity( 232 &mut self, 233 capacity: WindowSize, 234 stream: &mut store::Ptr, 235 counts: &mut Counts, 236 ) { 237 let span = tracing::trace_span!( 238 "reserve_capacity", 239 ?stream.id, 240 requested = capacity, 241 effective = (capacity as usize) + stream.buffered_send_data, 242 curr = stream.requested_send_capacity 243 ); 244 let _e = span.enter(); 245 246 // Actual capacity is `capacity` + the current amount of buffered data. 247 // If it were less, then we could never send out the buffered data. 248 let capacity = (capacity as usize) + stream.buffered_send_data; 249 250 match capacity.cmp(&(stream.requested_send_capacity as usize)) { 251 Ordering::Equal => { 252 // Nothing to do 253 } 254 Ordering::Less => { 255 // Update the target requested capacity 256 stream.requested_send_capacity = capacity as WindowSize; 257 258 // Currently available capacity assigned to the stream 259 let available = stream.send_flow.available().as_size(); 260 261 // If the stream has more assigned capacity than requested, reclaim 262 // some for the connection 263 if available as usize > capacity { 264 let diff = available - capacity as WindowSize; 265 266 // TODO: proper error handling 267 let _res = stream.send_flow.claim_capacity(diff); 268 debug_assert!(_res.is_ok()); 269 270 self.assign_connection_capacity(diff, stream, counts); 271 } 272 } 273 Ordering::Greater => { 274 // If trying to *add* capacity, but the stream send side is closed, 275 // there's nothing to be done. 276 if stream.state.is_send_closed() { 277 return; 278 } 279 280 // Update the target requested capacity 281 stream.requested_send_capacity = 282 cmp::min(capacity, WindowSize::MAX as usize) as WindowSize; 283 284 // Try to assign additional capacity to the stream. If none is 285 // currently available, the stream will be queued to receive some 286 // when more becomes available. 287 self.try_assign_capacity(stream); 288 } 289 } 290 } 291 recv_stream_window_update( &mut self, inc: WindowSize, stream: &mut store::Ptr, ) -> Result<(), Reason>292 pub fn recv_stream_window_update( 293 &mut self, 294 inc: WindowSize, 295 stream: &mut store::Ptr, 296 ) -> Result<(), Reason> { 297 let span = tracing::trace_span!( 298 "recv_stream_window_update", 299 ?stream.id, 300 ?stream.state, 301 inc, 302 flow = ?stream.send_flow 303 ); 304 let _e = span.enter(); 305 306 if stream.state.is_send_closed() && stream.buffered_send_data == 0 { 307 // We can't send any data, so don't bother doing anything else. 308 return Ok(()); 309 } 310 311 // Update the stream level flow control. 312 stream.send_flow.inc_window(inc)?; 313 314 // If the stream is waiting on additional capacity, then this will 315 // assign it (if available on the connection) and notify the producer 316 self.try_assign_capacity(stream); 317 318 Ok(()) 319 } 320 recv_connection_window_update( &mut self, inc: WindowSize, store: &mut Store, counts: &mut Counts, ) -> Result<(), Reason>321 pub fn recv_connection_window_update( 322 &mut self, 323 inc: WindowSize, 324 store: &mut Store, 325 counts: &mut Counts, 326 ) -> Result<(), Reason> { 327 // Update the connection's window 328 self.flow.inc_window(inc)?; 329 330 self.assign_connection_capacity(inc, store, counts); 331 Ok(()) 332 } 333 334 /// Reclaim all capacity assigned to the stream and re-assign it to the 335 /// connection reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)336 pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 337 let available = stream.send_flow.available().as_size(); 338 if available > 0 { 339 // TODO: proper error handling 340 let _res = stream.send_flow.claim_capacity(available); 341 debug_assert!(_res.is_ok()); 342 // Re-assign all capacity to the connection 343 self.assign_connection_capacity(available, stream, counts); 344 } 345 } 346 347 /// Reclaim just reserved capacity, not buffered capacity, and re-assign 348 /// it to the connection reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)349 pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 350 // only reclaim requested capacity that isn't already buffered 351 if stream.requested_send_capacity as usize > stream.buffered_send_data { 352 let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize; 353 354 // TODO: proper error handling 355 let _res = stream.send_flow.claim_capacity(reserved); 356 debug_assert!(_res.is_ok()); 357 self.assign_connection_capacity(reserved, stream, counts); 358 } 359 } 360 clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts)361 pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { 362 let span = tracing::trace_span!("clear_pending_capacity"); 363 let _e = span.enter(); 364 while let Some(stream) = self.pending_capacity.pop(store) { 365 counts.transition(stream, |_, stream| { 366 tracing::trace!(?stream.id, "clear_pending_capacity"); 367 }) 368 } 369 } 370 assign_connection_capacity<R>( &mut self, inc: WindowSize, store: &mut R, counts: &mut Counts, ) where R: Resolve,371 pub fn assign_connection_capacity<R>( 372 &mut self, 373 inc: WindowSize, 374 store: &mut R, 375 counts: &mut Counts, 376 ) where 377 R: Resolve, 378 { 379 let span = tracing::trace_span!("assign_connection_capacity", inc); 380 let _e = span.enter(); 381 382 // TODO: proper error handling 383 let _res = self.flow.assign_capacity(inc); 384 debug_assert!(_res.is_ok()); 385 386 // Assign newly acquired capacity to streams pending capacity. 387 while self.flow.available() > 0 { 388 let stream = match self.pending_capacity.pop(store) { 389 Some(stream) => stream, 390 None => return, 391 }; 392 393 // Streams pending capacity may have been reset before capacity 394 // became available. In that case, the stream won't want any 395 // capacity, and so we shouldn't "transition" on it, but just evict 396 // it and continue the loop. 397 if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { 398 continue; 399 } 400 401 counts.transition(stream, |_, stream| { 402 // Try to assign capacity to the stream. This will also re-queue the 403 // stream if there isn't enough connection level capacity to fulfill 404 // the capacity request. 405 self.try_assign_capacity(stream); 406 }) 407 } 408 } 409 410 /// Request capacity to send data try_assign_capacity(&mut self, stream: &mut store::Ptr)411 fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { 412 let total_requested = stream.requested_send_capacity; 413 414 // Total requested should never go below actual assigned 415 // (Note: the window size can go lower than assigned) 416 debug_assert!(stream.send_flow.available() <= total_requested as usize); 417 418 // The amount of additional capacity that the stream requests. 419 // Don't assign more than the window has available! 420 let additional = cmp::min( 421 total_requested - stream.send_flow.available().as_size(), 422 // Can't assign more than what is available 423 stream.send_flow.window_size() - stream.send_flow.available().as_size(), 424 ); 425 let span = tracing::trace_span!("try_assign_capacity", ?stream.id); 426 let _e = span.enter(); 427 tracing::trace!( 428 requested = total_requested, 429 additional, 430 buffered = stream.buffered_send_data, 431 window = stream.send_flow.window_size(), 432 conn = %self.flow.available() 433 ); 434 435 if additional == 0 { 436 // Nothing more to do 437 return; 438 } 439 440 // If the stream has requested capacity, then it must be in the 441 // streaming state (more data could be sent) or there is buffered data 442 // waiting to be sent. 443 debug_assert!( 444 stream.state.is_send_streaming() || stream.buffered_send_data > 0, 445 "state={:?}", 446 stream.state 447 ); 448 449 // The amount of currently available capacity on the connection 450 let conn_available = self.flow.available().as_size(); 451 452 // First check if capacity is immediately available 453 if conn_available > 0 { 454 // The amount of capacity to assign to the stream 455 // TODO: Should prioritization factor into this? 456 let assign = cmp::min(conn_available, additional); 457 458 tracing::trace!(capacity = assign, "assigning"); 459 460 // Assign the capacity to the stream 461 stream.assign_capacity(assign, self.max_buffer_size); 462 463 // Claim the capacity from the connection 464 // TODO: proper error handling 465 let _res = self.flow.claim_capacity(assign); 466 debug_assert!(_res.is_ok()); 467 } 468 469 tracing::trace!( 470 available = %stream.send_flow.available(), 471 requested = stream.requested_send_capacity, 472 buffered = stream.buffered_send_data, 473 has_unavailable = %stream.send_flow.has_unavailable() 474 ); 475 476 if stream.send_flow.available() < stream.requested_send_capacity as usize 477 && stream.send_flow.has_unavailable() 478 { 479 // The stream requires additional capacity and the stream's 480 // window has available capacity, but the connection window 481 // does not. 482 // 483 // In this case, the stream needs to be queued up for when the 484 // connection has more capacity. 485 self.pending_capacity.push(stream); 486 } 487 488 // If data is buffered and the stream is send ready, then 489 // schedule the stream for execution 490 if stream.buffered_send_data > 0 && stream.is_send_ready() { 491 // TODO: This assertion isn't *exactly* correct. There can still be 492 // buffered send data while the stream's pending send queue is 493 // empty. This can happen when a large data frame is in the process 494 // of being **partially** sent. Once the window has been sent, the 495 // data frame will be returned to the prioritization layer to be 496 // re-scheduled. 497 // 498 // That said, it would be nice to figure out how to make this 499 // assertion correctly. 500 // 501 // debug_assert!(!stream.pending_send.is_empty()); 502 503 self.pending_send.push(stream); 504 } 505 } 506 poll_complete<T, B>( &mut self, cx: &mut Context, buffer: &mut Buffer<Frame<B>>, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,507 pub fn poll_complete<T, B>( 508 &mut self, 509 cx: &mut Context, 510 buffer: &mut Buffer<Frame<B>>, 511 store: &mut Store, 512 counts: &mut Counts, 513 dst: &mut Codec<T, Prioritized<B>>, 514 ) -> Poll<io::Result<()>> 515 where 516 T: AsyncWrite + Unpin, 517 B: Buf, 518 { 519 // Ensure codec is ready 520 ready!(dst.poll_ready(cx))?; 521 522 // Reclaim any frame that has previously been written 523 self.reclaim_frame(buffer, store, dst); 524 525 // The max frame length 526 let max_frame_len = dst.max_send_frame_size(); 527 528 tracing::trace!("poll_complete"); 529 530 loop { 531 if let Some(mut stream) = self.pop_pending_open(store, counts) { 532 self.pending_send.push_front(&mut stream); 533 self.try_assign_capacity(&mut stream); 534 } 535 536 match self.pop_frame(buffer, store, max_frame_len, counts) { 537 Some(frame) => { 538 tracing::trace!(?frame, "writing"); 539 540 debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); 541 if let Frame::Data(ref frame) = frame { 542 self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); 543 } 544 dst.buffer(frame).expect("invalid frame"); 545 546 // Ensure the codec is ready to try the loop again. 547 ready!(dst.poll_ready(cx))?; 548 549 // Because, always try to reclaim... 550 self.reclaim_frame(buffer, store, dst); 551 } 552 None => { 553 // Try to flush the codec. 554 ready!(dst.flush(cx))?; 555 556 // This might release a data frame... 557 if !self.reclaim_frame(buffer, store, dst) { 558 return Poll::Ready(Ok(())); 559 } 560 561 // No need to poll ready as poll_complete() does this for 562 // us... 563 } 564 } 565 } 566 } 567 568 /// Tries to reclaim a pending data frame from the codec. 569 /// 570 /// Returns true if a frame was reclaimed. 571 /// 572 /// When a data frame is written to the codec, it may not be written in its 573 /// entirety (large chunks are split up into potentially many data frames). 574 /// In this case, the stream needs to be reprioritized. reclaim_frame<T, B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, dst: &mut Codec<T, Prioritized<B>>, ) -> bool where B: Buf,575 fn reclaim_frame<T, B>( 576 &mut self, 577 buffer: &mut Buffer<Frame<B>>, 578 store: &mut Store, 579 dst: &mut Codec<T, Prioritized<B>>, 580 ) -> bool 581 where 582 B: Buf, 583 { 584 let span = tracing::trace_span!("try_reclaim_frame"); 585 let _e = span.enter(); 586 587 // First check if there are any data chunks to take back 588 if let Some(frame) = dst.take_last_data_frame() { 589 self.reclaim_frame_inner(buffer, store, frame) 590 } else { 591 false 592 } 593 } 594 reclaim_frame_inner<B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, frame: frame::Data<Prioritized<B>>, ) -> bool where B: Buf,595 fn reclaim_frame_inner<B>( 596 &mut self, 597 buffer: &mut Buffer<Frame<B>>, 598 store: &mut Store, 599 frame: frame::Data<Prioritized<B>>, 600 ) -> bool 601 where 602 B: Buf, 603 { 604 tracing::trace!( 605 ?frame, 606 sz = frame.payload().inner.get_ref().remaining(), 607 "reclaimed" 608 ); 609 610 let mut eos = false; 611 let key = frame.payload().stream; 612 613 match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { 614 InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), 615 InFlightData::Drop => { 616 tracing::trace!("not reclaiming frame for cancelled stream"); 617 return false; 618 } 619 InFlightData::DataFrame(k) => { 620 debug_assert_eq!(k, key); 621 } 622 } 623 624 let mut frame = frame.map(|prioritized| { 625 // TODO: Ensure fully written 626 eos = prioritized.end_of_stream; 627 prioritized.inner.into_inner() 628 }); 629 630 if frame.payload().has_remaining() { 631 let mut stream = store.resolve(key); 632 633 if eos { 634 frame.set_end_stream(true); 635 } 636 637 self.push_back_frame(frame.into(), buffer, &mut stream); 638 639 return true; 640 } 641 642 false 643 } 644 645 /// Push the frame to the front of the stream's deque, scheduling the 646 /// stream if needed. push_back_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, )647 fn push_back_frame<B>( 648 &mut self, 649 frame: Frame<B>, 650 buffer: &mut Buffer<Frame<B>>, 651 stream: &mut store::Ptr, 652 ) { 653 // Push the frame to the front of the stream's deque 654 stream.pending_send.push_front(buffer, frame); 655 656 // If needed, schedule the sender 657 if stream.send_flow.available() > 0 { 658 debug_assert!(!stream.pending_send.is_empty()); 659 self.pending_send.push(stream); 660 } 661 } 662 clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr)663 pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { 664 let span = tracing::trace_span!("clear_queue", ?stream.id); 665 let _e = span.enter(); 666 667 // TODO: make this more efficient? 668 while let Some(frame) = stream.pending_send.pop_front(buffer) { 669 tracing::trace!(?frame, "dropping"); 670 } 671 672 stream.buffered_send_data = 0; 673 stream.requested_send_capacity = 0; 674 if let InFlightData::DataFrame(key) = self.in_flight_data_frame { 675 if stream.key() == key { 676 // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. 677 self.in_flight_data_frame = InFlightData::Drop; 678 } 679 } 680 } 681 clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts)682 pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { 683 while let Some(stream) = self.pending_send.pop(store) { 684 let is_pending_reset = stream.is_pending_reset_expiration(); 685 counts.transition_after(stream, is_pending_reset); 686 } 687 } 688 clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts)689 pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { 690 while let Some(stream) = self.pending_open.pop(store) { 691 let is_pending_reset = stream.is_pending_reset_expiration(); 692 counts.transition_after(stream, is_pending_reset); 693 } 694 } 695 pop_frame<B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, max_len: usize, counts: &mut Counts, ) -> Option<Frame<Prioritized<B>>> where B: Buf,696 fn pop_frame<B>( 697 &mut self, 698 buffer: &mut Buffer<Frame<B>>, 699 store: &mut Store, 700 max_len: usize, 701 counts: &mut Counts, 702 ) -> Option<Frame<Prioritized<B>>> 703 where 704 B: Buf, 705 { 706 let span = tracing::trace_span!("pop_frame"); 707 let _e = span.enter(); 708 709 loop { 710 match self.pending_send.pop(store) { 711 Some(mut stream) => { 712 let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); 713 let _e = span.enter(); 714 715 // It's possible that this stream, besides having data to send, 716 // is also queued to send a reset, and thus is already in the queue 717 // to wait for "some time" after a reset. 718 // 719 // To be safe, we just always ask the stream. 720 let is_pending_reset = stream.is_pending_reset_expiration(); 721 722 tracing::trace!(is_pending_reset); 723 724 let frame = match stream.pending_send.pop_front(buffer) { 725 Some(Frame::Data(mut frame)) => { 726 // Get the amount of capacity remaining for stream's 727 // window. 728 let stream_capacity = stream.send_flow.available(); 729 let sz = frame.payload().remaining(); 730 731 tracing::trace!( 732 sz, 733 eos = frame.is_end_stream(), 734 window = %stream_capacity, 735 available = %stream.send_flow.available(), 736 requested = stream.requested_send_capacity, 737 buffered = stream.buffered_send_data, 738 "data frame" 739 ); 740 741 // Zero length data frames always have capacity to 742 // be sent. 743 if sz > 0 && stream_capacity == 0 { 744 tracing::trace!("stream capacity is 0"); 745 746 // Ensure that the stream is waiting for 747 // connection level capacity 748 // 749 // TODO: uncomment 750 // debug_assert!(stream.is_pending_send_capacity); 751 752 // The stream has no more capacity, this can 753 // happen if the remote reduced the stream 754 // window. In this case, we need to buffer the 755 // frame and wait for a window update... 756 stream.pending_send.push_front(buffer, frame.into()); 757 758 continue; 759 } 760 761 // Only send up to the max frame length 762 let len = cmp::min(sz, max_len); 763 764 // Only send up to the stream's window capacity 765 let len = 766 cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; 767 768 // There *must* be be enough connection level 769 // capacity at this point. 770 debug_assert!(len <= self.flow.window_size()); 771 772 // Check if the stream level window the peer knows is available. In some 773 // scenarios, maybe the window we know is available but the window which 774 // peer knows is not. 775 if len > 0 && len > stream.send_flow.window_size() { 776 stream.pending_send.push_front(buffer, frame.into()); 777 continue; 778 } 779 780 tracing::trace!(len, "sending data frame"); 781 782 // Update the flow control 783 tracing::trace_span!("updating stream flow").in_scope(|| { 784 stream.send_data(len, self.max_buffer_size); 785 786 // Assign the capacity back to the connection that 787 // was just consumed from the stream in the previous 788 // line. 789 // TODO: proper error handling 790 let _res = self.flow.assign_capacity(len); 791 debug_assert!(_res.is_ok()); 792 }); 793 794 let (eos, len) = tracing::trace_span!("updating connection flow") 795 .in_scope(|| { 796 // TODO: proper error handling 797 let _res = self.flow.send_data(len); 798 debug_assert!(_res.is_ok()); 799 800 // Wrap the frame's data payload to ensure that the 801 // correct amount of data gets written. 802 803 let eos = frame.is_end_stream(); 804 let len = len as usize; 805 806 if frame.payload().remaining() > len { 807 frame.set_end_stream(false); 808 } 809 (eos, len) 810 }); 811 812 Frame::Data(frame.map(|buf| Prioritized { 813 inner: buf.take(len), 814 end_of_stream: eos, 815 stream: stream.key(), 816 })) 817 } 818 Some(Frame::PushPromise(pp)) => { 819 let mut pushed = 820 stream.store_mut().find_mut(&pp.promised_id()).unwrap(); 821 pushed.is_pending_push = false; 822 // Transition stream from pending_push to pending_open 823 // if possible 824 if !pushed.pending_send.is_empty() { 825 if counts.can_inc_num_send_streams() { 826 counts.inc_num_send_streams(&mut pushed); 827 self.pending_send.push(&mut pushed); 828 } else { 829 self.queue_open(&mut pushed); 830 } 831 } 832 Frame::PushPromise(pp) 833 } 834 Some(frame) => frame.map(|_| { 835 unreachable!( 836 "Frame::map closure will only be called \ 837 on DATA frames." 838 ) 839 }), 840 None => { 841 if let Some(reason) = stream.state.get_scheduled_reset() { 842 let stream_id = stream.id; 843 stream 844 .state 845 .set_reset(stream_id, reason, Initiator::Library); 846 847 let frame = frame::Reset::new(stream.id, reason); 848 Frame::Reset(frame) 849 } else { 850 // If the stream receives a RESET from the peer, it may have 851 // had data buffered to be sent, but all the frames are cleared 852 // in clear_queue(). Instead of doing O(N) traversal through queue 853 // to remove, lets just ignore the stream here. 854 tracing::trace!("removing dangling stream from pending_send"); 855 // Since this should only happen as a consequence of `clear_queue`, 856 // we must be in a closed state of some kind. 857 debug_assert!(stream.state.is_closed()); 858 counts.transition_after(stream, is_pending_reset); 859 continue; 860 } 861 } 862 }; 863 864 tracing::trace!("pop_frame; frame={:?}", frame); 865 866 if cfg!(debug_assertions) && stream.state.is_idle() { 867 debug_assert!(stream.id > self.last_opened_id); 868 self.last_opened_id = stream.id; 869 } 870 871 if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { 872 // TODO: Only requeue the sender IF it is ready to send 873 // the next frame. i.e. don't requeue it if the next 874 // frame is a data frame and the stream does not have 875 // any more capacity. 876 self.pending_send.push(&mut stream); 877 } 878 879 counts.transition_after(stream, is_pending_reset); 880 881 return Some(frame); 882 } 883 None => return None, 884 } 885 } 886 } 887 pop_pending_open<'s>( &mut self, store: &'s mut Store, counts: &mut Counts, ) -> Option<store::Ptr<'s>>888 fn pop_pending_open<'s>( 889 &mut self, 890 store: &'s mut Store, 891 counts: &mut Counts, 892 ) -> Option<store::Ptr<'s>> { 893 tracing::trace!("schedule_pending_open"); 894 // check for any pending open streams 895 if counts.can_inc_num_send_streams() { 896 if let Some(mut stream) = self.pending_open.pop(store) { 897 tracing::trace!("schedule_pending_open; stream={:?}", stream.id); 898 899 counts.inc_num_send_streams(&mut stream); 900 stream.notify_send(); 901 return Some(stream); 902 } 903 } 904 905 None 906 } 907 } 908 909 // ===== impl Prioritized ===== 910 911 impl<B> Buf for Prioritized<B> 912 where 913 B: Buf, 914 { remaining(&self) -> usize915 fn remaining(&self) -> usize { 916 self.inner.remaining() 917 } 918 chunk(&self) -> &[u8]919 fn chunk(&self) -> &[u8] { 920 self.inner.chunk() 921 } 922 chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize923 fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { 924 self.inner.chunks_vectored(dst) 925 } 926 advance(&mut self, cnt: usize)927 fn advance(&mut self, cnt: usize) { 928 self.inner.advance(cnt) 929 } 930 } 931 932 impl<B: Buf> fmt::Debug for Prioritized<B> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result933 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 934 fmt.debug_struct("Prioritized") 935 .field("remaining", &self.inner.get_ref().remaining()) 936 .field("end_of_stream", &self.end_of_stream) 937 .field("stream", &self.stream) 938 .finish() 939 } 940 } 941