1 //! Generic WebSocket message stream.
2 
3 pub mod frame;
4 
5 mod message;
6 
7 pub use self::{frame::CloseFrame, message::Message};
8 
9 use self::{
10     frame::{
11         coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode},
12         Frame, FrameCodec,
13     },
14     message::{IncompleteMessage, IncompleteMessageType},
15 };
16 use crate::error::{Error, ProtocolError, Result};
17 use log::*;
18 use std::{
19     io::{self, Read, Write},
20     mem::replace,
21 };
22 
/// Indicates a Client or Server role of the websocket.
///
/// The role decides how incoming frames are validated and who tears the connection down:
/// a server unmasks masked frames (and rejects unmasked ones unless
/// `WebSocketConfig::accept_unmasked_frames` is set), a client rejects masked frames, and
/// only a server terminates the connection on its own once the close handshake is over.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
    /// This socket is a server
    Server,
    /// This socket is a client
    Client,
}
31 
/// The configuration for WebSocket connection.
///
/// A configuration is only accepted when `max_write_buffer_size` is strictly greater than
/// `write_buffer_size`; constructing or re-configuring a websocket with a configuration
/// that violates this panics.
#[derive(Debug, Clone, Copy)]
pub struct WebSocketConfig {
    /// Does nothing, instead use `max_write_buffer_size`.
    #[deprecated]
    pub max_send_queue: Option<usize>,
    /// The target minimum size of the write buffer to reach before writing the data
    /// to the underlying stream.
    /// The default value is 128 KiB.
    ///
    /// If set to `0` each message will be eagerly written to the underlying stream.
    /// It is often more optimal to allow them to buffer a little, hence the default value.
    ///
    /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
    pub write_buffer_size: usize,
    /// The max size of the write buffer in bytes. Setting this can provide backpressure
    /// in the case the write buffer is filling up due to write errors.
    /// The default value is unlimited.
    ///
    /// Note: The write buffer only builds up past [`write_buffer_size`](Self::write_buffer_size)
    /// when writes to the underlying stream are failing. So the **write buffer can not
    /// fill up if you are not observing write errors even if not flushing**.
    ///
    /// Note: Must be strictly greater than [`write_buffer_size`](Self::write_buffer_size)
    /// (this is asserted). It should be at least `write_buffer_size` plus the size of one
    /// message, and probably a little more depending on error handling strategy.
    pub max_write_buffer_size: usize,
    /// The maximum size of an incoming message. `None` means no size limit. The default value is 64 MiB
    /// which should be reasonably big for all normal use-cases but small enough to prevent
    /// memory eating by a malicious user.
    pub max_message_size: Option<usize>,
    /// The maximum size of a single incoming message frame. `None` means no size limit. The limit is for
    /// frame payload NOT including the frame header. The default value is 16 MiB which should
    /// be reasonably big for all normal use-cases but small enough to prevent memory eating
    /// by a malicious user.
    pub max_frame_size: Option<usize>,
    /// When set to `true`, the server will accept and handle unmasked frames
    /// from the client. According to the RFC 6455, the server must close the
    /// connection to the client in such cases, however it seems like there are
    /// some popular libraries that are sending unmasked frames, ignoring the RFC.
    /// By default this option is set to `false`, i.e. according to RFC 6455.
    pub accept_unmasked_frames: bool,
}
74 
75 impl Default for WebSocketConfig {
default() -> Self76     fn default() -> Self {
77         #[allow(deprecated)]
78         WebSocketConfig {
79             max_send_queue: None,
80             write_buffer_size: 128 * 1024,
81             max_write_buffer_size: usize::MAX,
82             max_message_size: Some(64 << 20),
83             max_frame_size: Some(16 << 20),
84             accept_unmasked_frames: false,
85         }
86     }
87 }
88 
89 impl WebSocketConfig {
90     /// Panic if values are invalid.
assert_valid(&self)91     pub(crate) fn assert_valid(&self) {
92         assert!(
93             self.max_write_buffer_size > self.write_buffer_size,
94             "WebSocketConfig::max_write_buffer_size must be greater than write_buffer_size, \
95             see WebSocketConfig docs`"
96         );
97     }
98 }
99 
/// WebSocket input-output stream.
///
/// This is THE structure you want to create to be able to speak the WebSocket protocol.
/// It may be created by calling `connect`, `accept` or `client` functions.
///
/// Use [`WebSocket::read`], [`WebSocket::send`] to receive and send messages.
///
/// The type pairs the underlying stream with a [`WebSocketContext`]; every protocol
/// operation is delegated to the context, which is handed the stream on each call.
#[derive(Debug)]
pub struct WebSocket<Stream> {
    /// The underlying socket.
    socket: Stream,
    /// The context for managing a WebSocket.
    context: WebSocketContext,
}
113 
114 impl<Stream> WebSocket<Stream> {
115     /// Convert a raw socket into a WebSocket without performing a handshake.
116     ///
117     /// Call this function if you're using Tungstenite as a part of a web framework
118     /// or together with an existing one. If you need an initial handshake, use
119     /// `connect()` or `accept()` functions of the crate to construct a websocket.
120     ///
121     /// # Panics
122     /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self123     pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self {
124         WebSocket { socket: stream, context: WebSocketContext::new(role, config) }
125     }
126 
127     /// Convert a raw socket into a WebSocket without performing a handshake.
128     ///
129     /// Call this function if you're using Tungstenite as a part of a web framework
130     /// or together with an existing one. If you need an initial handshake, use
131     /// `connect()` or `accept()` functions of the crate to construct a websocket.
132     ///
133     /// # Panics
134     /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
from_partially_read( stream: Stream, part: Vec<u8>, role: Role, config: Option<WebSocketConfig>, ) -> Self135     pub fn from_partially_read(
136         stream: Stream,
137         part: Vec<u8>,
138         role: Role,
139         config: Option<WebSocketConfig>,
140     ) -> Self {
141         WebSocket {
142             socket: stream,
143             context: WebSocketContext::from_partially_read(part, role, config),
144         }
145     }
146 
147     /// Returns a shared reference to the inner stream.
get_ref(&self) -> &Stream148     pub fn get_ref(&self) -> &Stream {
149         &self.socket
150     }
151     /// Returns a mutable reference to the inner stream.
get_mut(&mut self) -> &mut Stream152     pub fn get_mut(&mut self) -> &mut Stream {
153         &mut self.socket
154     }
155 
156     /// Change the configuration.
157     ///
158     /// # Panics
159     /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig))160     pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
161         self.context.set_config(set_func)
162     }
163 
164     /// Read the configuration.
get_config(&self) -> &WebSocketConfig165     pub fn get_config(&self) -> &WebSocketConfig {
166         self.context.get_config()
167     }
168 
169     /// Check if it is possible to read messages.
170     ///
171     /// Reading is impossible after receiving `Message::Close`. It is still possible after
172     /// sending close frame since the peer still may send some data before confirming close.
can_read(&self) -> bool173     pub fn can_read(&self) -> bool {
174         self.context.can_read()
175     }
176 
177     /// Check if it is possible to write messages.
178     ///
179     /// Writing gets impossible immediately after sending or receiving `Message::Close`.
can_write(&self) -> bool180     pub fn can_write(&self) -> bool {
181         self.context.can_write()
182     }
183 }
184 
185 impl<Stream: Read + Write> WebSocket<Stream> {
186     /// Read a message from stream, if possible.
187     ///
188     /// This will also queue responses to ping and close messages. These responses
189     /// will be written and flushed on the next call to [`read`](Self::read),
190     /// [`write`](Self::write) or [`flush`](Self::flush).
191     ///
192     /// # Closing the connection
193     /// When the remote endpoint decides to close the connection this will return
194     /// the close message with an optional close frame.
195     ///
196     /// You should continue calling [`read`](Self::read), [`write`](Self::write) or
197     /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
198     /// is returned. Once that happens it is safe to drop the underlying connection.
read(&mut self) -> Result<Message>199     pub fn read(&mut self) -> Result<Message> {
200         self.context.read(&mut self.socket)
201     }
202 
203     /// Writes and immediately flushes a message.
204     /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
send(&mut self, message: Message) -> Result<()>205     pub fn send(&mut self, message: Message) -> Result<()> {
206         self.write(message)?;
207         self.flush()
208     }
209 
210     /// Write a message to the provided stream, if possible.
211     ///
212     /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
213     ///
214     /// In the event of stream write failure the message frame will be stored
215     /// in the write buffer and will try again on the next call to [`write`](Self::write)
216     /// or [`flush`](Self::flush).
217     ///
218     /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
219     /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
220     ///
221     /// This call will generally not flush. However, if there are queued automatic messages
222     /// they will be written and eagerly flushed.
223     ///
224     /// For example, upon receiving ping messages tungstenite queues pong replies automatically.
225     /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
226     /// will write & flush the pong reply. This means you should not respond to ping frames manually.
227     ///
228     /// You can however send pong frames manually in order to indicate a unidirectional heartbeat
229     /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
230     /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
231     /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
232     /// ping will not be sent as it will be replaced by your custom pong message.
233     ///
234     /// # Errors
235     /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
236     ///   along with the equivalent passed message frame.
237     /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
238     /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
239     ///   [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
240     ///   error on your part.
241     /// - [`Error::Io`] is returned if the underlying connection returns an error
242     ///   (consider these fatal except for WouldBlock).
243     /// - [`Error::Capacity`] if your message size is bigger than the configured max message size.
write(&mut self, message: Message) -> Result<()>244     pub fn write(&mut self, message: Message) -> Result<()> {
245         self.context.write(&mut self.socket, message)
246     }
247 
248     /// Flush writes.
249     ///
250     /// Ensures all messages previously passed to [`write`](Self::write) and automatic
251     /// queued pong responses are written & flushed into the underlying stream.
flush(&mut self) -> Result<()>252     pub fn flush(&mut self) -> Result<()> {
253         self.context.flush(&mut self.socket)
254     }
255 
256     /// Close the connection.
257     ///
258     /// This function guarantees that the close frame will be queued.
259     /// There is no need to call it again. Calling this function is
260     /// the same as calling `write(Message::Close(..))`.
261     ///
262     /// After queuing the close frame you should continue calling [`read`](Self::read) or
263     /// [`flush`](Self::flush) to drive the close handshake to completion.
264     ///
265     /// The websocket RFC defines that the underlying connection should be closed
266     /// by the server. Tungstenite takes care of this asymmetry for you.
267     ///
268     /// When the close handshake is finished (we have both sent and received
269     /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
270     /// [Error::ConnectionClosed] if this endpoint is the server.
271     ///
272     /// If this endpoint is a client, [Error::ConnectionClosed] will only be
273     /// returned after the server has closed the underlying connection.
274     ///
275     /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
276     /// is returned from [`read`](Self::read) or [`flush`](Self::flush).
close(&mut self, code: Option<CloseFrame>) -> Result<()>277     pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
278         self.context.close(&mut self.socket, code)
279     }
280 
281     /// Old name for [`read`](Self::read).
282     #[deprecated(note = "Use `read`")]
read_message(&mut self) -> Result<Message>283     pub fn read_message(&mut self) -> Result<Message> {
284         self.read()
285     }
286 
287     /// Old name for [`send`](Self::send).
288     #[deprecated(note = "Use `send`")]
write_message(&mut self, message: Message) -> Result<()>289     pub fn write_message(&mut self, message: Message) -> Result<()> {
290         self.send(message)
291     }
292 
293     /// Old name for [`flush`](Self::flush).
294     #[deprecated(note = "Use `flush`")]
write_pending(&mut self) -> Result<()>295     pub fn write_pending(&mut self) -> Result<()> {
296         self.flush()
297     }
298 }
299 
/// A context for managing WebSocket stream.
///
/// The context holds all protocol state but not the stream itself; the stream is passed
/// to each operation (`read`, `write`, `flush`, `close`).
#[derive(Debug)]
pub struct WebSocketContext {
    /// Server or client?
    role: Role,
    /// encoder/decoder of frame.
    frame: FrameCodec,
    /// The state of processing: active, somewhere in the close handshake, or terminated.
    state: WebSocketState,
    /// Receive: an incomplete (fragmented) message being processed.
    incomplete: Option<IncompleteMessage>,
    /// Send in addition to regular messages E.g. "pong" or "close".
    additional_send: Option<Frame>,
    /// True indicates there is an additional message (like a pong)
    /// that failed to flush previously and we should try again.
    unflushed_additional: bool,
    /// The configuration for the websocket session.
    config: WebSocketConfig,
}
319 
320 impl WebSocketContext {
321     /// Create a WebSocket context that manages a post-handshake stream.
322     ///
323     /// # Panics
324     /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
new(role: Role, config: Option<WebSocketConfig>) -> Self325     pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
326         Self::_new(role, FrameCodec::new(), config.unwrap_or_default())
327     }
328 
329     /// Create a WebSocket context that manages an post-handshake stream.
330     ///
331     /// # Panics
332     /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self333     pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
334         Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default())
335     }
336 
_new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self337     fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self {
338         config.assert_valid();
339         frame.set_max_out_buffer_len(config.max_write_buffer_size);
340         frame.set_out_buffer_write_len(config.write_buffer_size);
341         Self {
342             role,
343             frame,
344             state: WebSocketState::Active,
345             incomplete: None,
346             additional_send: None,
347             unflushed_additional: false,
348             config,
349         }
350     }
351 
352     /// Change the configuration.
353     ///
354     /// # Panics
355     /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig))356     pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
357         set_func(&mut self.config);
358         self.config.assert_valid();
359         self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size);
360         self.frame.set_out_buffer_write_len(self.config.write_buffer_size);
361     }
362 
    /// Read the configuration.
    ///
    /// Returns a shared reference to the configuration currently stored in this context.
    pub fn get_config(&self) -> &WebSocketConfig {
        &self.config
    }
367 
    /// Check if it is possible to read messages.
    ///
    /// Reading is impossible after receiving `Message::Close`. It is still possible after
    /// sending close frame since the peer still may send some data before confirming close.
    ///
    /// The answer is derived purely from the current connection state.
    pub fn can_read(&self) -> bool {
        self.state.can_read()
    }
375 
    /// Check if it is possible to write messages.
    ///
    /// Writing gets impossible immediately after sending or receiving `Message::Close`.
    ///
    /// Writing is allowed exactly while the connection state reports itself as active.
    pub fn can_write(&self) -> bool {
        self.state.is_active()
    }
382 
383     /// Read a message from the provided stream, if possible.
384     ///
385     /// This function sends pong and close responses automatically.
386     /// However, it never blocks on write.
read<Stream>(&mut self, stream: &mut Stream) -> Result<Message> where Stream: Read + Write,387     pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
388     where
389         Stream: Read + Write,
390     {
391         // Do not read from already closed connections.
392         self.state.check_not_terminated()?;
393 
394         loop {
395             if self.additional_send.is_some() || self.unflushed_additional {
396                 // Since we may get ping or close, we need to reply to the messages even during read.
397                 match self.flush(stream) {
398                     Ok(_) => {}
399                     Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {
400                         // If blocked continue reading, but try again later
401                         self.unflushed_additional = true;
402                     }
403                     Err(err) => return Err(err),
404                 }
405             } else if self.role == Role::Server && !self.state.can_read() {
406                 self.state = WebSocketState::Terminated;
407                 return Err(Error::ConnectionClosed);
408             }
409 
410             // If we get here, either write blocks or we have nothing to write.
411             // Thus if read blocks, just let it return WouldBlock.
412             if let Some(message) = self.read_message_frame(stream)? {
413                 trace!("Received message {}", message);
414                 return Ok(message);
415             }
416         }
417     }
418 
419     /// Write a message to the provided stream.
420     ///
421     /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
422     ///
423     /// In the event of stream write failure the message frame will be stored
424     /// in the write buffer and will try again on the next call to [`write`](Self::write)
425     /// or [`flush`](Self::flush).
426     ///
427     /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
428     /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()> where Stream: Read + Write,429     pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
430     where
431         Stream: Read + Write,
432     {
433         // When terminated, return AlreadyClosed.
434         self.state.check_not_terminated()?;
435 
436         // Do not write after sending a close frame.
437         if !self.state.is_active() {
438             return Err(Error::Protocol(ProtocolError::SendAfterClosing));
439         }
440 
441         let frame = match message {
442             Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true),
443             Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true),
444             Message::Ping(data) => Frame::ping(data),
445             Message::Pong(data) => {
446                 self.set_additional(Frame::pong(data));
447                 // Note: user pongs can be user flushed so no need to flush here
448                 return self._write(stream, None).map(|_| ());
449             }
450             Message::Close(code) => return self.close(stream, code),
451             Message::Frame(f) => f,
452         };
453 
454         let should_flush = self._write(stream, Some(frame))?;
455         if should_flush {
456             self.flush(stream)?;
457         }
458         Ok(())
459     }
460 
    /// Flush writes.
    ///
    /// Ensures all messages previously passed to [`write`](Self::write) and automatically
    /// queued pong responses are written & flushed into the `stream`.
    ///
    /// # Errors
    /// Returns the first error from buffering queued frames, writing out the buffer or
    /// flushing `stream`. In that case the "unflushed additional" marker is left as it
    /// was, so the flush is attempted again later.
    #[inline]
    pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()>
    where
        Stream: Read + Write,
    {
        // Buffer any queued additional frame (pong/close); the flush hint is irrelevant
        // here because we flush unconditionally below.
        self._write(stream, None)?;
        // Push everything that is still buffered into the stream ...
        self.frame.write_out_buffer(stream)?;
        // ... and flush the stream itself.
        stream.flush()?;
        // Only reached when everything went out: nothing is left to retry.
        self.unflushed_additional = false;
        Ok(())
    }
476 
    /// Writes any data in the out_buffer, `additional_send` and given `data`.
    ///
    /// Does **not** flush.
    ///
    /// Returns true if the write contents indicate we should flush immediately: that is
    /// the case when a queued additional frame (pong/close) was buffered by this call, or
    /// when an earlier additional frame is still marked as unflushed.
    ///
    /// # Errors
    /// - Any error from buffering `data`, including `WriteBufferFull`.
    /// - Any error other than `WriteBufferFull` from buffering the additional frame.
    /// - [`Error::ConnectionClosed`] on a server whose state no longer permits reading;
    ///   the out buffer is written first and the context becomes terminated.
    fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool>
    where
        Stream: Read + Write,
    {
        // The caller's frame goes first; a failure here is reported to the caller as is.
        if let Some(data) = data {
            self.buffer_frame(stream, data)?;
        }

        // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
        // response, unless it already received a Close frame. It SHOULD
        // respond with Pong frame as soon as is practical. (RFC 6455)
        let should_flush = if let Some(msg) = self.additional_send.take() {
            trace!("Sending pong/close");
            match self.buffer_frame(stream, msg) {
                Err(Error::WriteBufferFull(Message::Frame(msg))) => {
                    // if a system message would exceed the buffer put it back in
                    // `additional_send` for retry. Otherwise returning this error
                    // may not make sense to the user, e.g. calling `flush`.
                    self.set_additional(msg);
                    false
                }
                Err(err) => return Err(err),
                Ok(_) => true,
            }
        } else {
            // Nothing new was queued; keep asking for a flush while an earlier
            // additional frame is still waiting to be flushed.
            self.unflushed_additional
        };

        // If we're closing and there is nothing to send anymore, we should close the connection.
        if self.role == Role::Server && !self.state.can_read() {
            // The underlying TCP connection, in most normal cases, SHOULD be closed
            // first by the server, so that it holds the TIME_WAIT state and not the
            // client (as this would prevent it from re-opening the connection for 2
            // maximum segment lifetimes (2MSL), while there is no corresponding
            // server impact as a TIME_WAIT connection is immediately reopened upon
            // a new SYN with a higher seq number). (RFC 6455)
            self.frame.write_out_buffer(stream)?;
            self.state = WebSocketState::Terminated;
            Err(Error::ConnectionClosed)
        } else {
            Ok(should_flush)
        }
    }
525 
526     /// Close the connection.
527     ///
528     /// This function guarantees that the close frame will be queued.
529     /// There is no need to call it again. Calling this function is
530     /// the same as calling `send(Message::Close(..))`.
close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()> where Stream: Read + Write,531     pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()>
532     where
533         Stream: Read + Write,
534     {
535         if let WebSocketState::Active = self.state {
536             self.state = WebSocketState::ClosedByUs;
537             let frame = Frame::close(code);
538             self._write(stream, Some(frame))?;
539         }
540         self.flush(stream)
541     }
542 
    /// Try to decode one message frame. May return None.
    ///
    /// Reads a single frame from `stream` (limited by `max_frame_size`), validates it and
    /// turns it into a message where possible:
    ///
    /// - `Ok(Some(message))` for a ping, a pong, a close frame that has to be reported to
    ///   the user, or a data frame that completes a message.
    /// - `Ok(None)` when the frame did not yield a message for the user, e.g. a non-final
    ///   fragment that was stored in `incomplete`.
    ///
    /// Receiving a ping while the state is active queues a pong reply as a side effect.
    ///
    /// # Errors
    /// - Protocol errors for frames that arrive after closing, carry reserved bits,
    ///   violate the masking rules for this role, are malformed control frames, use
    ///   unknown opcodes or break the fragmentation sequence.
    /// - Errors from extending/completing a message (e.g. exceeding `max_message_size`).
    /// - When no frame could be read because the stream ended: the context becomes
    ///   terminated and [`Error::ConnectionClosed`] is returned if the peer had closed or
    ///   acknowledged the close, otherwise `ProtocolError::ResetWithoutClosingHandshake`.
    fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>>
    where
        Stream: Read + Write,
    {
        if let Some(mut frame) = self
            .frame
            .read_frame(stream, self.config.max_frame_size)
            .check_connection_reset(self.state)?
        {
            // A frame showed up although the state says the peer is done sending.
            if !self.state.can_read() {
                return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing));
            }
            // MUST be 0 unless an extension is negotiated that defines meanings
            // for non-zero values.  If a nonzero value is received and none of
            // the negotiated extensions defines the meaning of such a nonzero
            // value, the receiving endpoint MUST _Fail the WebSocket
            // Connection_.
            {
                let hdr = frame.header();
                if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
                    return Err(Error::Protocol(ProtocolError::NonZeroReservedBits));
                }
            }

            match self.role {
                Role::Server => {
                    if frame.is_masked() {
                        // A server MUST remove masking for data frames received from a client
                        // as described in Section 5.3. (RFC 6455)
                        frame.apply_mask()
                    } else if !self.config.accept_unmasked_frames {
                        // The server MUST close the connection upon receiving a
                        // frame that is not masked. (RFC 6455)
                        // The only exception here is if the user explicitly accepts given
                        // stream by setting WebSocketConfig.accept_unmasked_frames to true
                        return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient));
                    }
                }
                Role::Client => {
                    if frame.is_masked() {
                        // A client MUST close a connection if it detects a masked frame. (RFC 6455)
                        return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer));
                    }
                }
            }

            match frame.header().opcode {
                OpCode::Control(ctl) => {
                    // The two guarded arms come first, so the generic control frame
                    // checks apply to every control opcode before it is dispatched.
                    match ctl {
                        // All control frames MUST have a payload length of 125 bytes or less
                        // and MUST NOT be fragmented. (RFC 6455)
                        _ if !frame.header().is_final => {
                            Err(Error::Protocol(ProtocolError::FragmentedControlFrame))
                        }
                        _ if frame.payload().len() > 125 => {
                            Err(Error::Protocol(ProtocolError::ControlFrameTooBig))
                        }
                        // `do_close` decides whether the close is surfaced to the user.
                        OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
                        OpCtl::Reserved(i) => {
                            Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i)))
                        }
                        OpCtl::Ping => {
                            let data = frame.into_data();
                            // No ping processing after we sent a close frame.
                            if self.state.is_active() {
                                // The payload is cloned because it is both echoed in the
                                // pong and handed to the user below.
                                self.set_additional(Frame::pong(data.clone()));
                            }
                            Ok(Some(Message::Ping(data)))
                        }
                        OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))),
                    }
                }

                OpCode::Data(data) => {
                    let fin = frame.header().is_final;
                    match data {
                        OpData::Continue => {
                            // A continuation is only valid while a fragmented message
                            // is in progress.
                            if let Some(ref mut msg) = self.incomplete {
                                msg.extend(frame.into_data(), self.config.max_message_size)?;
                            } else {
                                return Err(Error::Protocol(
                                    ProtocolError::UnexpectedContinueFrame,
                                ));
                            }
                            if fin {
                                Ok(Some(self.incomplete.take().unwrap().complete()?))
                            } else {
                                Ok(None)
                            }
                        }
                        // Any non-continuation data frame while a fragmented message is
                        // still open breaks the fragment sequence.
                        c if self.incomplete.is_some() => {
                            Err(Error::Protocol(ProtocolError::ExpectedFragment(c)))
                        }
                        OpData::Text | OpData::Binary => {
                            let msg = {
                                let message_type = match data {
                                    OpData::Text => IncompleteMessageType::Text,
                                    OpData::Binary => IncompleteMessageType::Binary,
                                    _ => panic!("Bug: message is not text nor binary"),
                                };
                                let mut m = IncompleteMessage::new(message_type);
                                m.extend(frame.into_data(), self.config.max_message_size)?;
                                m
                            };
                            if fin {
                                Ok(Some(msg.complete()?))
                            } else {
                                // First fragment of a longer message: keep it until the
                                // continuation frames arrive.
                                self.incomplete = Some(msg);
                                Ok(None)
                            }
                        }
                        OpData::Reserved(i) => {
                            Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i)))
                        }
                    }
                }
            } // match opcode
        } else {
            // Connection closed by peer
            // The context is terminated either way; the previous state decides whether
            // this was an orderly close or a reset.
            match replace(&mut self.state, WebSocketState::Terminated) {
                WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
                    Err(Error::ConnectionClosed)
                }
                _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)),
            }
        }
    }
671 
672     /// Received a close frame. Tells if we need to return a close frame to the user.
673     #[allow(clippy::option_option)]
do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>>674     fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> {
675         debug!("Received close frame: {:?}", close);
676         match self.state {
677             WebSocketState::Active => {
678                 self.state = WebSocketState::ClosedByPeer;
679 
680                 let close = close.map(|frame| {
681                     if !frame.code.is_allowed() {
682                         CloseFrame {
683                             code: CloseCode::Protocol,
684                             reason: "Protocol violation".into(),
685                         }
686                     } else {
687                         frame
688                     }
689                 });
690 
691                 let reply = Frame::close(close.clone());
692                 debug!("Replying to close with {:?}", reply);
693                 self.set_additional(reply);
694 
695                 Some(close)
696             }
697             WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
698                 // It is already closed, just ignore.
699                 None
700             }
701             WebSocketState::ClosedByUs => {
702                 // We received a reply.
703                 self.state = WebSocketState::CloseAcknowledged;
704                 Some(close)
705             }
706             WebSocketState::Terminated => unreachable!(),
707         }
708     }
709 
710     /// Write a single frame into the write-buffer.
buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> where Stream: Read + Write,711     fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
712     where
713         Stream: Read + Write,
714     {
715         match self.role {
716             Role::Server => {}
717             Role::Client => {
718                 // 5.  If the data is being sent by the client, the frame(s) MUST be
719                 // masked as defined in Section 5.3. (RFC 6455)
720                 frame.set_random_mask();
721             }
722         }
723 
724         trace!("Sending frame: {:?}", frame);
725         self.frame.buffer_frame(stream, frame).check_connection_reset(self.state)
726     }
727 
728     /// Replace `additional_send` if it is currently a `Pong` message.
set_additional(&mut self, add: Frame)729     fn set_additional(&mut self, add: Frame) {
730         let empty_or_pong = self
731             .additional_send
732             .as_ref()
733             .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong));
734         if empty_or_pong {
735             self.additional_send.replace(add);
736         }
737     }
738 }
739 
/// Where the connection currently stands in its lifecycle, including the
/// stages of the close handshake.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WebSocketState {
    /// Normal operation: no close handshake has started.
    Active,
    /// A close handshake was started from our side.
    ClosedByUs,
    /// A close handshake was started by the peer.
    ClosedByPeer,
    /// The peer has answered the close handshake we started.
    CloseAcknowledged,
    /// The connection is gone.
    Terminated,
}
754 
755 impl WebSocketState {
756     /// Tell if we're allowed to process normal messages.
is_active(self) -> bool757     fn is_active(self) -> bool {
758         matches!(self, WebSocketState::Active)
759     }
760 
761     /// Tell if we should process incoming data. Note that if we send a close frame
762     /// but the remote hasn't confirmed, they might have sent data before they receive our
763     /// close frame, so we should still pass those to client code, hence ClosedByUs is valid.
can_read(self) -> bool764     fn can_read(self) -> bool {
765         matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs)
766     }
767 
768     /// Check if the state is active, return error if not.
check_not_terminated(self) -> Result<()>769     fn check_not_terminated(self) -> Result<()> {
770         match self {
771             WebSocketState::Terminated => Err(Error::AlreadyClosed),
772             _ => Ok(()),
773         }
774     }
775 }
776 
777 /// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate.
778 trait CheckConnectionReset {
check_connection_reset(self, state: WebSocketState) -> Self779     fn check_connection_reset(self, state: WebSocketState) -> Self;
780 }
781 
782 impl<T> CheckConnectionReset for Result<T> {
check_connection_reset(self, state: WebSocketState) -> Self783     fn check_connection_reset(self, state: WebSocketState) -> Self {
784         match self {
785             Err(Error::Io(io_error)) => Err({
786                 if !state.can_read() && io_error.kind() == io::ErrorKind::ConnectionReset {
787                     Error::ConnectionClosed
788                 } else {
789                     Error::Io(io_error)
790                 }
791             }),
792             x => x,
793         }
794     }
795 }
796 
#[cfg(test)]
mod tests {
    use super::{Message, Role, WebSocket, WebSocketConfig};
    use crate::error::{CapacityError, Error};

    use std::io::{self, Cursor};

    /// Test stream: reads are served by the wrapped stream, while writes are
    /// reported as fully successful and thrown away.
    struct WriteMock<Stream>(Stream);

    impl<Stream> io::Write for WriteMock<Stream> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Claim the whole buffer was written without storing anything.
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    impl<Stream: io::Read> io::Read for WriteMock<Stream> {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            io::Read::read(&mut self.0, buf)
        }
    }

    /// Build a client-role socket whose incoming data is `bytes`.
    fn client_over(
        bytes: Vec<u8>,
        config: Option<WebSocketConfig>,
    ) -> WebSocket<WriteMock<Cursor<Vec<u8>>>> {
        WebSocket::from_raw_socket(WriteMock(Cursor::new(bytes)), Role::Client, config)
    }

    #[test]
    fn receive_messages() {
        #[rustfmt::skip]
        let wire = vec![
            // Ping with payload [1, 2]
            0x89, 0x02, 0x01, 0x02,
            // Pong with payload [3]
            0x8a, 0x01, 0x03,
            // Text, first fragment: "Hello, "
            0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20,
            // Final continuation fragment: "World!"
            0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21,
            // Binary with payload [1, 2, 3]
            0x82, 0x03, 0x01, 0x02, 0x03,
        ];
        let mut socket = client_over(wire, None);

        assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2]));
        assert_eq!(socket.read().unwrap(), Message::Pong(vec![3]));
        assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into()));
        assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
    }

    #[test]
    fn size_limiting_text_fragmented() {
        #[rustfmt::skip]
        let wire = vec![
            // Text, first fragment: "Hello, " (7 bytes)
            0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20,
            // Final continuation fragment: "World!" (6 bytes)
            0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21,
        ];
        let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() };
        let mut socket = client_over(wire, Some(limit));

        // The two fragments add up to 13 bytes, over the 10-byte limit.
        let outcome = socket.read();
        assert!(matches!(
            outcome,
            Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 }))
        ));
    }

    #[test]
    fn size_limiting_binary() {
        // A single binary frame carrying 3 bytes, read with a 2-byte limit.
        let wire = vec![0x82, 0x03, 0x01, 0x02, 0x03];
        let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() };
        let mut socket = client_over(wire, Some(limit));

        let outcome = socket.read();
        assert!(matches!(
            outcome,
            Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 }))
        ));
    }
}
862