1 use crate::codec::Codec; 2 use crate::frame::{self, Reason, StreamId}; 3 4 use bytes::Buf; 5 use std::io; 6 use std::task::{Context, Poll}; 7 use tokio::io::AsyncWrite; 8 9 /// Manages our sending of GOAWAY frames. 10 #[derive(Debug)] 11 pub(super) struct GoAway { 12 /// Whether the connection should close now, or wait until idle. 13 close_now: bool, 14 /// Records if we've sent any GOAWAY before. 15 going_away: Option<GoingAway>, 16 /// Whether the user started the GOAWAY by calling `abrupt_shutdown`. 17 is_user_initiated: bool, 18 /// A GOAWAY frame that must be buffered in the Codec immediately. 19 pending: Option<frame::GoAway>, 20 } 21 22 /// Keeps a memory of any GOAWAY frames we've sent before. 23 /// 24 /// This looks very similar to a `frame::GoAway`, but is a separate type. Why? 25 /// Mostly for documentation purposes. This type is to record status. If it 26 /// were a `frame::GoAway`, it might appear like we eventually wanted to 27 /// serialize it. We **only** want to be able to look up these fields at a 28 /// later time. 29 #[derive(Debug)] 30 pub(crate) struct GoingAway { 31 /// Stores the highest stream ID of a GOAWAY that has been sent. 32 /// 33 /// It's illegal to send a subsequent GOAWAY with a higher ID. 34 last_processed_id: StreamId, 35 36 /// Records the error code of any GOAWAY frame sent. 37 reason: Reason, 38 } 39 40 impl GoAway { new() -> Self41 pub fn new() -> Self { 42 GoAway { 43 close_now: false, 44 going_away: None, 45 is_user_initiated: false, 46 pending: None, 47 } 48 } 49 50 /// Enqueue a GOAWAY frame to be written. 51 /// 52 /// The connection is expected to continue to run until idle. go_away(&mut self, f: frame::GoAway)53 pub fn go_away(&mut self, f: frame::GoAway) { 54 if let Some(ref going_away) = self.going_away { 55 assert!( 56 f.last_stream_id() <= going_away.last_processed_id, 57 "GOAWAY stream IDs shouldn't be higher; \ 58 last_processed_id = {:?}, f.last_stream_id() = {:?}", 59 going_away.last_processed_id, 60 f.last_stream_id(), 61 ); 62 } 63 64 self.going_away = Some(GoingAway { 65 last_processed_id: f.last_stream_id(), 66 reason: f.reason(), 67 }); 68 self.pending = Some(f); 69 } 70 go_away_now(&mut self, f: frame::GoAway)71 pub fn go_away_now(&mut self, f: frame::GoAway) { 72 self.close_now = true; 73 if let Some(ref going_away) = self.going_away { 74 // Prevent sending the same GOAWAY twice. 75 if going_away.last_processed_id == f.last_stream_id() && going_away.reason == f.reason() 76 { 77 return; 78 } 79 } 80 self.go_away(f); 81 } 82 go_away_from_user(&mut self, f: frame::GoAway)83 pub fn go_away_from_user(&mut self, f: frame::GoAway) { 84 self.is_user_initiated = true; 85 self.go_away_now(f); 86 } 87 88 /// Return if a GOAWAY has ever been scheduled. is_going_away(&self) -> bool89 pub fn is_going_away(&self) -> bool { 90 self.going_away.is_some() 91 } 92 is_user_initiated(&self) -> bool93 pub fn is_user_initiated(&self) -> bool { 94 self.is_user_initiated 95 } 96 97 /// Returns the going away info, if any. going_away(&self) -> Option<&GoingAway>98 pub fn going_away(&self) -> Option<&GoingAway> { 99 self.going_away.as_ref() 100 } 101 102 /// Returns if the connection should close now, or wait until idle. should_close_now(&self) -> bool103 pub fn should_close_now(&self) -> bool { 104 self.pending.is_none() && self.close_now 105 } 106 107 /// Returns if the connection should be closed when idle. should_close_on_idle(&self) -> bool108 pub fn should_close_on_idle(&self) -> bool { 109 !self.close_now 110 && self 111 .going_away 112 .as_ref() 113 .map(|g| g.last_processed_id != StreamId::MAX) 114 .unwrap_or(false) 115 } 116 117 /// Try to write a pending GOAWAY frame to the buffer. 118 /// 119 /// If a frame is written, the `Reason` of the GOAWAY is returned. send_pending_go_away<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, B>, ) -> Poll<Option<io::Result<Reason>>> where T: AsyncWrite + Unpin, B: Buf,120 pub fn send_pending_go_away<T, B>( 121 &mut self, 122 cx: &mut Context, 123 dst: &mut Codec<T, B>, 124 ) -> Poll<Option<io::Result<Reason>>> 125 where 126 T: AsyncWrite + Unpin, 127 B: Buf, 128 { 129 if let Some(frame) = self.pending.take() { 130 if !dst.poll_ready(cx)?.is_ready() { 131 self.pending = Some(frame); 132 return Poll::Pending; 133 } 134 135 let reason = frame.reason(); 136 dst.buffer(frame.into()).expect("invalid GOAWAY frame"); 137 138 return Poll::Ready(Some(Ok(reason))); 139 } else if self.should_close_now() { 140 return match self.going_away().map(|going_away| going_away.reason) { 141 Some(reason) => Poll::Ready(Some(Ok(reason))), 142 None => Poll::Ready(None), 143 }; 144 } 145 146 Poll::Ready(None) 147 } 148 } 149 150 impl GoingAway { reason(&self) -> Reason151 pub(crate) fn reason(&self) -> Reason { 152 self.reason 153 } 154 } 155