1 #![warn(rust_2018_idioms)]
2 
3 use tokio::io::{AsyncRead, ReadBuf};
4 use tokio_test::assert_ready;
5 use tokio_test::task;
6 use tokio_util::codec::{Decoder, FramedRead};
7 
8 use bytes::{Buf, BytesMut};
9 use futures::Stream;
10 use std::collections::VecDeque;
11 use std::io;
12 use std::pin::Pin;
13 use std::task::Poll::{Pending, Ready};
14 use std::task::{Context, Poll};
15 
16 macro_rules! mock {
17     ($($x:expr,)*) => {{
18         let mut v = VecDeque::new();
19         v.extend(vec![$($x),*]);
20         Mock { calls: v }
21     }};
22 }
23 
24 macro_rules! assert_read {
25     ($e:expr, $n:expr) => {{
26         let val = assert_ready!($e);
27         assert_eq!(val.unwrap().unwrap(), $n);
28     }};
29 }
30 
31 macro_rules! pin {
32     ($id:ident) => {
33         Pin::new(&mut $id)
34     };
35 }
36 
37 struct U32Decoder;
38 
39 impl Decoder for U32Decoder {
40     type Item = u32;
41     type Error = io::Error;
42 
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>>43     fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
44         if buf.len() < 4 {
45             return Ok(None);
46         }
47 
48         let n = buf.split_to(4).get_u32();
49         Ok(Some(n))
50     }
51 }
52 
53 struct U64Decoder;
54 
55 impl Decoder for U64Decoder {
56     type Item = u64;
57     type Error = io::Error;
58 
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>>59     fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> {
60         if buf.len() < 8 {
61             return Ok(None);
62         }
63 
64         let n = buf.split_to(8).get_u64();
65         Ok(Some(n))
66     }
67 }
68 
69 #[test]
read_multi_frame_in_packet()70 fn read_multi_frame_in_packet() {
71     let mut task = task::spawn(());
72     let mock = mock! {
73         Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
74     };
75     let mut framed = FramedRead::new(mock, U32Decoder);
76 
77     task.enter(|cx, _| {
78         assert_read!(pin!(framed).poll_next(cx), 0);
79         assert_read!(pin!(framed).poll_next(cx), 1);
80         assert_read!(pin!(framed).poll_next(cx), 2);
81         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
82     });
83 }
84 
85 #[test]
read_multi_frame_across_packets()86 fn read_multi_frame_across_packets() {
87     let mut task = task::spawn(());
88     let mock = mock! {
89         Ok(b"\x00\x00\x00\x00".to_vec()),
90         Ok(b"\x00\x00\x00\x01".to_vec()),
91         Ok(b"\x00\x00\x00\x02".to_vec()),
92     };
93     let mut framed = FramedRead::new(mock, U32Decoder);
94 
95     task.enter(|cx, _| {
96         assert_read!(pin!(framed).poll_next(cx), 0);
97         assert_read!(pin!(framed).poll_next(cx), 1);
98         assert_read!(pin!(framed).poll_next(cx), 2);
99         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
100     });
101 }
102 
103 #[test]
read_multi_frame_in_packet_after_codec_changed()104 fn read_multi_frame_in_packet_after_codec_changed() {
105     let mut task = task::spawn(());
106     let mock = mock! {
107         Ok(b"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08".to_vec()),
108     };
109     let mut framed = FramedRead::new(mock, U32Decoder);
110 
111     task.enter(|cx, _| {
112         assert_read!(pin!(framed).poll_next(cx), 0x04);
113 
114         let mut framed = framed.map_decoder(|_| U64Decoder);
115         assert_read!(pin!(framed).poll_next(cx), 0x08);
116 
117         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
118     });
119 }
120 
121 #[test]
read_not_ready()122 fn read_not_ready() {
123     let mut task = task::spawn(());
124     let mock = mock! {
125         Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
126         Ok(b"\x00\x00\x00\x00".to_vec()),
127         Ok(b"\x00\x00\x00\x01".to_vec()),
128     };
129     let mut framed = FramedRead::new(mock, U32Decoder);
130 
131     task.enter(|cx, _| {
132         assert!(pin!(framed).poll_next(cx).is_pending());
133         assert_read!(pin!(framed).poll_next(cx), 0);
134         assert_read!(pin!(framed).poll_next(cx), 1);
135         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
136     });
137 }
138 
139 #[test]
read_partial_then_not_ready()140 fn read_partial_then_not_ready() {
141     let mut task = task::spawn(());
142     let mock = mock! {
143         Ok(b"\x00\x00".to_vec()),
144         Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
145         Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
146     };
147     let mut framed = FramedRead::new(mock, U32Decoder);
148 
149     task.enter(|cx, _| {
150         assert!(pin!(framed).poll_next(cx).is_pending());
151         assert_read!(pin!(framed).poll_next(cx), 0);
152         assert_read!(pin!(framed).poll_next(cx), 1);
153         assert_read!(pin!(framed).poll_next(cx), 2);
154         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
155     });
156 }
157 
158 #[test]
read_err()159 fn read_err() {
160     let mut task = task::spawn(());
161     let mock = mock! {
162         Err(io::Error::new(io::ErrorKind::Other, "")),
163     };
164     let mut framed = FramedRead::new(mock, U32Decoder);
165 
166     task.enter(|cx, _| {
167         assert_eq!(
168             io::ErrorKind::Other,
169             assert_ready!(pin!(framed).poll_next(cx))
170                 .unwrap()
171                 .unwrap_err()
172                 .kind()
173         )
174     });
175 }
176 
177 #[test]
read_partial_then_err()178 fn read_partial_then_err() {
179     let mut task = task::spawn(());
180     let mock = mock! {
181         Ok(b"\x00\x00".to_vec()),
182         Err(io::Error::new(io::ErrorKind::Other, "")),
183     };
184     let mut framed = FramedRead::new(mock, U32Decoder);
185 
186     task.enter(|cx, _| {
187         assert_eq!(
188             io::ErrorKind::Other,
189             assert_ready!(pin!(framed).poll_next(cx))
190                 .unwrap()
191                 .unwrap_err()
192                 .kind()
193         )
194     });
195 }
196 
197 #[test]
read_partial_would_block_then_err()198 fn read_partial_would_block_then_err() {
199     let mut task = task::spawn(());
200     let mock = mock! {
201         Ok(b"\x00\x00".to_vec()),
202         Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
203         Err(io::Error::new(io::ErrorKind::Other, "")),
204     };
205     let mut framed = FramedRead::new(mock, U32Decoder);
206 
207     task.enter(|cx, _| {
208         assert!(pin!(framed).poll_next(cx).is_pending());
209         assert_eq!(
210             io::ErrorKind::Other,
211             assert_ready!(pin!(framed).poll_next(cx))
212                 .unwrap()
213                 .unwrap_err()
214                 .kind()
215         )
216     });
217 }
218 
219 #[test]
huge_size()220 fn huge_size() {
221     let mut task = task::spawn(());
222     let data = &[0; 32 * 1024][..];
223     let mut framed = FramedRead::new(data, BigDecoder);
224 
225     task.enter(|cx, _| {
226         assert_read!(pin!(framed).poll_next(cx), 0);
227         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
228     });
229 
230     struct BigDecoder;
231 
232     impl Decoder for BigDecoder {
233         type Item = u32;
234         type Error = io::Error;
235 
236         fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
237             if buf.len() < 32 * 1024 {
238                 return Ok(None);
239             }
240             buf.advance(32 * 1024);
241             Ok(Some(0))
242         }
243     }
244 }
245 
246 #[test]
data_remaining_is_error()247 fn data_remaining_is_error() {
248     let mut task = task::spawn(());
249     let slice = &[0; 5][..];
250     let mut framed = FramedRead::new(slice, U32Decoder);
251 
252     task.enter(|cx, _| {
253         assert_read!(pin!(framed).poll_next(cx), 0);
254         assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
255     });
256 }
257 
258 #[test]
multi_frames_on_eof()259 fn multi_frames_on_eof() {
260     let mut task = task::spawn(());
261     struct MyDecoder(Vec<u32>);
262 
263     impl Decoder for MyDecoder {
264         type Item = u32;
265         type Error = io::Error;
266 
267         fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
268             unreachable!();
269         }
270 
271         fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
272             if self.0.is_empty() {
273                 return Ok(None);
274             }
275 
276             Ok(Some(self.0.remove(0)))
277         }
278     }
279 
280     let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
281 
282     task.enter(|cx, _| {
283         assert_read!(pin!(framed).poll_next(cx), 0);
284         assert_read!(pin!(framed).poll_next(cx), 1);
285         assert_read!(pin!(framed).poll_next(cx), 2);
286         assert_read!(pin!(framed).poll_next(cx), 3);
287         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
288     });
289 }
290 
291 #[test]
read_eof_then_resume()292 fn read_eof_then_resume() {
293     let mut task = task::spawn(());
294     let mock = mock! {
295         Ok(b"\x00\x00\x00\x01".to_vec()),
296         Ok(b"".to_vec()),
297         Ok(b"\x00\x00\x00\x02".to_vec()),
298         Ok(b"".to_vec()),
299         Ok(b"\x00\x00\x00\x03".to_vec()),
300     };
301     let mut framed = FramedRead::new(mock, U32Decoder);
302 
303     task.enter(|cx, _| {
304         assert_read!(pin!(framed).poll_next(cx), 1);
305         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
306         assert_read!(pin!(framed).poll_next(cx), 2);
307         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
308         assert_read!(pin!(framed).poll_next(cx), 3);
309         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
310         assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
311     });
312 }
313 
314 // ===== Mock ======
315 
316 struct Mock {
317     calls: VecDeque<io::Result<Vec<u8>>>,
318 }
319 
320 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>321     fn poll_read(
322         mut self: Pin<&mut Self>,
323         _cx: &mut Context<'_>,
324         buf: &mut ReadBuf<'_>,
325     ) -> Poll<io::Result<()>> {
326         use io::ErrorKind::WouldBlock;
327 
328         match self.calls.pop_front() {
329             Some(Ok(data)) => {
330                 debug_assert!(buf.remaining() >= data.len());
331                 buf.put_slice(&data);
332                 Ready(Ok(()))
333             }
334             Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
335             Some(Err(e)) => Ready(Err(e)),
336             None => Ready(Ok(())),
337         }
338     }
339 }
340