1 #![warn(rust_2018_idioms)]
2
3 use tokio::io::{AsyncRead, ReadBuf};
4 use tokio_test::assert_ready;
5 use tokio_test::task;
6 use tokio_util::codec::{Decoder, FramedRead};
7
8 use bytes::{Buf, BytesMut};
9 use futures::Stream;
10 use std::collections::VecDeque;
11 use std::io;
12 use std::pin::Pin;
13 use std::task::Poll::{Pending, Ready};
14 use std::task::{Context, Poll};
15
/// Builds a `Mock` whose `calls` queue holds the given read outcomes in order.
///
/// Each argument is one scripted `poll_read` result (`io::Result<Vec<u8>>`).
/// The list may be empty, and a trailing comma is optional.
macro_rules! mock {
    ($($x:expr),* $(,)?) => {{
        Mock {
            // Converting the `Vec` directly avoids pushing every element a second time.
            calls: VecDeque::from(vec![$($x),*]),
        }
    }};
}
23
/// Asserts that a poll result is `Ready(Some(Ok(frame)))` and that `frame`
/// equals the expected value.
///
/// Panics (via `assert_ready!` / `unwrap`) if the poll is pending, the stream
/// has ended, or the item is an error.
macro_rules! assert_read {
    ($poll:expr, $expected:expr) => {{
        let frame = assert_ready!($poll).unwrap().unwrap();
        assert_eq!(frame, $expected);
    }};
}
30
/// Wraps a mutable borrow of the named local variable in a `Pin`.
///
/// Uses `Pin::new`, so the variable's type must be `Unpin`.
macro_rules! pin {
    ($target:ident) => {
        ::std::pin::Pin::new(&mut $target)
    };
}
36
37 struct U32Decoder;
38
39 impl Decoder for U32Decoder {
40 type Item = u32;
41 type Error = io::Error;
42
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>>43 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
44 if buf.len() < 4 {
45 return Ok(None);
46 }
47
48 let n = buf.split_to(4).get_u32();
49 Ok(Some(n))
50 }
51 }
52
53 struct U64Decoder;
54
55 impl Decoder for U64Decoder {
56 type Item = u64;
57 type Error = io::Error;
58
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>>59 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> {
60 if buf.len() < 8 {
61 return Ok(None);
62 }
63
64 let n = buf.split_to(8).get_u64();
65 Ok(Some(n))
66 }
67 }
68
/// A single read carrying twelve bytes yields the frames 0, 1 and 2, after
/// which the stream reports its end.
#[test]
fn read_multi_frame_in_packet() {
    let io = mock! {
        Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        for expected in 0..3u32 {
            assert_read!(pin!(frames).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(frames).poll_next(cx));
        assert!(end.is_none());
    });
}
84
/// Three reads of four bytes each yield the frames 0, 1 and 2, after which
/// the stream reports its end.
#[test]
fn read_multi_frame_across_packets() {
    let io = mock! {
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        for &expected in &[0u32, 1, 2] {
            assert_read!(pin!(frames).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(frames).poll_next(cx));
        assert!(end.is_none());
    });
}
102
/// Bytes already buffered by the `U32Decoder` stream remain available after
/// `map_decoder` swaps in a `U64Decoder`: the first four bytes decode as
/// `0x04`, the remaining eight as `0x08`.
#[test]
fn read_multi_frame_in_packet_after_codec_changed() {
    let io = mock! {
        Ok(b"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08".to_vec()),
    };
    let mut narrow = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        assert_read!(pin!(narrow).poll_next(cx), 0x04);

        // Replace the decoder while keeping the same underlying stream.
        let mut wide = narrow.map_decoder(|_| U64Decoder);
        assert_read!(pin!(wide).poll_next(cx), 0x08);

        let end = assert_ready!(pin!(wide).poll_next(cx));
        assert!(end.is_none());
    });
}
120
/// A scripted `WouldBlock` makes the first poll pending; the following polls
/// yield frames 0 and 1 and then the end of the stream.
#[test]
fn read_not_ready() {
    let io = mock! {
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00".to_vec()),
        Ok(b"\x00\x00\x00\x01".to_vec()),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        let first = pin!(frames).poll_next(cx);
        assert!(first.is_pending());

        for expected in 0..2u32 {
            assert_read!(pin!(frames).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(frames).poll_next(cx));
        assert!(end.is_none());
    });
}
138
/// Two bytes followed by a `WouldBlock` leave the first poll pending; once the
/// remaining ten bytes arrive, frames 0, 1 and 2 are yielded and the stream ends.
#[test]
fn read_partial_then_not_ready() {
    let io = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        let first = pin!(frames).poll_next(cx);
        assert!(first.is_pending());

        for expected in 0..3u32 {
            assert_read!(pin!(frames).poll_next(cx), expected);
        }

        let end = assert_ready!(pin!(frames).poll_next(cx));
        assert!(end.is_none());
    });
}
157
/// A read error of kind `Other` is surfaced as the stream's first item.
#[test]
fn read_err() {
    let io = mock! {
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        let item = assert_ready!(pin!(frames).poll_next(cx));
        let err = item.unwrap().unwrap_err();
        assert_eq!(io::ErrorKind::Other, err.kind());
    });
}
176
/// A read error of kind `Other` that follows a two-byte partial frame is
/// surfaced by the very first poll.
#[test]
fn read_partial_then_err() {
    let io = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        let item = assert_ready!(pin!(frames).poll_next(cx));
        let err = item.unwrap().unwrap_err();
        assert_eq!(io::ErrorKind::Other, err.kind());
    });
}
196
/// After a two-byte partial frame and a `WouldBlock` (first poll pending), a
/// read error of kind `Other` is surfaced by the second poll.
#[test]
fn read_partial_would_block_then_err() {
    let io = mock! {
        Ok(b"\x00\x00".to_vec()),
        Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
        Err(io::Error::new(io::ErrorKind::Other, "")),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        let first = pin!(frames).poll_next(cx);
        assert!(first.is_pending());

        let item = assert_ready!(pin!(frames).poll_next(cx));
        let err = item.unwrap().unwrap_err();
        assert_eq!(io::ErrorKind::Other, err.kind());
    });
}
218
/// A decoder that only produces a frame once 32 KiB are buffered receives the
/// whole 32 KiB input, yields a single frame, and the stream then ends.
#[test]
fn huge_size() {
    /// Number of bytes the decoder waits for before emitting a frame.
    const FRAME_LEN: usize = 32 * 1024;

    /// Consumes `FRAME_LEN` bytes at a time and reports each chunk as `0`.
    struct BigDecoder;

    impl Decoder for BigDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
            if buf.len() >= FRAME_LEN {
                buf.advance(FRAME_LEN);
                Ok(Some(0))
            } else {
                Ok(None)
            }
        }
    }

    let input: &[u8] = &[0; FRAME_LEN];
    let mut frames = FramedRead::new(input, BigDecoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        assert_read!(pin!(frames).poll_next(cx), 0);

        let end = assert_ready!(pin!(frames).poll_next(cx));
        assert!(end.is_none());
    });
}
245
/// With five zero bytes of input, the first four decode to frame 0 and the
/// next poll yields an error instead of a clean end of stream.
#[test]
fn data_remaining_is_error() {
    let input: &[u8] = &[0; 5];
    let mut frames = FramedRead::new(input, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        assert_read!(pin!(frames).poll_next(cx), 0);

        let leftover = assert_ready!(pin!(frames).poll_next(cx));
        assert!(leftover.unwrap().is_err());
    });
}
257
/// With an empty mock (no scripted reads), every frame comes from
/// `decode_eof`: the stream yields 0, 1, 2 and 3 and only then ends.
#[test]
fn multi_frames_on_eof() {
    /// Hands out its queued values from `decode_eof`, front first.
    struct MyDecoder(VecDeque<u32>);

    impl Decoder for MyDecoder {
        type Item = u32;
        type Error = io::Error;

        fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            // The test expects the regular decode path never to run.
            unreachable!();
        }

        fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
            // `pop_front` is O(1) and already returns `None` once the queue is drained.
            Ok(self.0.pop_front())
        }
    }

    let mut task = task::spawn(());
    let queued: VecDeque<u32> = (0..4).collect();
    let mut framed = FramedRead::new(mock!(), MyDecoder(queued));

    task.enter(|cx, _| {
        for expected in 0..4u32 {
            assert_read!(pin!(framed).poll_next(cx), expected);
        }
        assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
    });
}
290
/// An empty read ends the stream only temporarily: each later read with data
/// produces another frame, followed by another end-of-stream report. Once the
/// script is exhausted the stream keeps reporting its end.
#[test]
fn read_eof_then_resume() {
    let io = mock! {
        Ok(b"\x00\x00\x00\x01".to_vec()),
        Ok(b"".to_vec()),
        Ok(b"\x00\x00\x00\x02".to_vec()),
        Ok(b"".to_vec()),
        Ok(b"\x00\x00\x00\x03".to_vec()),
    };
    let mut frames = FramedRead::new(io, U32Decoder);
    let mut harness = task::spawn(());

    harness.enter(|cx, _| {
        // Every frame is followed by one end-of-stream report.
        for expected in 1..=3u32 {
            assert_read!(pin!(frames).poll_next(cx), expected);

            let end = assert_ready!(pin!(frames).poll_next(cx));
            assert!(end.is_none());
        }

        // Polling again after the last end-of-stream still reports the end.
        let still_ended = assert_ready!(pin!(frames).poll_next(cx));
        assert!(still_ended.is_none());
    });
}
313
314 // ===== Mock ======
315
316 struct Mock {
317 calls: VecDeque<io::Result<Vec<u8>>>,
318 }
319
320 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>321 fn poll_read(
322 mut self: Pin<&mut Self>,
323 _cx: &mut Context<'_>,
324 buf: &mut ReadBuf<'_>,
325 ) -> Poll<io::Result<()>> {
326 use io::ErrorKind::WouldBlock;
327
328 match self.calls.pop_front() {
329 Some(Ok(data)) => {
330 debug_assert!(buf.remaining() >= data.len());
331 buf.put_slice(&data);
332 Ready(Ok(()))
333 }
334 Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
335 Some(Err(e)) => Ready(Err(e)),
336 None => Ready(Ok(())),
337 }
338 }
339 }
340