1 //! Generic WebSocket message stream. 2 3 pub mod frame; 4 5 mod message; 6 7 pub use self::{frame::CloseFrame, message::Message}; 8 9 use self::{ 10 frame::{ 11 coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode}, 12 Frame, FrameCodec, 13 }, 14 message::{IncompleteMessage, IncompleteMessageType}, 15 }; 16 use crate::error::{Error, ProtocolError, Result}; 17 use log::*; 18 use std::{ 19 io::{self, Read, Write}, 20 mem::replace, 21 }; 22 23 /// Indicates a Client or Server role of the websocket 24 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 25 pub enum Role { 26 /// This socket is a server 27 Server, 28 /// This socket is a client 29 Client, 30 } 31 32 /// The configuration for WebSocket connection. 33 #[derive(Debug, Clone, Copy)] 34 pub struct WebSocketConfig { 35 /// Does nothing, instead use `max_write_buffer_size`. 36 #[deprecated] 37 pub max_send_queue: Option<usize>, 38 /// The target minimum size of the write buffer to reach before writing the data 39 /// to the underlying stream. 40 /// The default value is 128 KiB. 41 /// 42 /// If set to `0` each message will be eagerly written to the underlying stream. 43 /// It is often more optimal to allow them to buffer a little, hence the default value. 44 /// 45 /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless. 46 pub write_buffer_size: usize, 47 /// The max size of the write buffer in bytes. Setting this can provide backpressure 48 /// in the case the write buffer is filling up due to write errors. 49 /// The default value is unlimited. 50 /// 51 /// Note: The write buffer only builds up past [`write_buffer_size`](Self::write_buffer_size) 52 /// when writes to the underlying stream are failing. So the **write buffer can not 53 /// fill up if you are not observing write errors even if not flushing**. 54 /// 55 /// Note: Should always be at least [`write_buffer_size + 1 message`](Self::write_buffer_size) 56 /// and probably a little more depending on error handling strategy. 57 pub max_write_buffer_size: usize, 58 /// The maximum size of an incoming message. `None` means no size limit. The default value is 64 MiB 59 /// which should be reasonably big for all normal use-cases but small enough to prevent 60 /// memory eating by a malicious user. 61 pub max_message_size: Option<usize>, 62 /// The maximum size of a single incoming message frame. `None` means no size limit. The limit is for 63 /// frame payload NOT including the frame header. The default value is 16 MiB which should 64 /// be reasonably big for all normal use-cases but small enough to prevent memory eating 65 /// by a malicious user. 66 pub max_frame_size: Option<usize>, 67 /// When set to `true`, the server will accept and handle unmasked frames 68 /// from the client. According to the RFC 6455, the server must close the 69 /// connection to the client in such cases, however it seems like there are 70 /// some popular libraries that are sending unmasked frames, ignoring the RFC. 71 /// By default this option is set to `false`, i.e. according to RFC 6455. 72 pub accept_unmasked_frames: bool, 73 } 74 75 impl Default for WebSocketConfig { default() -> Self76 fn default() -> Self { 77 #[allow(deprecated)] 78 WebSocketConfig { 79 max_send_queue: None, 80 write_buffer_size: 128 * 1024, 81 max_write_buffer_size: usize::MAX, 82 max_message_size: Some(64 << 20), 83 max_frame_size: Some(16 << 20), 84 accept_unmasked_frames: false, 85 } 86 } 87 } 88 89 impl WebSocketConfig { 90 /// Panic if values are invalid. assert_valid(&self)91 pub(crate) fn assert_valid(&self) { 92 assert!( 93 self.max_write_buffer_size > self.write_buffer_size, 94 "WebSocketConfig::max_write_buffer_size must be greater than write_buffer_size, \ 95 see WebSocketConfig docs`" 96 ); 97 } 98 } 99 100 /// WebSocket input-output stream. 101 /// 102 /// This is THE structure you want to create to be able to speak the WebSocket protocol. 103 /// It may be created by calling `connect`, `accept` or `client` functions. 104 /// 105 /// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages. 106 #[derive(Debug)] 107 pub struct WebSocket<Stream> { 108 /// The underlying socket. 109 socket: Stream, 110 /// The context for managing a WebSocket. 111 context: WebSocketContext, 112 } 113 114 impl<Stream> WebSocket<Stream> { 115 /// Convert a raw socket into a WebSocket without performing a handshake. 116 /// 117 /// Call this function if you're using Tungstenite as a part of a web framework 118 /// or together with an existing one. If you need an initial handshake, use 119 /// `connect()` or `accept()` functions of the crate to construct a websocket. 120 /// 121 /// # Panics 122 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self123 pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self { 124 WebSocket { socket: stream, context: WebSocketContext::new(role, config) } 125 } 126 127 /// Convert a raw socket into a WebSocket without performing a handshake. 128 /// 129 /// Call this function if you're using Tungstenite as a part of a web framework 130 /// or together with an existing one. If you need an initial handshake, use 131 /// `connect()` or `accept()` functions of the crate to construct a websocket. 132 /// 133 /// # Panics 134 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. from_partially_read( stream: Stream, part: Vec<u8>, role: Role, config: Option<WebSocketConfig>, ) -> Self135 pub fn from_partially_read( 136 stream: Stream, 137 part: Vec<u8>, 138 role: Role, 139 config: Option<WebSocketConfig>, 140 ) -> Self { 141 WebSocket { 142 socket: stream, 143 context: WebSocketContext::from_partially_read(part, role, config), 144 } 145 } 146 147 /// Returns a shared reference to the inner stream. get_ref(&self) -> &Stream148 pub fn get_ref(&self) -> &Stream { 149 &self.socket 150 } 151 /// Returns a mutable reference to the inner stream. get_mut(&mut self) -> &mut Stream152 pub fn get_mut(&mut self) -> &mut Stream { 153 &mut self.socket 154 } 155 156 /// Change the configuration. 157 /// 158 /// # Panics 159 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig))160 pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { 161 self.context.set_config(set_func) 162 } 163 164 /// Read the configuration. get_config(&self) -> &WebSocketConfig165 pub fn get_config(&self) -> &WebSocketConfig { 166 self.context.get_config() 167 } 168 169 /// Check if it is possible to read messages. 170 /// 171 /// Reading is impossible after receiving `Message::Close`. It is still possible after 172 /// sending close frame since the peer still may send some data before confirming close. can_read(&self) -> bool173 pub fn can_read(&self) -> bool { 174 self.context.can_read() 175 } 176 177 /// Check if it is possible to write messages. 178 /// 179 /// Writing gets impossible immediately after sending or receiving `Message::Close`. can_write(&self) -> bool180 pub fn can_write(&self) -> bool { 181 self.context.can_write() 182 } 183 } 184 185 impl<Stream: Read + Write> WebSocket<Stream> { 186 /// Read a message from stream, if possible. 187 /// 188 /// This will also queue responses to ping and close messages. These responses 189 /// will be written and flushed on the next call to [`read`](Self::read), 190 /// [`write`](Self::write) or [`flush`](Self::flush). 191 /// 192 /// # Closing the connection 193 /// When the remote endpoint decides to close the connection this will return 194 /// the close message with an optional close frame. 195 /// 196 /// You should continue calling [`read`](Self::read), [`write`](Self::write) or 197 /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`] 198 /// is returned. Once that happens it is safe to drop the underlying connection. read(&mut self) -> Result<Message>199 pub fn read(&mut self) -> Result<Message> { 200 self.context.read(&mut self.socket) 201 } 202 203 /// Writes and immediately flushes a message. 204 /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush). send(&mut self, message: Message) -> Result<()>205 pub fn send(&mut self, message: Message) -> Result<()> { 206 self.write(message)?; 207 self.flush() 208 } 209 210 /// Write a message to the provided stream, if possible. 211 /// 212 /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. 213 /// 214 /// In the event of stream write failure the message frame will be stored 215 /// in the write buffer and will try again on the next call to [`write`](Self::write) 216 /// or [`flush`](Self::flush). 217 /// 218 /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] 219 /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. 220 /// 221 /// This call will generally not flush. However, if there are queued automatic messages 222 /// they will be written and eagerly flushed. 223 /// 224 /// For example, upon receiving ping messages tungstenite queues pong replies automatically. 225 /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush) 226 /// will write & flush the pong reply. This means you should not respond to ping frames manually. 227 /// 228 /// You can however send pong frames manually in order to indicate a unidirectional heartbeat 229 /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that 230 /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing 231 /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the 232 /// ping will not be sent as it will be replaced by your custom pong message. 233 /// 234 /// # Errors 235 /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned 236 /// along with the equivalent passed message frame. 237 /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`]. 238 /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from 239 /// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program 240 /// error on your part. 241 /// - [`Error::Io`] is returned if the underlying connection returns an error 242 /// (consider these fatal except for WouldBlock). 243 /// - [`Error::Capacity`] if your message size is bigger than the configured max message size. write(&mut self, message: Message) -> Result<()>244 pub fn write(&mut self, message: Message) -> Result<()> { 245 self.context.write(&mut self.socket, message) 246 } 247 248 /// Flush writes. 249 /// 250 /// Ensures all messages previously passed to [`write`](Self::write) and automatic 251 /// queued pong responses are written & flushed into the underlying stream. flush(&mut self) -> Result<()>252 pub fn flush(&mut self) -> Result<()> { 253 self.context.flush(&mut self.socket) 254 } 255 256 /// Close the connection. 257 /// 258 /// This function guarantees that the close frame will be queued. 259 /// There is no need to call it again. Calling this function is 260 /// the same as calling `write(Message::Close(..))`. 261 /// 262 /// After queuing the close frame you should continue calling [`read`](Self::read) or 263 /// [`flush`](Self::flush) to drive the close handshake to completion. 264 /// 265 /// The websocket RFC defines that the underlying connection should be closed 266 /// by the server. Tungstenite takes care of this asymmetry for you. 267 /// 268 /// When the close handshake is finished (we have both sent and received 269 /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return 270 /// [Error::ConnectionClosed] if this endpoint is the server. 271 /// 272 /// If this endpoint is a client, [Error::ConnectionClosed] will only be 273 /// returned after the server has closed the underlying connection. 274 /// 275 /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed] 276 /// is returned from [`read`](Self::read) or [`flush`](Self::flush). close(&mut self, code: Option<CloseFrame>) -> Result<()>277 pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> { 278 self.context.close(&mut self.socket, code) 279 } 280 281 /// Old name for [`read`](Self::read). 282 #[deprecated(note = "Use `read`")] read_message(&mut self) -> Result<Message>283 pub fn read_message(&mut self) -> Result<Message> { 284 self.read() 285 } 286 287 /// Old name for [`send`](Self::send). 288 #[deprecated(note = "Use `send`")] write_message(&mut self, message: Message) -> Result<()>289 pub fn write_message(&mut self, message: Message) -> Result<()> { 290 self.send(message) 291 } 292 293 /// Old name for [`flush`](Self::flush). 294 #[deprecated(note = "Use `flush`")] write_pending(&mut self) -> Result<()>295 pub fn write_pending(&mut self) -> Result<()> { 296 self.flush() 297 } 298 } 299 300 /// A context for managing WebSocket stream. 301 #[derive(Debug)] 302 pub struct WebSocketContext { 303 /// Server or client? 304 role: Role, 305 /// encoder/decoder of frame. 306 frame: FrameCodec, 307 /// The state of processing, either "active" or "closing". 308 state: WebSocketState, 309 /// Receive: an incomplete message being processed. 310 incomplete: Option<IncompleteMessage>, 311 /// Send in addition to regular messages E.g. "pong" or "close". 312 additional_send: Option<Frame>, 313 /// True indicates there is an additional message (like a pong) 314 /// that failed to flush previously and we should try again. 315 unflushed_additional: bool, 316 /// The configuration for the websocket session. 317 config: WebSocketConfig, 318 } 319 320 impl WebSocketContext { 321 /// Create a WebSocket context that manages a post-handshake stream. 322 /// 323 /// # Panics 324 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. new(role: Role, config: Option<WebSocketConfig>) -> Self325 pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self { 326 Self::_new(role, FrameCodec::new(), config.unwrap_or_default()) 327 } 328 329 /// Create a WebSocket context that manages an post-handshake stream. 330 /// 331 /// # Panics 332 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self333 pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self { 334 Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default()) 335 } 336 _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self337 fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self { 338 config.assert_valid(); 339 frame.set_max_out_buffer_len(config.max_write_buffer_size); 340 frame.set_out_buffer_write_len(config.write_buffer_size); 341 Self { 342 role, 343 frame, 344 state: WebSocketState::Active, 345 incomplete: None, 346 additional_send: None, 347 unflushed_additional: false, 348 config, 349 } 350 } 351 352 /// Change the configuration. 353 /// 354 /// # Panics 355 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`. set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig))356 pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { 357 set_func(&mut self.config); 358 self.config.assert_valid(); 359 self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size); 360 self.frame.set_out_buffer_write_len(self.config.write_buffer_size); 361 } 362 363 /// Read the configuration. get_config(&self) -> &WebSocketConfig364 pub fn get_config(&self) -> &WebSocketConfig { 365 &self.config 366 } 367 368 /// Check if it is possible to read messages. 369 /// 370 /// Reading is impossible after receiving `Message::Close`. It is still possible after 371 /// sending close frame since the peer still may send some data before confirming close. can_read(&self) -> bool372 pub fn can_read(&self) -> bool { 373 self.state.can_read() 374 } 375 376 /// Check if it is possible to write messages. 377 /// 378 /// Writing gets impossible immediately after sending or receiving `Message::Close`. can_write(&self) -> bool379 pub fn can_write(&self) -> bool { 380 self.state.is_active() 381 } 382 383 /// Read a message from the provided stream, if possible. 384 /// 385 /// This function sends pong and close responses automatically. 386 /// However, it never blocks on write. read<Stream>(&mut self, stream: &mut Stream) -> Result<Message> where Stream: Read + Write,387 pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message> 388 where 389 Stream: Read + Write, 390 { 391 // Do not read from already closed connections. 392 self.state.check_not_terminated()?; 393 394 loop { 395 if self.additional_send.is_some() || self.unflushed_additional { 396 // Since we may get ping or close, we need to reply to the messages even during read. 397 match self.flush(stream) { 398 Ok(_) => {} 399 Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => { 400 // If blocked continue reading, but try again later 401 self.unflushed_additional = true; 402 } 403 Err(err) => return Err(err), 404 } 405 } else if self.role == Role::Server && !self.state.can_read() { 406 self.state = WebSocketState::Terminated; 407 return Err(Error::ConnectionClosed); 408 } 409 410 // If we get here, either write blocks or we have nothing to write. 411 // Thus if read blocks, just let it return WouldBlock. 412 if let Some(message) = self.read_message_frame(stream)? { 413 trace!("Received message {}", message); 414 return Ok(message); 415 } 416 } 417 } 418 419 /// Write a message to the provided stream. 420 /// 421 /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. 422 /// 423 /// In the event of stream write failure the message frame will be stored 424 /// in the write buffer and will try again on the next call to [`write`](Self::write) 425 /// or [`flush`](Self::flush). 426 /// 427 /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] 428 /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()> where Stream: Read + Write,429 pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()> 430 where 431 Stream: Read + Write, 432 { 433 // When terminated, return AlreadyClosed. 434 self.state.check_not_terminated()?; 435 436 // Do not write after sending a close frame. 437 if !self.state.is_active() { 438 return Err(Error::Protocol(ProtocolError::SendAfterClosing)); 439 } 440 441 let frame = match message { 442 Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true), 443 Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true), 444 Message::Ping(data) => Frame::ping(data), 445 Message::Pong(data) => { 446 self.set_additional(Frame::pong(data)); 447 // Note: user pongs can be user flushed so no need to flush here 448 return self._write(stream, None).map(|_| ()); 449 } 450 Message::Close(code) => return self.close(stream, code), 451 Message::Frame(f) => f, 452 }; 453 454 let should_flush = self._write(stream, Some(frame))?; 455 if should_flush { 456 self.flush(stream)?; 457 } 458 Ok(()) 459 } 460 461 /// Flush writes. 462 /// 463 /// Ensures all messages previously passed to [`write`](Self::write) and automatically 464 /// queued pong responses are written & flushed into the `stream`. 465 #[inline] flush<Stream>(&mut self, stream: &mut Stream) -> Result<()> where Stream: Read + Write,466 pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()> 467 where 468 Stream: Read + Write, 469 { 470 self._write(stream, None)?; 471 self.frame.write_out_buffer(stream)?; 472 stream.flush()?; 473 self.unflushed_additional = false; 474 Ok(()) 475 } 476 477 /// Writes any data in the out_buffer, `additional_send` and given `data`. 478 /// 479 /// Does **not** flush. 480 /// 481 /// Returns true if the write contents indicate we should flush immediately. _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool> where Stream: Read + Write,482 fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool> 483 where 484 Stream: Read + Write, 485 { 486 if let Some(data) = data { 487 self.buffer_frame(stream, data)?; 488 } 489 490 // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in 491 // response, unless it already received a Close frame. It SHOULD 492 // respond with Pong frame as soon as is practical. (RFC 6455) 493 let should_flush = if let Some(msg) = self.additional_send.take() { 494 trace!("Sending pong/close"); 495 match self.buffer_frame(stream, msg) { 496 Err(Error::WriteBufferFull(Message::Frame(msg))) => { 497 // if an system message would exceed the buffer put it back in 498 // `additional_send` for retry. Otherwise returning this error 499 // may not make sense to the user, e.g. calling `flush`. 500 self.set_additional(msg); 501 false 502 } 503 Err(err) => return Err(err), 504 Ok(_) => true, 505 } 506 } else { 507 self.unflushed_additional 508 }; 509 510 // If we're closing and there is nothing to send anymore, we should close the connection. 511 if self.role == Role::Server && !self.state.can_read() { 512 // The underlying TCP connection, in most normal cases, SHOULD be closed 513 // first by the server, so that it holds the TIME_WAIT state and not the 514 // client (as this would prevent it from re-opening the connection for 2 515 // maximum segment lifetimes (2MSL), while there is no corresponding 516 // server impact as a TIME_WAIT connection is immediately reopened upon 517 // a new SYN with a higher seq number). (RFC 6455) 518 self.frame.write_out_buffer(stream)?; 519 self.state = WebSocketState::Terminated; 520 Err(Error::ConnectionClosed) 521 } else { 522 Ok(should_flush) 523 } 524 } 525 526 /// Close the connection. 527 /// 528 /// This function guarantees that the close frame will be queued. 529 /// There is no need to call it again. Calling this function is 530 /// the same as calling `send(Message::Close(..))`. close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()> where Stream: Read + Write,531 pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()> 532 where 533 Stream: Read + Write, 534 { 535 if let WebSocketState::Active = self.state { 536 self.state = WebSocketState::ClosedByUs; 537 let frame = Frame::close(code); 538 self._write(stream, Some(frame))?; 539 } 540 self.flush(stream) 541 } 542 543 /// Try to decode one message frame. May return None. read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>> where Stream: Read + Write,544 fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>> 545 where 546 Stream: Read + Write, 547 { 548 if let Some(mut frame) = self 549 .frame 550 .read_frame(stream, self.config.max_frame_size) 551 .check_connection_reset(self.state)? 552 { 553 if !self.state.can_read() { 554 return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing)); 555 } 556 // MUST be 0 unless an extension is negotiated that defines meanings 557 // for non-zero values. If a nonzero value is received and none of 558 // the negotiated extensions defines the meaning of such a nonzero 559 // value, the receiving endpoint MUST _Fail the WebSocket 560 // Connection_. 561 { 562 let hdr = frame.header(); 563 if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 { 564 return Err(Error::Protocol(ProtocolError::NonZeroReservedBits)); 565 } 566 } 567 568 match self.role { 569 Role::Server => { 570 if frame.is_masked() { 571 // A server MUST remove masking for data frames received from a client 572 // as described in Section 5.3. (RFC 6455) 573 frame.apply_mask() 574 } else if !self.config.accept_unmasked_frames { 575 // The server MUST close the connection upon receiving a 576 // frame that is not masked. (RFC 6455) 577 // The only exception here is if the user explicitly accepts given 578 // stream by setting WebSocketConfig.accept_unmasked_frames to true 579 return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient)); 580 } 581 } 582 Role::Client => { 583 if frame.is_masked() { 584 // A client MUST close a connection if it detects a masked frame. (RFC 6455) 585 return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer)); 586 } 587 } 588 } 589 590 match frame.header().opcode { 591 OpCode::Control(ctl) => { 592 match ctl { 593 // All control frames MUST have a payload length of 125 bytes or less 594 // and MUST NOT be fragmented. (RFC 6455) 595 _ if !frame.header().is_final => { 596 Err(Error::Protocol(ProtocolError::FragmentedControlFrame)) 597 } 598 _ if frame.payload().len() > 125 => { 599 Err(Error::Protocol(ProtocolError::ControlFrameTooBig)) 600 } 601 OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)), 602 OpCtl::Reserved(i) => { 603 Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i))) 604 } 605 OpCtl::Ping => { 606 let data = frame.into_data(); 607 // No ping processing after we sent a close frame. 608 if self.state.is_active() { 609 self.set_additional(Frame::pong(data.clone())); 610 } 611 Ok(Some(Message::Ping(data))) 612 } 613 OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))), 614 } 615 } 616 617 OpCode::Data(data) => { 618 let fin = frame.header().is_final; 619 match data { 620 OpData::Continue => { 621 if let Some(ref mut msg) = self.incomplete { 622 msg.extend(frame.into_data(), self.config.max_message_size)?; 623 } else { 624 return Err(Error::Protocol( 625 ProtocolError::UnexpectedContinueFrame, 626 )); 627 } 628 if fin { 629 Ok(Some(self.incomplete.take().unwrap().complete()?)) 630 } else { 631 Ok(None) 632 } 633 } 634 c if self.incomplete.is_some() => { 635 Err(Error::Protocol(ProtocolError::ExpectedFragment(c))) 636 } 637 OpData::Text | OpData::Binary => { 638 let msg = { 639 let message_type = match data { 640 OpData::Text => IncompleteMessageType::Text, 641 OpData::Binary => IncompleteMessageType::Binary, 642 _ => panic!("Bug: message is not text nor binary"), 643 }; 644 let mut m = IncompleteMessage::new(message_type); 645 m.extend(frame.into_data(), self.config.max_message_size)?; 646 m 647 }; 648 if fin { 649 Ok(Some(msg.complete()?)) 650 } else { 651 self.incomplete = Some(msg); 652 Ok(None) 653 } 654 } 655 OpData::Reserved(i) => { 656 Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i))) 657 } 658 } 659 } 660 } // match opcode 661 } else { 662 // Connection closed by peer 663 match replace(&mut self.state, WebSocketState::Terminated) { 664 WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => { 665 Err(Error::ConnectionClosed) 666 } 667 _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)), 668 } 669 } 670 } 671 672 /// Received a close frame. Tells if we need to return a close frame to the user. 673 #[allow(clippy::option_option)] do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>>674 fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> { 675 debug!("Received close frame: {:?}", close); 676 match self.state { 677 WebSocketState::Active => { 678 self.state = WebSocketState::ClosedByPeer; 679 680 let close = close.map(|frame| { 681 if !frame.code.is_allowed() { 682 CloseFrame { 683 code: CloseCode::Protocol, 684 reason: "Protocol violation".into(), 685 } 686 } else { 687 frame 688 } 689 }); 690 691 let reply = Frame::close(close.clone()); 692 debug!("Replying to close with {:?}", reply); 693 self.set_additional(reply); 694 695 Some(close) 696 } 697 WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => { 698 // It is already closed, just ignore. 699 None 700 } 701 WebSocketState::ClosedByUs => { 702 // We received a reply. 703 self.state = WebSocketState::CloseAcknowledged; 704 Some(close) 705 } 706 WebSocketState::Terminated => unreachable!(), 707 } 708 } 709 710 /// Write a single frame into the write-buffer. buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> where Stream: Read + Write,711 fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> 712 where 713 Stream: Read + Write, 714 { 715 match self.role { 716 Role::Server => {} 717 Role::Client => { 718 // 5. If the data is being sent by the client, the frame(s) MUST be 719 // masked as defined in Section 5.3. (RFC 6455) 720 frame.set_random_mask(); 721 } 722 } 723 724 trace!("Sending frame: {:?}", frame); 725 self.frame.buffer_frame(stream, frame).check_connection_reset(self.state) 726 } 727 728 /// Replace `additional_send` if it is currently a `Pong` message. set_additional(&mut self, add: Frame)729 fn set_additional(&mut self, add: Frame) { 730 let empty_or_pong = self 731 .additional_send 732 .as_ref() 733 .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong)); 734 if empty_or_pong { 735 self.additional_send.replace(add); 736 } 737 } 738 } 739 740 /// The current connection state. 741 #[derive(Debug, PartialEq, Eq, Clone, Copy)] 742 enum WebSocketState { 743 /// The connection is active. 744 Active, 745 /// We initiated a close handshake. 746 ClosedByUs, 747 /// The peer initiated a close handshake. 748 ClosedByPeer, 749 /// The peer replied to our close handshake. 750 CloseAcknowledged, 751 /// The connection does not exist anymore. 752 Terminated, 753 } 754 755 impl WebSocketState { 756 /// Tell if we're allowed to process normal messages. is_active(self) -> bool757 fn is_active(self) -> bool { 758 matches!(self, WebSocketState::Active) 759 } 760 761 /// Tell if we should process incoming data. Note that if we send a close frame 762 /// but the remote hasn't confirmed, they might have sent data before they receive our 763 /// close frame, so we should still pass those to client code, hence ClosedByUs is valid. can_read(self) -> bool764 fn can_read(self) -> bool { 765 matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs) 766 } 767 768 /// Check if the state is active, return error if not. check_not_terminated(self) -> Result<()>769 fn check_not_terminated(self) -> Result<()> { 770 match self { 771 WebSocketState::Terminated => Err(Error::AlreadyClosed), 772 _ => Ok(()), 773 } 774 } 775 } 776 777 /// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate. 778 trait CheckConnectionReset { check_connection_reset(self, state: WebSocketState) -> Self779 fn check_connection_reset(self, state: WebSocketState) -> Self; 780 } 781 782 impl<T> CheckConnectionReset for Result<T> { check_connection_reset(self, state: WebSocketState) -> Self783 fn check_connection_reset(self, state: WebSocketState) -> Self { 784 match self { 785 Err(Error::Io(io_error)) => Err({ 786 if !state.can_read() && io_error.kind() == io::ErrorKind::ConnectionReset { 787 Error::ConnectionClosed 788 } else { 789 Error::Io(io_error) 790 } 791 }), 792 x => x, 793 } 794 } 795 } 796 797 #[cfg(test)] 798 mod tests { 799 use super::{Message, Role, WebSocket, WebSocketConfig}; 800 use crate::error::{CapacityError, Error}; 801 802 use std::{io, io::Cursor}; 803 804 struct WriteMoc<Stream>(Stream); 805 806 impl<Stream> io::Write for WriteMoc<Stream> { write(&mut self, buf: &[u8]) -> io::Result<usize>807 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 808 Ok(buf.len()) 809 } flush(&mut self) -> io::Result<()>810 fn flush(&mut self) -> io::Result<()> { 811 Ok(()) 812 } 813 } 814 815 impl<Stream: io::Read> io::Read for WriteMoc<Stream> { read(&mut self, buf: &mut [u8]) -> io::Result<usize>816 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 817 self.0.read(buf) 818 } 819 } 820 821 #[test] receive_messages()822 fn receive_messages() { 823 let incoming = Cursor::new(vec![ 824 0x89, 0x02, 0x01, 0x02, 0x8a, 0x01, 0x03, 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 825 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x82, 0x03, 0x01, 0x02, 826 0x03, 827 ]); 828 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None); 829 assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2])); 830 assert_eq!(socket.read().unwrap(), Message::Pong(vec![3])); 831 assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into())); 832 assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); 833 } 834 835 #[test] size_limiting_text_fragmented()836 fn size_limiting_text_fragmented() { 837 let incoming = Cursor::new(vec![ 838 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 839 0x6c, 0x64, 0x21, 840 ]); 841 let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() }; 842 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); 843 844 assert!(matches!( 845 socket.read(), 846 Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 })) 847 )); 848 } 849 850 #[test] size_limiting_binary()851 fn size_limiting_binary() { 852 let incoming = Cursor::new(vec![0x82, 0x03, 0x01, 0x02, 0x03]); 853 let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() }; 854 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); 855 856 assert!(matches!( 857 socket.read(), 858 Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 })) 859 )); 860 } 861 } 862