1 use crate::codec::UserError; 2 use crate::error::Reason; 3 use crate::proto::*; 4 use std::task::{Context, Poll}; 5 6 #[derive(Debug)] 7 pub(crate) struct Settings { 8 /// Our local SETTINGS sync state with the remote. 9 local: Local, 10 /// Received SETTINGS frame pending processing. The ACK must be written to 11 /// the socket first then the settings applied **before** receiving any 12 /// further frames. 13 remote: Option<frame::Settings>, 14 /// Whether the connection has received the initial SETTINGS frame from the 15 /// remote peer. 16 has_received_remote_initial_settings: bool, 17 } 18 19 #[derive(Debug)] 20 enum Local { 21 /// We want to send these SETTINGS to the remote when the socket is ready. 22 ToSend(frame::Settings), 23 /// We have sent these SETTINGS and are waiting for the remote to ACK 24 /// before we apply them. 25 WaitingAck(frame::Settings), 26 /// Our local settings are in sync with the remote. 27 Synced, 28 } 29 30 impl Settings { new(local: frame::Settings) -> Self31 pub(crate) fn new(local: frame::Settings) -> Self { 32 Settings { 33 // We assume the initial local SETTINGS were flushed during 34 // the handshake process. 35 local: Local::WaitingAck(local), 36 remote: None, 37 has_received_remote_initial_settings: false, 38 } 39 } 40 recv_settings<T, B, C, P>( &mut self, frame: frame::Settings, codec: &mut Codec<T, B>, streams: &mut Streams<C, P>, ) -> Result<(), Error> where T: AsyncWrite + Unpin, B: Buf, C: Buf, P: Peer,41 pub(crate) fn recv_settings<T, B, C, P>( 42 &mut self, 43 frame: frame::Settings, 44 codec: &mut Codec<T, B>, 45 streams: &mut Streams<C, P>, 46 ) -> Result<(), Error> 47 where 48 T: AsyncWrite + Unpin, 49 B: Buf, 50 C: Buf, 51 P: Peer, 52 { 53 if frame.is_ack() { 54 match &self.local { 55 Local::WaitingAck(local) => { 56 tracing::debug!("received settings ACK; applying {:?}", local); 57 58 if let Some(max) = local.max_frame_size() { 59 codec.set_max_recv_frame_size(max as usize); 60 } 61 62 if let Some(max) = local.max_header_list_size() { 63 codec.set_max_recv_header_list_size(max as usize); 64 } 65 66 if let Some(val) = local.header_table_size() { 67 codec.set_recv_header_table_size(val as usize); 68 } 69 70 streams.apply_local_settings(local)?; 71 self.local = Local::Synced; 72 Ok(()) 73 } 74 Local::ToSend(..) | Local::Synced => { 75 // We haven't sent any SETTINGS frames to be ACKed, so 76 // this is very bizarre! Remote is either buggy or malicious. 77 proto_err!(conn: "received unexpected settings ack"); 78 Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) 79 } 80 } 81 } else { 82 // We always ACK before reading more frames, so `remote` should 83 // always be none! 84 assert!(self.remote.is_none()); 85 self.remote = Some(frame); 86 Ok(()) 87 } 88 } 89 send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError>90 pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> { 91 assert!(!frame.is_ack()); 92 match &self.local { 93 Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending), 94 Local::Synced => { 95 tracing::trace!("queue to send local settings: {:?}", frame); 96 self.local = Local::ToSend(frame); 97 Ok(()) 98 } 99 } 100 } 101 102 /// Sets `true` to `self.has_received_remote_initial_settings`. 103 /// Returns `true` if this method is called for the first time. 104 /// (i.e. it is the initial SETTINGS frame from the remote peer) mark_remote_initial_settings_as_received(&mut self) -> bool105 fn mark_remote_initial_settings_as_received(&mut self) -> bool { 106 let has_received = self.has_received_remote_initial_settings; 107 self.has_received_remote_initial_settings = true; 108 !has_received 109 } 110 poll_send<T, B, C, P>( &mut self, cx: &mut Context, dst: &mut Codec<T, B>, streams: &mut Streams<C, P>, ) -> Poll<Result<(), Error>> where T: AsyncWrite + Unpin, B: Buf, C: Buf, P: Peer,111 pub(crate) fn poll_send<T, B, C, P>( 112 &mut self, 113 cx: &mut Context, 114 dst: &mut Codec<T, B>, 115 streams: &mut Streams<C, P>, 116 ) -> Poll<Result<(), Error>> 117 where 118 T: AsyncWrite + Unpin, 119 B: Buf, 120 C: Buf, 121 P: Peer, 122 { 123 if let Some(settings) = self.remote.clone() { 124 if !dst.poll_ready(cx)?.is_ready() { 125 return Poll::Pending; 126 } 127 128 // Create an ACK settings frame 129 let frame = frame::Settings::ack(); 130 131 // Buffer the settings frame 132 dst.buffer(frame.into()).expect("invalid settings frame"); 133 134 tracing::trace!("ACK sent; applying settings"); 135 136 let is_initial = self.mark_remote_initial_settings_as_received(); 137 streams.apply_remote_settings(&settings, is_initial)?; 138 139 if let Some(val) = settings.header_table_size() { 140 dst.set_send_header_table_size(val as usize); 141 } 142 143 if let Some(val) = settings.max_frame_size() { 144 dst.set_max_send_frame_size(val as usize); 145 } 146 } 147 148 self.remote = None; 149 150 match &self.local { 151 Local::ToSend(settings) => { 152 if !dst.poll_ready(cx)?.is_ready() { 153 return Poll::Pending; 154 } 155 156 // Buffer the settings frame 157 dst.buffer(settings.clone().into()) 158 .expect("invalid settings frame"); 159 tracing::trace!("local settings sent; waiting for ack: {:?}", settings); 160 161 self.local = Local::WaitingAck(settings.clone()); 162 } 163 Local::WaitingAck(..) | Local::Synced => {} 164 } 165 166 Poll::Ready(Ok(())) 167 } 168 } 169