1 use crate::codec::decoder::Decoder;
2 use crate::codec::encoder::Encoder;
3 
4 use bytes::{Buf, BufMut, BytesMut};
5 use std::{cmp, fmt, io, str};
6 
/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines.
///
/// This uses the `\n` character as the line ending on all platforms. When
/// decoding, a single `\r` immediately preceding the `\n` is also stripped, so
/// `\r\n`-terminated input yields the same lines. When encoding, every line is
/// terminated with a bare `\n`.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LinesCodec {
    /// Index of the next byte to examine for a `\n` character.
    ///
    /// This lets `decode` resume where the previous call stopped instead of
    /// rescanning bytes already known not to contain a newline. For example,
    /// if `decode` was called with `abc`, this holds `3`; a subsequent call
    /// with `abcde\n` then only examines `de\n` before returning.
    next_index: usize,

    /// The maximum length for a given line. If `usize::MAX`, lines will be
    /// read until a `\n` character is reached.
    max_length: usize,

    /// Are we currently discarding the remainder of a line which was over
    /// the length limit?
    is_discarding: bool,
}

impl LinesCodec {
    /// Returns a `LinesCodec` for splitting up data into lines.
    ///
    /// # Note
    ///
    /// The returned `LinesCodec` will not have an upper bound on the length
    /// of a buffered line. See the documentation for [`new_with_max_length`]
    /// for information on why this could be a potential security risk.
    ///
    /// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length()
    pub fn new() -> LinesCodec {
        LinesCodec {
            next_index: 0,
            max_length: usize::MAX,
            is_discarding: false,
        }
    }

    /// Returns a `LinesCodec` with a maximum line length limit.
    ///
    /// If this is set, calls to `LinesCodec::decode` will return a
    /// [`LinesCodecError`] when more than `max_length` bytes have been
    /// buffered without finding a `\n`. The length is measured in bytes and
    /// excludes the `\n` itself but includes any `\r` in front of it.
    ///
    /// After that error, subsequent calls discard the rest of the over-long
    /// line, up to and including the next `\n`. A call that runs out of
    /// buffered data before finding that newline returns `None`. Once the
    /// newline has been consumed, decoding resumes as normal, possibly within
    /// the same call.
    ///
    /// # Note
    ///
    /// Setting a length limit is highly recommended for any `LinesCodec` which
    /// will be exposed to untrusted input. Otherwise, the size of the buffer
    /// that holds the line currently being read is unbounded. An attacker could
    /// exploit this unbounded buffer by sending an unbounded amount of input
    /// without any `\n` characters, causing unbounded memory consumption.
    ///
    /// [`LinesCodecError`]: crate::codec::LinesCodecError
    pub fn new_with_max_length(max_length: usize) -> Self {
        LinesCodec {
            max_length,
            ..LinesCodec::new()
        }
    }

    /// Returns the maximum line length when decoding.
    ///
    /// ```
    /// use tokio_util::codec::LinesCodec;
    ///
    /// let codec = LinesCodec::new();
    /// assert_eq!(codec.max_length(), usize::MAX);
    /// ```
    /// ```
    /// use tokio_util::codec::LinesCodec;
    ///
    /// let codec = LinesCodec::new_with_max_length(256);
    /// assert_eq!(codec.max_length(), 256);
    /// ```
    pub fn max_length(&self) -> usize {
        self.max_length
    }
}
94 
/// Interprets `buf` as UTF-8 text.
///
/// Any decoding failure is reported as an `io::Error` of kind
/// `InvalidData`; the underlying `Utf8Error` detail is not kept.
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Unable to decode input as UTF8",
        )),
    }
}
99 
/// Drops exactly one trailing `\r` from `s`, if there is one.
///
/// Inputs that do not end in `\r` are returned unchanged.
fn without_carriage_return(s: &[u8]) -> &[u8] {
    s.strip_suffix(b"\r").unwrap_or(s)
}
107 
108 impl Decoder for LinesCodec {
109     type Item = String;
110     type Error = LinesCodecError;
111 
decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError>112     fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
113         loop {
114             // Determine how far into the buffer we'll search for a newline. If
115             // there's no max_length set, we'll read to the end of the buffer.
116             let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
117 
118             let newline_offset = buf[self.next_index..read_to]
119                 .iter()
120                 .position(|b| *b == b'\n');
121 
122             match (self.is_discarding, newline_offset) {
123                 (true, Some(offset)) => {
124                     // If we found a newline, discard up to that offset and
125                     // then stop discarding. On the next iteration, we'll try
126                     // to read a line normally.
127                     buf.advance(offset + self.next_index + 1);
128                     self.is_discarding = false;
129                     self.next_index = 0;
130                 }
131                 (true, None) => {
132                     // Otherwise, we didn't find a newline, so we'll discard
133                     // everything we read. On the next iteration, we'll continue
134                     // discarding up to max_len bytes unless we find a newline.
135                     buf.advance(read_to);
136                     self.next_index = 0;
137                     if buf.is_empty() {
138                         return Ok(None);
139                     }
140                 }
141                 (false, Some(offset)) => {
142                     // Found a line!
143                     let newline_index = offset + self.next_index;
144                     self.next_index = 0;
145                     let line = buf.split_to(newline_index + 1);
146                     let line = &line[..line.len() - 1];
147                     let line = without_carriage_return(line);
148                     let line = utf8(line)?;
149                     return Ok(Some(line.to_string()));
150                 }
151                 (false, None) if buf.len() > self.max_length => {
152                     // Reached the maximum length without finding a
153                     // newline, return an error and start discarding on the
154                     // next call.
155                     self.is_discarding = true;
156                     return Err(LinesCodecError::MaxLineLengthExceeded);
157                 }
158                 (false, None) => {
159                     // We didn't find a line or reach the length limit, so the next
160                     // call will resume searching at the current offset.
161                     self.next_index = read_to;
162                     return Ok(None);
163                 }
164             }
165         }
166     }
167 
decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError>168     fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
169         Ok(match self.decode(buf)? {
170             Some(frame) => Some(frame),
171             None => {
172                 self.next_index = 0;
173                 // No terminating newline - return remaining data, if any
174                 if buf.is_empty() || buf == &b"\r"[..] {
175                     None
176                 } else {
177                     let line = buf.split_to(buf.len());
178                     let line = without_carriage_return(&line);
179                     let line = utf8(line)?;
180                     Some(line.to_string())
181                 }
182             }
183         })
184     }
185 }
186 
187 impl<T> Encoder<T> for LinesCodec
188 where
189     T: AsRef<str>,
190 {
191     type Error = LinesCodecError;
192 
encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError>193     fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
194         let line = line.as_ref();
195         buf.reserve(line.len() + 1);
196         buf.put(line.as_bytes());
197         buf.put_u8(b'\n');
198         Ok(())
199     }
200 }
201 
202 impl Default for LinesCodec {
default() -> Self203     fn default() -> Self {
204         Self::new()
205     }
206 }
207 
/// An error occurred while encoding or decoding a line.
#[derive(Debug)]
pub enum LinesCodecError {
    /// The maximum line length was exceeded.
    MaxLineLengthExceeded,
    /// An IO error occurred.
    Io(io::Error),
}

impl fmt::Display for LinesCodecError {
    /// Renders a fixed message for the length error, or the wrapped I/O
    /// error's own `Display` text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MaxLineLengthExceeded => f.write_str("max line length exceeded"),
            Self::Io(inner) => write!(f, "{}", inner),
        }
    }
}

impl From<io::Error> for LinesCodecError {
    /// Wraps an I/O error so `?` can propagate it from codec methods.
    fn from(inner: io::Error) -> Self {
        Self::Io(inner)
    }
}

impl std::error::Error for LinesCodecError {}
233