1 use crate::codec::UserError; 2 use crate::frame::{Reason, StreamId}; 3 use crate::{client, server}; 4 5 use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; 6 use crate::proto::*; 7 8 use bytes::Bytes; 9 use futures_core::Stream; 10 use std::io; 11 use std::marker::PhantomData; 12 use std::pin::Pin; 13 use std::task::{Context, Poll}; 14 use std::time::Duration; 15 use tokio::io::AsyncRead; 16 17 /// An H2 connection 18 #[derive(Debug)] 19 pub(crate) struct Connection<T, P, B: Buf = Bytes> 20 where 21 P: Peer, 22 { 23 /// Read / write frame values 24 codec: Codec<T, Prioritized<B>>, 25 26 inner: ConnectionInner<P, B>, 27 } 28 29 // Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated 30 // method instantiations. 31 #[derive(Debug)] 32 struct ConnectionInner<P, B: Buf = Bytes> 33 where 34 P: Peer, 35 { 36 /// Tracks the connection level state transitions. 37 state: State, 38 39 /// An error to report back once complete. 40 /// 41 /// This exists separately from State in order to support 42 /// graceful shutdown. 43 error: Option<frame::GoAway>, 44 45 /// Pending GOAWAY frames to write. 46 go_away: GoAway, 47 48 /// Ping/pong handler 49 ping_pong: PingPong, 50 51 /// Connection settings 52 settings: Settings, 53 54 /// Stream state handler 55 streams: Streams<B, P>, 56 57 /// A `tracing` span tracking the lifetime of the connection. 58 span: tracing::Span, 59 60 /// Client or server 61 _phantom: PhantomData<P>, 62 } 63 64 struct DynConnection<'a, B: Buf = Bytes> { 65 state: &'a mut State, 66 67 go_away: &'a mut GoAway, 68 69 streams: DynStreams<'a, B>, 70 71 error: &'a mut Option<frame::GoAway>, 72 73 ping_pong: &'a mut PingPong, 74 } 75 76 #[derive(Debug, Clone)] 77 pub(crate) struct Config { 78 pub next_stream_id: StreamId, 79 pub initial_max_send_streams: usize, 80 pub max_send_buffer_size: usize, 81 pub reset_stream_duration: Duration, 82 pub reset_stream_max: usize, 83 pub remote_reset_stream_max: usize, 84 pub local_error_reset_streams_max: Option<usize>, 85 pub settings: frame::Settings, 86 } 87 88 #[derive(Debug)] 89 enum State { 90 /// Currently open in a sane state 91 Open, 92 93 /// The codec must be flushed 94 Closing(Reason, Initiator), 95 96 /// In a closed state 97 Closed(Reason, Initiator), 98 } 99 100 impl<T, P, B> Connection<T, P, B> 101 where 102 T: AsyncRead + AsyncWrite + Unpin, 103 P: Peer, 104 B: Buf, 105 { new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B>106 pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> { 107 fn streams_config(config: &Config) -> streams::Config { 108 streams::Config { 109 initial_max_send_streams: config.initial_max_send_streams, 110 local_max_buffer_size: config.max_send_buffer_size, 111 local_next_stream_id: config.next_stream_id, 112 local_push_enabled: config.settings.is_push_enabled().unwrap_or(true), 113 extended_connect_protocol_enabled: config 114 .settings 115 .is_extended_connect_protocol_enabled() 116 .unwrap_or(false), 117 local_reset_duration: config.reset_stream_duration, 118 local_reset_max: config.reset_stream_max, 119 remote_reset_max: config.remote_reset_stream_max, 120 remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, 121 remote_max_initiated: config 122 .settings 123 .max_concurrent_streams() 124 .map(|max| max as usize), 125 local_max_error_reset_streams: config.local_error_reset_streams_max, 126 } 127 } 128 let streams = Streams::new(streams_config(&config)); 129 Connection { 130 codec, 131 inner: ConnectionInner { 132 state: State::Open, 133 error: None, 134 go_away: GoAway::new(), 135 ping_pong: PingPong::new(), 136 settings: Settings::new(config.settings), 137 streams, 138 span: tracing::debug_span!("Connection", peer = %P::NAME), 139 _phantom: PhantomData, 140 }, 141 } 142 } 143 144 /// connection flow control set_target_window_size(&mut self, size: WindowSize)145 pub(crate) fn set_target_window_size(&mut self, size: WindowSize) { 146 let _res = self.inner.streams.set_target_connection_window_size(size); 147 // TODO: proper error handling 148 debug_assert!(_res.is_ok()); 149 } 150 151 /// Send a new SETTINGS frame with an updated initial window size. set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError>152 pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> { 153 let mut settings = frame::Settings::default(); 154 settings.set_initial_window_size(Some(size)); 155 self.inner.settings.send_settings(settings) 156 } 157 158 /// Send a new SETTINGS frame with extended CONNECT protocol enabled. set_enable_connect_protocol(&mut self) -> Result<(), UserError>159 pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> { 160 let mut settings = frame::Settings::default(); 161 settings.set_enable_connect_protocol(Some(1)); 162 self.inner.settings.send_settings(settings) 163 } 164 165 /// Returns the maximum number of concurrent streams that may be initiated 166 /// by this peer. max_send_streams(&self) -> usize167 pub(crate) fn max_send_streams(&self) -> usize { 168 self.inner.streams.max_send_streams() 169 } 170 171 /// Returns the maximum number of concurrent streams that may be initiated 172 /// by the remote peer. max_recv_streams(&self) -> usize173 pub(crate) fn max_recv_streams(&self) -> usize { 174 self.inner.streams.max_recv_streams() 175 } 176 177 #[cfg(feature = "unstable")] num_wired_streams(&self) -> usize178 pub fn num_wired_streams(&self) -> usize { 179 self.inner.streams.num_wired_streams() 180 } 181 182 /// Returns `Ready` when the connection is ready to receive a frame. 183 /// 184 /// Returns `Error` as this may raise errors that are caused by delayed 185 /// processing of received frames. poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>>186 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> { 187 let _e = self.inner.span.enter(); 188 let span = tracing::trace_span!("poll_ready"); 189 let _e = span.enter(); 190 // The order of these calls don't really matter too much 191 ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?; 192 ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?; 193 ready!(self 194 .inner 195 .settings 196 .poll_send(cx, &mut self.codec, &mut self.inner.streams))?; 197 ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?; 198 199 Poll::Ready(Ok(())) 200 } 201 202 /// Send any pending GOAWAY frames. 203 /// 204 /// This will return `Some(reason)` if the connection should be closed 205 /// afterwards. If this is a graceful shutdown, this returns `None`. poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>>206 fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> { 207 self.inner.go_away.send_pending_go_away(cx, &mut self.codec) 208 } 209 go_away_from_user(&mut self, e: Reason)210 pub fn go_away_from_user(&mut self, e: Reason) { 211 self.inner.as_dyn().go_away_from_user(e) 212 } 213 take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error>214 fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> { 215 let (debug_data, theirs) = self 216 .inner 217 .error 218 .take() 219 .as_ref() 220 .map_or((Bytes::new(), Reason::NO_ERROR), |frame| { 221 (frame.debug_data().clone(), frame.reason()) 222 }); 223 224 match (ours, theirs) { 225 (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()), 226 (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)), 227 // If both sides reported an error, give their 228 // error back to th user. We assume our error 229 // was a consequence of their error, and less 230 // important. 231 (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)), 232 } 233 } 234 235 /// Closes the connection by transitioning to a GOAWAY state 236 /// iff there are no streams or references maybe_close_connection_if_no_streams(&mut self)237 pub fn maybe_close_connection_if_no_streams(&mut self) { 238 // If we poll() and realize that there are no streams or references 239 // then we can close the connection by transitioning to GOAWAY 240 if !self.inner.streams.has_streams_or_other_references() { 241 self.inner.as_dyn().go_away_now(Reason::NO_ERROR); 242 } 243 } 244 take_user_pings(&mut self) -> Option<UserPings>245 pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> { 246 self.inner.ping_pong.take_user_pings() 247 } 248 249 /// Advances the internal state of the connection. poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>>250 pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> { 251 // XXX(eliza): cloning the span is unfortunately necessary here in 252 // order to placate the borrow checker — `self` is mutably borrowed by 253 // `poll2`, which means that we can't borrow `self.span` to enter it. 254 // The clone is just an atomic ref bump. 255 let span = self.inner.span.clone(); 256 let _e = span.enter(); 257 let span = tracing::trace_span!("poll"); 258 let _e = span.enter(); 259 260 loop { 261 tracing::trace!(connection.state = ?self.inner.state); 262 // TODO: probably clean up this glob of code 263 match self.inner.state { 264 // When open, continue to poll a frame 265 State::Open => { 266 let result = match self.poll2(cx) { 267 Poll::Ready(result) => result, 268 // The connection is not ready to make progress 269 Poll::Pending => { 270 // Ensure all window updates have been sent. 271 // 272 // This will also handle flushing `self.codec` 273 ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?; 274 275 if (self.inner.error.is_some() 276 || self.inner.go_away.should_close_on_idle()) 277 && !self.inner.streams.has_streams() 278 { 279 self.inner.as_dyn().go_away_now(Reason::NO_ERROR); 280 continue; 281 } 282 283 return Poll::Pending; 284 } 285 }; 286 287 self.inner.as_dyn().handle_poll2_result(result)? 288 } 289 State::Closing(reason, initiator) => { 290 tracing::trace!("connection closing after flush"); 291 // Flush/shutdown the codec 292 ready!(self.codec.shutdown(cx))?; 293 294 // Transition the state to error 295 self.inner.state = State::Closed(reason, initiator); 296 } 297 State::Closed(reason, initiator) => { 298 return Poll::Ready(self.take_error(reason, initiator)); 299 } 300 } 301 } 302 } 303 poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>>304 fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> { 305 // This happens outside of the loop to prevent needing to do a clock 306 // check and then comparison of the queue possibly multiple times a 307 // second (and thus, the clock wouldn't have changed enough to matter). 308 self.clear_expired_reset_streams(); 309 310 loop { 311 // First, ensure that the `Connection` is able to receive a frame 312 // 313 // The order here matters: 314 // - poll_go_away may buffer a graceful shutdown GOAWAY frame 315 // - If it has, we've also added a PING to be sent in poll_ready 316 if let Some(reason) = ready!(self.poll_go_away(cx)?) { 317 if self.inner.go_away.should_close_now() { 318 if self.inner.go_away.is_user_initiated() { 319 // A user initiated abrupt shutdown shouldn't return 320 // the same error back to the user. 321 return Poll::Ready(Ok(())); 322 } else { 323 return Poll::Ready(Err(Error::library_go_away(reason))); 324 } 325 } 326 // Only NO_ERROR should be waiting for idle 327 debug_assert_eq!( 328 reason, 329 Reason::NO_ERROR, 330 "graceful GOAWAY should be NO_ERROR" 331 ); 332 } 333 ready!(self.poll_ready(cx))?; 334 335 match self 336 .inner 337 .as_dyn() 338 .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))? 339 { 340 ReceivedFrame::Settings(frame) => { 341 self.inner.settings.recv_settings( 342 frame, 343 &mut self.codec, 344 &mut self.inner.streams, 345 )?; 346 } 347 ReceivedFrame::Continue => (), 348 ReceivedFrame::Done => { 349 return Poll::Ready(Ok(())); 350 } 351 } 352 } 353 } 354 clear_expired_reset_streams(&mut self)355 fn clear_expired_reset_streams(&mut self) { 356 self.inner.streams.clear_expired_reset_streams(); 357 } 358 } 359 360 impl<P, B> ConnectionInner<P, B> 361 where 362 P: Peer, 363 B: Buf, 364 { as_dyn(&mut self) -> DynConnection<'_, B>365 fn as_dyn(&mut self) -> DynConnection<'_, B> { 366 let ConnectionInner { 367 state, 368 go_away, 369 streams, 370 error, 371 ping_pong, 372 .. 373 } = self; 374 let streams = streams.as_dyn(); 375 DynConnection { 376 state, 377 go_away, 378 streams, 379 error, 380 ping_pong, 381 } 382 } 383 } 384 385 impl<B> DynConnection<'_, B> 386 where 387 B: Buf, 388 { go_away(&mut self, id: StreamId, e: Reason)389 fn go_away(&mut self, id: StreamId, e: Reason) { 390 let frame = frame::GoAway::new(id, e); 391 self.streams.send_go_away(id); 392 self.go_away.go_away(frame); 393 } 394 go_away_now(&mut self, e: Reason)395 fn go_away_now(&mut self, e: Reason) { 396 let last_processed_id = self.streams.last_processed_id(); 397 let frame = frame::GoAway::new(last_processed_id, e); 398 self.go_away.go_away_now(frame); 399 } 400 go_away_now_data(&mut self, e: Reason, data: Bytes)401 fn go_away_now_data(&mut self, e: Reason, data: Bytes) { 402 let last_processed_id = self.streams.last_processed_id(); 403 let frame = frame::GoAway::with_debug_data(last_processed_id, e, data); 404 self.go_away.go_away_now(frame); 405 } 406 go_away_from_user(&mut self, e: Reason)407 fn go_away_from_user(&mut self, e: Reason) { 408 let last_processed_id = self.streams.last_processed_id(); 409 let frame = frame::GoAway::new(last_processed_id, e); 410 self.go_away.go_away_from_user(frame); 411 412 // Notify all streams of reason we're abruptly closing. 413 self.streams.handle_error(Error::user_go_away(e)); 414 } 415 handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error>416 fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> { 417 match result { 418 // The connection has shutdown normally 419 Ok(()) => { 420 *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library); 421 Ok(()) 422 } 423 // Attempting to read a frame resulted in a connection level 424 // error. This is handled by setting a GOAWAY frame followed by 425 // terminating the connection. 426 Err(Error::GoAway(debug_data, reason, initiator)) => { 427 let e = Error::GoAway(debug_data.clone(), reason, initiator); 428 tracing::debug!(error = ?e, "Connection::poll; connection error"); 429 430 // We may have already sent a GOAWAY for this error, 431 // if so, don't send another, just flush and close up. 432 if self 433 .go_away 434 .going_away() 435 .map_or(false, |frame| frame.reason() == reason) 436 { 437 tracing::trace!(" -> already going away"); 438 *self.state = State::Closing(reason, initiator); 439 return Ok(()); 440 } 441 442 // Reset all active streams 443 self.streams.handle_error(e); 444 self.go_away_now_data(reason, debug_data); 445 Ok(()) 446 } 447 // Attempting to read a frame resulted in a stream level error. 448 // This is handled by resetting the frame then trying to read 449 // another frame. 450 Err(Error::Reset(id, reason, initiator)) => { 451 debug_assert_eq!(initiator, Initiator::Library); 452 tracing::trace!(?id, ?reason, "stream error"); 453 self.streams.send_reset(id, reason); 454 Ok(()) 455 } 456 // Attempting to read a frame resulted in an I/O error. All 457 // active streams must be reset. 458 // 459 // TODO: Are I/O errors recoverable? 460 Err(Error::Io(kind, inner)) => { 461 tracing::debug!(error = ?kind, "Connection::poll; IO error"); 462 let e = Error::Io(kind, inner); 463 464 // Reset all active streams 465 self.streams.handle_error(e.clone()); 466 467 // Some client implementations drop the connections without notifying its peer 468 // Attempting to read after the client dropped the connection results in UnexpectedEof 469 // If as a server, we don't have anything more to send, just close the connection 470 // without error 471 // 472 // See https://github.com/hyperium/hyper/issues/3427 473 if self.streams.is_server() 474 && self.streams.is_buffer_empty() 475 && matches!(kind, io::ErrorKind::UnexpectedEof) 476 { 477 *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library); 478 return Ok(()); 479 } 480 481 // Return the error 482 Err(e) 483 } 484 } 485 } 486 recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error>487 fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> { 488 use crate::frame::Frame::*; 489 match frame { 490 Some(Headers(frame)) => { 491 tracing::trace!(?frame, "recv HEADERS"); 492 self.streams.recv_headers(frame)?; 493 } 494 Some(Data(frame)) => { 495 tracing::trace!(?frame, "recv DATA"); 496 self.streams.recv_data(frame)?; 497 } 498 Some(Reset(frame)) => { 499 tracing::trace!(?frame, "recv RST_STREAM"); 500 self.streams.recv_reset(frame)?; 501 } 502 Some(PushPromise(frame)) => { 503 tracing::trace!(?frame, "recv PUSH_PROMISE"); 504 self.streams.recv_push_promise(frame)?; 505 } 506 Some(Settings(frame)) => { 507 tracing::trace!(?frame, "recv SETTINGS"); 508 return Ok(ReceivedFrame::Settings(frame)); 509 } 510 Some(GoAway(frame)) => { 511 tracing::trace!(?frame, "recv GOAWAY"); 512 // This should prevent starting new streams, 513 // but should allow continuing to process current streams 514 // until they are all EOS. Once they are, State should 515 // transition to GoAway. 516 self.streams.recv_go_away(&frame)?; 517 *self.error = Some(frame); 518 } 519 Some(Ping(frame)) => { 520 tracing::trace!(?frame, "recv PING"); 521 let status = self.ping_pong.recv_ping(frame); 522 if status.is_shutdown() { 523 assert!( 524 self.go_away.is_going_away(), 525 "received unexpected shutdown ping" 526 ); 527 528 let last_processed_id = self.streams.last_processed_id(); 529 self.go_away(last_processed_id, Reason::NO_ERROR); 530 } 531 } 532 Some(WindowUpdate(frame)) => { 533 tracing::trace!(?frame, "recv WINDOW_UPDATE"); 534 self.streams.recv_window_update(frame)?; 535 } 536 Some(Priority(frame)) => { 537 tracing::trace!(?frame, "recv PRIORITY"); 538 // TODO: handle 539 } 540 None => { 541 tracing::trace!("codec closed"); 542 self.streams.recv_eof(false).expect("mutex poisoned"); 543 return Ok(ReceivedFrame::Done); 544 } 545 } 546 Ok(ReceivedFrame::Continue) 547 } 548 } 549 550 enum ReceivedFrame { 551 Settings(frame::Settings), 552 Continue, 553 Done, 554 } 555 556 impl<T, B> Connection<T, client::Peer, B> 557 where 558 T: AsyncRead + AsyncWrite, 559 B: Buf, 560 { streams(&self) -> &Streams<B, client::Peer>561 pub(crate) fn streams(&self) -> &Streams<B, client::Peer> { 562 &self.inner.streams 563 } 564 } 565 566 impl<T, B> Connection<T, server::Peer, B> 567 where 568 T: AsyncRead + AsyncWrite + Unpin, 569 B: Buf, 570 { next_incoming(&mut self) -> Option<StreamRef<B>>571 pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { 572 self.inner.streams.next_incoming() 573 } 574 575 // Graceful shutdown only makes sense for server peers. go_away_gracefully(&mut self)576 pub fn go_away_gracefully(&mut self) { 577 if self.inner.go_away.is_going_away() { 578 // No reason to start a new one. 579 return; 580 } 581 582 // According to http://httpwg.org/specs/rfc7540.html#GOAWAY: 583 // 584 // > A server that is attempting to gracefully shut down a connection 585 // > SHOULD send an initial GOAWAY frame with the last stream 586 // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the 587 // > client that a shutdown is imminent and that initiating further 588 // > requests is prohibited. After allowing time for any in-flight 589 // > stream creation (at least one round-trip time), the server can 590 // > send another GOAWAY frame with an updated last stream identifier. 591 // > This ensures that a connection can be cleanly shut down without 592 // > losing requests. 593 self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR); 594 595 // We take the advice of waiting 1 RTT literally, and wait 596 // for a pong before proceeding. 597 self.inner.ping_pong.ping_shutdown(); 598 } 599 } 600 601 impl<T, P, B> Drop for Connection<T, P, B> 602 where 603 P: Peer, 604 B: Buf, 605 { drop(&mut self)606 fn drop(&mut self) { 607 // Ignore errors as this indicates that the mutex is poisoned. 608 let _ = self.inner.streams.recv_eof(true); 609 } 610 } 611