1 #![cfg(feature = "std")]
2 #![cfg(feature = "tokio")]
3 
4 use std::{cell::Cell, io::Cursor, rc::Rc, str};
5 
6 use {futures_03_dep as futures, tokio_dep as tokio};
7 
8 use {
9     bytes::{Buf, BytesMut},
10     combine::{
11         error::{ParseError, StreamError},
12         parser::{
13             byte::digit,
14             combinator::{any_partial_state, AnyPartialState},
15             range::{range, recognize, take},
16         },
17         skip_many, skip_many1,
18         stream::{easy, PartialStream, RangeStream, StreamErrorFor},
19         Parser,
20     },
21     futures::prelude::*,
22     partial_io::PartialOp,
23     tokio_util::codec::{Decoder, FramedRead},
24 };
25 
26 // Workaround partial_io not working with tokio-0.2
27 #[path = "../tests/support/mod.rs"]
28 mod support;
29 use support::*;
30 
31 pub struct LanguageServerDecoder {
32     state: AnyPartialState,
33     content_length_parses: Rc<Cell<i32>>,
34 }
35 
36 impl Default for LanguageServerDecoder {
default() -> Self37     fn default() -> Self {
38         LanguageServerDecoder {
39             state: Default::default(),
40             content_length_parses: Rc::new(Cell::new(0)),
41         }
42     }
43 }
44 
45 /// Parses blocks of data with length headers
46 ///
47 /// ```
48 /// Content-Length: 18
49 ///
50 /// { "some": "data" }
51 /// ```
52 // The `content_length_parses` parameter only exists to demonstrate that `content_length` only
53 // gets parsed once per message
decode_parser<'a, Input>( content_length_parses: Rc<Cell<i32>>, ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a where Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a,54 fn decode_parser<'a, Input>(
55     content_length_parses: Rc<Cell<i32>>,
56 ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a
57 where
58     Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a,
59 {
60     let content_length = range(&b"Content-Length: "[..])
61         .with(recognize(skip_many1(digit())).and_then(|digits: &[u8]| {
62             str::from_utf8(digits)
63                 .unwrap()
64                 .parse::<usize>()
65                 // Convert the error from `.parse` into an error combine understands
66                 .map_err(StreamErrorFor::<Input>::other)
67         }))
68         .map(move |x| {
69             content_length_parses.set(content_length_parses.get() + 1);
70             x
71         });
72 
73     // `any_partial_state` boxes the state which hides the type and lets us store it in
74     // `self`
75     any_partial_state(
76         (
77             skip_many(range(&b"\r\n"[..])),
78             content_length,
79             range(&b"\r\n\r\n"[..]).map(|_| ()),
80         )
81             .then_partial(|&mut (_, message_length, _)| {
82                 take(message_length).map(|bytes: &[u8]| bytes.to_owned())
83             }),
84     )
85 }
86 
87 impl Decoder for LanguageServerDecoder {
88     type Item = String;
89     type Error = Box<dyn std::error::Error + Send + Sync>;
90 
decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>91     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
92         println!("Decoding `{:?}`", str::from_utf8(src).unwrap_or("NOT UTF8"));
93 
94         let (opt, removed_len) = combine::stream::decode(
95             decode_parser(self.content_length_parses.clone()),
96             // easy::Stream gives us nice error messages
97             // (the same error messages that combine has had since its inception)
98             // PartialStream lets the parser know that more input should be
99             // expected if end of input is unexpectedly reached
100             &mut easy::Stream(PartialStream(&src[..])),
101             &mut self.state,
102         )
103         .map_err(|err| {
104             // Since err contains references into `src` we must replace these before
105             // we can return an error or call `advance` to remove the input we
106             // just committed
107             let err = err
108                 .map_range(|r| {
109                     str::from_utf8(r)
110                         .ok()
111                         .map_or_else(|| format!("{:?}", r), |s| s.to_string())
112                 })
113                 .map_position(|p| p.translate_position(&src[..]));
114             format!("{}\nIn input: `{}`", err, str::from_utf8(src).unwrap())
115         })?;
116 
117         println!(
118             "Accepted {} bytes: `{:?}`",
119             removed_len,
120             str::from_utf8(&src[..removed_len]).unwrap_or("NOT UTF8")
121         );
122 
123         // Remove the input we just committed.
124         // Ideally this would be done automatically by the call to
125         // `stream::decode` but it does unfortunately not work due
126         // to lifetime issues (Non lexical lifetimes might fix it!)
127         src.advance(removed_len);
128 
129         match opt {
130             // `None` means we did not have enough input and we require that the
131             // caller of `decode` supply more before calling us again
132             None => {
133                 println!("Requesting more input!");
134                 Ok(None)
135             }
136 
137             // `Some` means that a message was successfully decoded
138             // (and that we are ready to start decoding the next message)
139             Some(output) => {
140                 let value = String::from_utf8(output)?;
141                 println!("Decoded `{}`", value);
142                 Ok(Some(value))
143             }
144         }
145     }
146 }
147 
148 #[tokio::main]
main()149 async fn main() {
150     let input = "Content-Length: 6\r\n\
151                  \r\n\
152                  123456\r\n\
153                  Content-Length: 4\r\n\
154                  \r\n\
155                  true";
156 
157     let seq = vec![
158         PartialOp::Limited(20),
159         PartialOp::Limited(1),
160         PartialOp::Limited(2),
161         PartialOp::Limited(3),
162     ];
163     let reader = &mut Cursor::new(input.as_bytes());
164     // Using the `partial_io` crate we emulate the partial reads that would happen when reading
165     // asynchronously from an io device.
166     let partial_reader = PartialAsyncRead::new(reader, seq);
167 
168     let decoder = LanguageServerDecoder::default();
169     let content_length_parses = decoder.content_length_parses.clone();
170 
171     let result = FramedRead::new(partial_reader, decoder).try_collect().await;
172 
173     assert!(result.as_ref().is_ok(), "{}", result.unwrap_err());
174     let values: Vec<_> = result.unwrap();
175 
176     let expected_values = ["123456", "true"];
177     assert_eq!(values, expected_values);
178 
179     assert_eq!(content_length_parses.get(), expected_values.len() as i32);
180 
181     println!("Successfully parsed: `{}`", input);
182     println!(
183         "Found {} items and never repeated a completed parse!",
184         values.len(),
185     );
186     println!("Result: {:?}", values);
187 }
188