1 #![cfg(feature = "std")]
2 #![cfg(feature = "tokio")]
3
4 use std::{cell::Cell, io::Cursor, rc::Rc, str};
5
6 use {futures_03_dep as futures, tokio_dep as tokio};
7
8 use {
9 bytes::{Buf, BytesMut},
10 combine::{
11 error::{ParseError, StreamError},
12 parser::{
13 byte::digit,
14 combinator::{any_partial_state, AnyPartialState},
15 range::{range, recognize, take},
16 },
17 skip_many, skip_many1,
18 stream::{easy, PartialStream, RangeStream, StreamErrorFor},
19 Parser,
20 },
21 futures::prelude::*,
22 partial_io::PartialOp,
23 tokio_util::codec::{Decoder, FramedRead},
24 };
25
// Work around `partial_io` not working with tokio 0.2: the async partial reader comes from the
// local `support` module instead
27 #[path = "../tests/support/mod.rs"]
28 mod support;
29 use support::*;
30
31 pub struct LanguageServerDecoder {
32 state: AnyPartialState,
33 content_length_parses: Rc<Cell<i32>>,
34 }
35
36 impl Default for LanguageServerDecoder {
default() -> Self37 fn default() -> Self {
38 LanguageServerDecoder {
39 state: Default::default(),
40 content_length_parses: Rc::new(Cell::new(0)),
41 }
42 }
43 }
44
45 /// Parses blocks of data with length headers
46 ///
47 /// ```
48 /// Content-Length: 18
49 ///
50 /// { "some": "data" }
51 /// ```
52 // The `content_length_parses` parameter only exists to demonstrate that `content_length` only
53 // gets parsed once per message
decode_parser<'a, Input>( content_length_parses: Rc<Cell<i32>>, ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a where Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a,54 fn decode_parser<'a, Input>(
55 content_length_parses: Rc<Cell<i32>>,
56 ) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a
57 where
58 Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a,
59 {
60 let content_length = range(&b"Content-Length: "[..])
61 .with(recognize(skip_many1(digit())).and_then(|digits: &[u8]| {
62 str::from_utf8(digits)
63 .unwrap()
64 .parse::<usize>()
65 // Convert the error from `.parse` into an error combine understands
66 .map_err(StreamErrorFor::<Input>::other)
67 }))
68 .map(move |x| {
69 content_length_parses.set(content_length_parses.get() + 1);
70 x
71 });
72
73 // `any_partial_state` boxes the state which hides the type and lets us store it in
74 // `self`
75 any_partial_state(
76 (
77 skip_many(range(&b"\r\n"[..])),
78 content_length,
79 range(&b"\r\n\r\n"[..]).map(|_| ()),
80 )
81 .then_partial(|&mut (_, message_length, _)| {
82 take(message_length).map(|bytes: &[u8]| bytes.to_owned())
83 }),
84 )
85 }
86
87 impl Decoder for LanguageServerDecoder {
88 type Item = String;
89 type Error = Box<dyn std::error::Error + Send + Sync>;
90
decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>91 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
92 println!("Decoding `{:?}`", str::from_utf8(src).unwrap_or("NOT UTF8"));
93
94 let (opt, removed_len) = combine::stream::decode(
95 decode_parser(self.content_length_parses.clone()),
96 // easy::Stream gives us nice error messages
97 // (the same error messages that combine has had since its inception)
98 // PartialStream lets the parser know that more input should be
99 // expected if end of input is unexpectedly reached
100 &mut easy::Stream(PartialStream(&src[..])),
101 &mut self.state,
102 )
103 .map_err(|err| {
104 // Since err contains references into `src` we must replace these before
105 // we can return an error or call `advance` to remove the input we
106 // just committed
107 let err = err
108 .map_range(|r| {
109 str::from_utf8(r)
110 .ok()
111 .map_or_else(|| format!("{:?}", r), |s| s.to_string())
112 })
113 .map_position(|p| p.translate_position(&src[..]));
114 format!("{}\nIn input: `{}`", err, str::from_utf8(src).unwrap())
115 })?;
116
117 println!(
118 "Accepted {} bytes: `{:?}`",
119 removed_len,
120 str::from_utf8(&src[..removed_len]).unwrap_or("NOT UTF8")
121 );
122
123 // Remove the input we just committed.
124 // Ideally this would be done automatically by the call to
125 // `stream::decode` but it does unfortunately not work due
126 // to lifetime issues (Non lexical lifetimes might fix it!)
127 src.advance(removed_len);
128
129 match opt {
130 // `None` means we did not have enough input and we require that the
131 // caller of `decode` supply more before calling us again
132 None => {
133 println!("Requesting more input!");
134 Ok(None)
135 }
136
137 // `Some` means that a message was successfully decoded
138 // (and that we are ready to start decoding the next message)
139 Some(output) => {
140 let value = String::from_utf8(output)?;
141 println!("Decoded `{}`", value);
142 Ok(Some(value))
143 }
144 }
145 }
146 }
147
148 #[tokio::main]
main()149 async fn main() {
150 let input = "Content-Length: 6\r\n\
151 \r\n\
152 123456\r\n\
153 Content-Length: 4\r\n\
154 \r\n\
155 true";
156
157 let seq = vec![
158 PartialOp::Limited(20),
159 PartialOp::Limited(1),
160 PartialOp::Limited(2),
161 PartialOp::Limited(3),
162 ];
163 let reader = &mut Cursor::new(input.as_bytes());
164 // Using the `partial_io` crate we emulate the partial reads that would happen when reading
165 // asynchronously from an io device.
166 let partial_reader = PartialAsyncRead::new(reader, seq);
167
168 let decoder = LanguageServerDecoder::default();
169 let content_length_parses = decoder.content_length_parses.clone();
170
171 let result = FramedRead::new(partial_reader, decoder).try_collect().await;
172
173 assert!(result.as_ref().is_ok(), "{}", result.unwrap_err());
174 let values: Vec<_> = result.unwrap();
175
176 let expected_values = ["123456", "true"];
177 assert_eq!(values, expected_values);
178
179 assert_eq!(content_length_parses.get(), expected_values.len() as i32);
180
181 println!("Successfully parsed: `{}`", input);
182 println!(
183 "Found {} items and never repeated a completed parse!",
184 values.len(),
185 );
186 println!("Result: {:?}", values);
187 }
188