1 //
//! Traits and implementations of arbitrary data streams.
3 //!
4 //! Streams are similar to the `Iterator` trait in that they represent some sequential set of items
5 //! which can be retrieved one by one. Where `Stream`s differ is that they are allowed to return
6 //! errors instead of just `None` and if they implement the `RangeStreamOnce` trait they are also
7 //! capable of returning multiple items at the same time, usually in the form of a slice.
8 //!
//! In addition to the functionality above, a proper `Stream` usable by a `Parser` must also have a
//! position (marked by the `Positioned` trait) and must also be resettable (marked by the
11 //! `ResetStream` trait). The former is used to ensure that errors at different points in the stream
12 //! aren't combined and the latter is used in parsers such as `or` to try multiple alternative
13 //! parses.
14
15 use crate::lib::{cmp::Ordering, fmt, marker::PhantomData, str::Chars};
16
17 use crate::{
18 error::{
19 ParseError,
20 ParseResult::{self, *},
21 StreamError, StringStreamError, Tracked, UnexpectedParse,
22 },
23 Parser,
24 };
25
26 #[cfg(feature = "std")]
27 pub use self::decoder::Decoder;
28
// Implements `ResetStream` for `$ty` by using a clone of the stream itself as the checkpoint.
//
// `$params` is the generic parameter list placed on the generated `impl`; the generated impl
// only applies where `$ty: StreamOnce`.
#[doc(hidden)]
#[macro_export]
macro_rules! clone_resetable {
    (( $($params: tt)* ) $ty: ty) => {
        impl<$($params)*> ResetStream for $ty
            where Self: StreamOnce
        {
            // The checkpoint is a complete copy of the stream value.
            type Checkpoint = Self;

            fn checkpoint(&self) -> Self {
                self.clone()
            }
            #[inline]
            fn reset(&mut self, checkpoint: Self) -> Result<(), Self::Error> {
                // Resetting overwrites the stream with the saved copy and always succeeds.
                *self = checkpoint;
                Ok(())
            }
        }
    }
}
49
50 #[cfg(feature = "std")]
51 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
52 pub mod buf_reader;
53 /// Stream wrapper which provides a `ResetStream` impl for `StreamOnce` impls which do not have
54 /// one.
55 #[cfg(feature = "alloc")]
56 #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
57 pub mod buffered;
58 #[cfg(feature = "std")]
59 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
60 pub mod easy;
61 /// Stream wrapper which provides more detailed position information.
62 pub mod position;
63 /// Stream wrapper allowing `std::io::Read` to be used
64 #[cfg(feature = "std")]
65 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
66 pub mod read;
67 pub mod span;
68 /// Stream wrapper allowing custom state to be used.
69 pub mod state;
70
71 #[cfg(feature = "std")]
72 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
73 pub mod decoder;
74
/// A type which has a position.
///
/// The position type is the `Position` associated type of the stream's `StreamOnce`
/// implementation.
pub trait Positioned: StreamOnce {
    /// Returns the current position of the stream.
    fn position(&self) -> Self::Position;
}
80
/// Convenience alias over the `StreamError` for the input stream `Input`
///
/// It names the `StreamError` associated type of `Input`'s error type
/// (`<Input as StreamOnce>::Error`) without spelling out the full `ParseError` projection.
///
/// ```
/// #[macro_use]
/// extern crate combine;
/// use combine::{easy, Parser, Stream, many1};
/// use combine::parser::char::letter;
/// use combine::stream::StreamErrorFor;
/// use combine::error::{ParseError, StreamError};
///
/// parser!{
///    fn parser[Input]()(Input) -> String
///     where [ Input: Stream<Token = char>, ]
///     {
///         many1(letter()).and_then(|word: String| {
///             if word == "combine" {
///                 Ok(word)
///             } else {
///                 // The alias makes it easy to refer to the `StreamError` type of `Input`
///                 Err(StreamErrorFor::<Input>::expected_static_message("combine"))
///             }
///         })
///     }
/// }
///
/// fn main() {
/// }
/// ```
pub type StreamErrorFor<Input> = <<Input as StreamOnce>::Error as ParseError<
    <Input as StreamOnce>::Token,
    <Input as StreamOnce>::Range,
    <Input as StreamOnce>::Position,
>>::StreamError;
114
/// `StreamOnce` represents a sequence of items that can be extracted one by one.
pub trait StreamOnce {
    /// The type of items which is yielded from this stream.
    type Token: Clone;

    /// The type of a range of items yielded from this stream.
    /// Types which do not have a way of yielding ranges of items should just use the
    /// `Self::Token` for this type.
    type Range: Clone;

    /// Type which represents the position in a stream.
    /// `Ord` is required to allow parsers to determine which of two positions are further ahead.
    type Position: Clone + Ord;

    /// The error type produced when parsing this stream. It must be a `ParseError` over this
    /// stream's `Token`, `Range` and `Position` types.
    type Error: ParseError<Self::Token, Self::Range, Self::Position>;
    /// Takes a stream and removes its first token, yielding the token and the rest of the elements.
    /// Returns `Err` if no element could be retrieved.
    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>>;

    /// Returns `true` if this stream only contains partial input.
    ///
    /// The default implementation returns `false`.
    ///
    /// See `PartialStream`.
    fn is_partial(&self) -> bool {
        false
    }
}
141
/// A `StreamOnce` which can create checkpoints which the stream can be reset to
pub trait ResetStream: StreamOnce {
    /// Value which records a position of the stream so that `reset` can return to it.
    type Checkpoint: Clone;

    /// Creates a `Checkpoint` at the current position which can be used to reset the stream
    /// later to the current position
    fn checkpoint(&self) -> Self::Checkpoint;
    /// Attempts to reset the stream to an earlier position.
    ///
    /// Returns `Err` with the stream's error type if the stream could not be reset.
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error>;
}
152
// These stream types use a clone of themselves as their `ResetStream` checkpoint.
clone_resetable! {('a) &'a str}
clone_resetable! {('a, T) &'a [T]}
clone_resetable! {('a, T) SliceStream<'a, T> }
clone_resetable! {(T: Clone) IteratorStream<T>}
157
/// A stream of tokens which can be duplicated
///
/// This is a trait over types which implement the `StreamOnce`, `ResetStream` and `Positioned`
/// traits. If you need a custom `Stream` object then implement those traits and `Stream` is
/// implemented automatically.
///
/// The trait declares no methods of its own; it only combines its supertraits.
pub trait Stream: StreamOnce + ResetStream + Positioned {}
164
165 impl<Input> Stream for Input
166 where
167 Input: StreamOnce + Positioned + ResetStream,
168 {
169 }
170
171 #[inline]
uncons<Input>(input: &mut Input) -> ParseResult<Input::Token, Input::Error> where Input: ?Sized + Stream,172 pub fn uncons<Input>(input: &mut Input) -> ParseResult<Input::Token, Input::Error>
173 where
174 Input: ?Sized + Stream,
175 {
176 match input.uncons() {
177 Ok(x) => CommitOk(x),
178 Err(err) => wrap_stream_error(input, err),
179 }
180 }
181
182 /// A `RangeStream` is an extension of `StreamOnce` which allows for zero copy parsing.
183 pub trait RangeStreamOnce: StreamOnce + ResetStream {
184 /// Takes `size` elements from the stream.
185 /// Fails if the length of the stream is less than `size`.
uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>186 fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>;
187
188 /// Takes items from stream, testing each one with `predicate`.
189 /// returns the range of items which passed `predicate`.
uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool190 fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
191 where
192 F: FnMut(Self::Token) -> bool;
193
194 #[inline]
195 /// Takes items from stream, testing each one with `predicate`
196 /// returns a range of at least one items which passed `predicate`.
197 ///
198 /// # Note
199 ///
200 /// This may not return `PeekOk` as it should uncons at least one token.
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,201 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
202 where
203 F: FnMut(Self::Token) -> bool,
204 {
205 let mut committed = false;
206 let mut started_at_eoi = true;
207 let result = self.uncons_while(|c| {
208 let ok = f(c);
209 committed |= ok;
210 started_at_eoi = false;
211 ok
212 });
213 if committed {
214 match result {
215 Ok(x) => CommitOk(x),
216 Err(x) => CommitErr(x),
217 }
218 } else if started_at_eoi {
219 PeekErr(Tracked::from(StreamErrorFor::<Self>::end_of_input()))
220 } else {
221 PeekErr(Tracked::from(
222 StreamErrorFor::<Self>::unexpected_static_message(""),
223 ))
224 }
225 }
226
227 /// Returns the distance between `self` and `end`. The returned `usize` must be so that
228 ///
229 /// ```ignore
230 /// let start = stream.checkpoint();
231 /// stream.uncons_range(distance);
232 /// stream.distance(&start) == distance
233 /// ```
distance(&self, end: &Self::Checkpoint) -> usize234 fn distance(&self, end: &Self::Checkpoint) -> usize;
235
236 /// Returns the entire range of `self`
range(&self) -> Self::Range237 fn range(&self) -> Self::Range;
238 }
239
/// A `RangeStream` is an extension of `Stream` which allows for zero copy parsing.
///
/// The trait declares no methods of its own; it combines `Stream` and `RangeStreamOnce`.
pub trait RangeStream: Stream + RangeStreamOnce {}
242
243 impl<Input> RangeStream for Input where Input: RangeStreamOnce + Stream {}
244
245 #[doc(hidden)]
wrap_stream_error<T, Input>( input: &Input, err: <Input::Error as ParseError<Input::Token, Input::Range, Input::Position>>::StreamError, ) -> ParseResult<T, <Input as StreamOnce>::Error> where Input: ?Sized + StreamOnce + Positioned,246 pub fn wrap_stream_error<T, Input>(
247 input: &Input,
248 err: <Input::Error as ParseError<Input::Token, Input::Range, Input::Position>>::StreamError,
249 ) -> ParseResult<T, <Input as StreamOnce>::Error>
250 where
251 Input: ?Sized + StreamOnce + Positioned,
252 {
253 let err = Input::Error::from_error(input.position(), err);
254 if input.is_partial() {
255 CommitErr(err)
256 } else {
257 PeekErr(err.into())
258 }
259 }
260
261 #[inline]
uncons_range<Input>( input: &mut Input, size: usize, ) -> ParseResult<Input::Range, <Input as StreamOnce>::Error> where Input: ?Sized + RangeStream,262 pub fn uncons_range<Input>(
263 input: &mut Input,
264 size: usize,
265 ) -> ParseResult<Input::Range, <Input as StreamOnce>::Error>
266 where
267 Input: ?Sized + RangeStream,
268 {
269 match input.uncons_range(size) {
270 Err(err) => wrap_stream_error(input, err),
271 Ok(x) => {
272 if size == 0 {
273 PeekOk(x)
274 } else {
275 CommitOk(x)
276 }
277 }
278 }
279 }
280
281 #[doc(hidden)]
input_at_eof<Input>(input: &mut Input) -> bool where Input: ?Sized + Stream,282 pub fn input_at_eof<Input>(input: &mut Input) -> bool
283 where
284 Input: ?Sized + Stream,
285 {
286 let before = input.checkpoint();
287 let x = input
288 .uncons()
289 .err()
290 .map_or(false, |err| err.is_unexpected_end_of_input());
291 input.reset(before).is_ok() && x
292 }
293
294 /// Removes items from the input while `predicate` returns `true`.
295 #[inline]
uncons_while<Input, F>( input: &mut Input, predicate: F, ) -> ParseResult<Input::Range, Input::Error> where F: FnMut(Input::Token) -> bool, Input: ?Sized + RangeStream, Input::Range: Range,296 pub fn uncons_while<Input, F>(
297 input: &mut Input,
298 predicate: F,
299 ) -> ParseResult<Input::Range, Input::Error>
300 where
301 F: FnMut(Input::Token) -> bool,
302 Input: ?Sized + RangeStream,
303 Input::Range: Range,
304 {
305 match input.uncons_while(predicate) {
306 Err(err) => wrap_stream_error(input, err),
307 Ok(x) => {
308 if input.is_partial() && input_at_eof(input) {
309 // Partial inputs which encounter end of file must fail to let more input be
310 // retrieved
311 CommitErr(Input::Error::from_error(
312 input.position(),
313 StreamError::end_of_input(),
314 ))
315 } else if x.len() == 0 {
316 PeekOk(x)
317 } else {
318 CommitOk(x)
319 }
320 }
321 }
322 }
323
324 #[inline]
325 /// Takes items from stream, testing each one with `predicate`
326 /// returns a range of at least one items which passed `predicate`.
327 ///
328 /// # Note
329 ///
330 /// This may not return `PeekOk` as it should uncons at least one token.
uncons_while1<Input, F>( input: &mut Input, predicate: F, ) -> ParseResult<Input::Range, Input::Error> where F: FnMut(Input::Token) -> bool, Input: ?Sized + RangeStream,331 pub fn uncons_while1<Input, F>(
332 input: &mut Input,
333 predicate: F,
334 ) -> ParseResult<Input::Range, Input::Error>
335 where
336 F: FnMut(Input::Token) -> bool,
337 Input: ?Sized + RangeStream,
338 {
339 match input.uncons_while1(predicate) {
340 CommitOk(x) => {
341 if input.is_partial() && input_at_eof(input) {
342 // Partial inputs which encounter end of file must fail to let more input be
343 // retrieved
344 CommitErr(Input::Error::from_error(
345 input.position(),
346 StreamError::end_of_input(),
347 ))
348 } else {
349 CommitOk(x)
350 }
351 }
352 PeekErr(_) => {
353 if input.is_partial() && input_at_eof(input) {
354 // Partial inputs which encounter end of file must fail to let more input be
355 // retrieved
356 CommitErr(Input::Error::from_error(
357 input.position(),
358 StreamError::end_of_input(),
359 ))
360 } else {
361 PeekErr(Input::Error::empty(input.position()).into())
362 }
363 }
364 CommitErr(err) => {
365 if input.is_partial() && input_at_eof(input) {
366 // Partial inputs which encounter end of file must fail to let more input be
367 // retrieved
368 CommitErr(Input::Error::from_error(
369 input.position(),
370 StreamError::end_of_input(),
371 ))
372 } else {
373 wrap_stream_error(input, err)
374 }
375 }
376 PeekOk(_) => unreachable!(),
377 }
378 }
379
/// Trait representing a range of elements.
pub trait Range {
    /// Returns the remaining length of `self`.
    /// The returned length need not be the same as the number of items left in the stream.
    fn len(&self) -> usize;

    /// Returns `true` if the range does not contain any elements (`Range::len() == 0`)
    fn is_empty(&self) -> bool {
        let remaining = self.len();
        remaining == 0
    }
}
391
392 impl<'a, I> StreamOnce for &'a mut I
393 where
394 I: StreamOnce + ?Sized,
395 {
396 type Token = I::Token;
397
398 type Range = I::Range;
399
400 type Position = I::Position;
401
402 type Error = I::Error;
uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>>403 fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
404 (**self).uncons()
405 }
406
is_partial(&self) -> bool407 fn is_partial(&self) -> bool {
408 (**self).is_partial()
409 }
410 }
411
412 impl<'a, I> Positioned for &'a mut I
413 where
414 I: Positioned + ?Sized,
415 {
416 #[inline]
position(&self) -> Self::Position417 fn position(&self) -> Self::Position {
418 (**self).position()
419 }
420 }
421
422 impl<'a, I> ResetStream for &'a mut I
423 where
424 I: ResetStream + ?Sized,
425 {
426 type Checkpoint = I::Checkpoint;
427
checkpoint(&self) -> Self::Checkpoint428 fn checkpoint(&self) -> Self::Checkpoint {
429 (**self).checkpoint()
430 }
431
reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error>432 fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
433 (**self).reset(checkpoint)
434 }
435 }
436
437 impl<'a, I> RangeStreamOnce for &'a mut I
438 where
439 I: RangeStreamOnce + ?Sized,
440 {
441 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,442 fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
443 where
444 F: FnMut(Self::Token) -> bool,
445 {
446 (**self).uncons_while(f)
447 }
448
449 #[inline]
uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,450 fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
451 where
452 F: FnMut(Self::Token) -> bool,
453 {
454 (**self).uncons_while1(f)
455 }
456
457 #[inline]
uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>458 fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
459 (**self).uncons_range(size)
460 }
461
462 #[inline]
distance(&self, end: &Self::Checkpoint) -> usize463 fn distance(&self, end: &Self::Checkpoint) -> usize {
464 (**self).distance(end)
465 }
466
range(&self) -> Self::Range467 fn range(&self) -> Self::Range {
468 (**self).range()
469 }
470 }
471
472 impl<'a, I> Range for &'a mut I
473 where
474 I: Range + ?Sized,
475 {
len(&self) -> usize476 fn len(&self) -> usize {
477 (**self).len()
478 }
479 }
480
481 impl<'a> StreamOnce for &'a str {
482 type Token = char;
483 type Range = &'a str;
484 type Position = PointerOffset<str>;
485 type Error = StringStreamError;
486
487 #[inline]
uncons(&mut self) -> Result<char, StreamErrorFor<Self>>488 fn uncons(&mut self) -> Result<char, StreamErrorFor<Self>> {
489 let mut chars = self.chars();
490 match chars.next() {
491 Some(c) => {
492 *self = chars.as_str();
493 Ok(c)
494 }
495 None => Err(StringStreamError::Eoi),
496 }
497 }
498 }
499
500 impl<'a> Positioned for &'a str {
501 #[inline]
position(&self) -> Self::Position502 fn position(&self) -> Self::Position {
503 PointerOffset::new(self.as_bytes().position().0)
504 }
505 }
506
/// Splits off the part of `slice` that ends just before the first character from `chars` for
/// which `f` returns `false` (or at the end of `chars` if every character is accepted).
///
/// `slice` is advanced past the returned prefix. `chars` is the iterator that gets tested; it
/// may already have been advanced past a leading part of `slice`, in which case that leading
/// part is included in the result without being tested.
// The loop body is unrolled by hand, which is why it is not written as `while let`.
#[allow(clippy::while_let_loop)]
fn str_uncons_while<'a, F>(slice: &mut &'a str, mut chars: Chars<'a>, mut f: F) -> &'a str
where
    F: FnMut(char) -> bool,
{
    // UTF-8 width of the character that was rejected; stays 0 if the iterator ran out first.
    let mut rejected_width = 0;

    macro_rules! advance {
        () => {
            match chars.next() {
                Some(c) if f(c) => {}
                Some(c) => {
                    rejected_width = c.len_utf8();
                    break;
                }
                None => break,
            }
        };
    }
    loop {
        advance!();
        advance!();
        advance!();
        advance!();
        advance!();
        advance!();
        advance!();
        advance!();
    }

    // `chars` has already stepped over the rejected character, so its width is subtracted too.
    let accepted_len = slice.len() - chars.as_str().len() - rejected_width;
    let (accepted, rest) = slice.split_at(accepted_len);
    *slice = rest;
    accepted
}
543
544 impl<'a> RangeStreamOnce for &'a str {
uncons_while<F>(&mut self, f: F) -> Result<&'a str, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,545 fn uncons_while<F>(&mut self, f: F) -> Result<&'a str, StreamErrorFor<Self>>
546 where
547 F: FnMut(Self::Token) -> bool,
548 {
549 Ok(str_uncons_while(self, self.chars(), f))
550 }
551
552 #[inline]
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,553 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
554 where
555 F: FnMut(Self::Token) -> bool,
556 {
557 let mut chars = self.chars();
558 match chars.next() {
559 Some(c) => {
560 if !f(c) {
561 return PeekErr(Tracked::from(StringStreamError::UnexpectedParse));
562 }
563 }
564 None => return PeekErr(Tracked::from(StringStreamError::Eoi)),
565 }
566
567 CommitOk(str_uncons_while(self, chars, f))
568 }
569
570 #[inline]
uncons_range(&mut self, size: usize) -> Result<&'a str, StreamErrorFor<Self>>571 fn uncons_range(&mut self, size: usize) -> Result<&'a str, StreamErrorFor<Self>> {
572 fn is_char_boundary(s: &str, index: usize) -> bool {
573 if index == s.len() {
574 return true;
575 }
576 match s.as_bytes().get(index) {
577 None => false,
578 Some(b) => !(128..=192).contains(b),
579 }
580 }
581 if size <= self.len() {
582 if is_char_boundary(self, size) {
583 let (result, remaining) = self.split_at(size);
584 *self = remaining;
585 Ok(result)
586 } else {
587 Err(StringStreamError::CharacterBoundary)
588 }
589 } else {
590 Err(StringStreamError::Eoi)
591 }
592 }
593
594 #[inline]
distance(&self, end: &Self) -> usize595 fn distance(&self, end: &Self) -> usize {
596 self.position().0 - end.position().0
597 }
598
range(&self) -> Self::Range599 fn range(&self) -> Self::Range {
600 self
601 }
602 }
603
604 impl<'a> Range for &'a str {
605 #[inline]
len(&self) -> usize606 fn len(&self) -> usize {
607 str::len(self)
608 }
609 }
610
611 impl<'a, T> Range for &'a [T] {
612 #[inline]
len(&self) -> usize613 fn len(&self) -> usize {
614 <[T]>::len(self)
615 }
616 }
617
/// Index at which `slice_uncons_while` and `slice_uncons_while_ref` start testing elements.
///
/// The discriminant is the start index itself, so the enum is converted with `as usize`.
#[repr(usize)]
enum UnconsStart {
    /// Start testing at the first element.
    Zero = 0,
    /// Skip the first element, which the caller has already tested.
    One = 1,
}
623
/// Splits off the prefix of `slice` that ends just before the first element, at or after index
/// `start`, for which `f` returns `false` (or the whole slice if there is no such element).
///
/// `slice` is advanced past the returned prefix. Elements before `start` are included in the
/// result without being tested. Each tested element is cloned before it is passed to `f`.
fn slice_uncons_while<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
where
    F: FnMut(T) -> bool,
    T: Clone,
{
    let mut i = start as usize;
    let len = slice.len();
    // SAFETY: We only call this function with `One` if the slice has length >= 1
    debug_assert!(len >= i, "");
    // Set when the unrolled loop below stopped on a rejected element.
    let mut found = false;

    // One step of the unrolled loop: test element `i` and either stop or advance.
    macro_rules! check {
        () => {
            if !f(unsafe { slice.get_unchecked(i).clone() }) {
                found = true;
                break;
            }
            i += 1;
        };
    }

    // SAFETY: ensures we can access at least 8 elements starting at i, making get_unchecked sound.
    while len - i >= 8 {
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
    }

    // Fewer than 8 elements are left: finish with bounds-checked accesses, unless the unrolled
    // loop already found the element to stop at.
    if !found {
        while let Some(c) = slice.get(i) {
            if !f(c.clone()) {
                break;
            }
            i += 1;
        }
    }

    let (result, remaining) = slice.split_at(i);
    *slice = remaining;
    result
}
670
671 impl<'a, T> RangeStreamOnce for &'a [T]
672 where
673 T: Clone + PartialEq,
674 {
675 #[inline]
uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>>676 fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
677 if size <= self.len() {
678 let (result, remaining) = self.split_at(size);
679 *self = remaining;
680 Ok(result)
681 } else {
682 Err(UnexpectedParse::Eoi)
683 }
684 }
685
686 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,687 fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
688 where
689 F: FnMut(Self::Token) -> bool,
690 {
691 Ok(slice_uncons_while(self, UnconsStart::Zero, f))
692 }
693
694 #[inline]
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,695 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
696 where
697 F: FnMut(Self::Token) -> bool,
698 {
699 match self.first() {
700 Some(c) => {
701 if !f(c.clone()) {
702 return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
703 }
704 }
705 None => {
706 return PeekErr(Tracked::from(UnexpectedParse::Eoi));
707 }
708 }
709
710 CommitOk(slice_uncons_while(self, UnconsStart::One, f))
711 }
712
713 #[inline]
distance(&self, end: &Self) -> usize714 fn distance(&self, end: &Self) -> usize {
715 end.len() - self.len()
716 }
717
range(&self) -> Self::Range718 fn range(&self) -> Self::Range {
719 self
720 }
721 }
722
723 impl<'a, T> Positioned for &'a [T]
724 where
725 T: Clone + PartialEq,
726 {
727 #[inline]
position(&self) -> Self::Position728 fn position(&self) -> Self::Position {
729 PointerOffset::new(self.as_ptr() as usize)
730 }
731 }
732
733 impl<'a, T> StreamOnce for &'a [T]
734 where
735 T: Clone + PartialEq,
736 {
737 type Token = T;
738 type Range = &'a [T];
739 type Position = PointerOffset<[T]>;
740 type Error = UnexpectedParse;
741
742 #[inline]
uncons(&mut self) -> Result<T, StreamErrorFor<Self>>743 fn uncons(&mut self) -> Result<T, StreamErrorFor<Self>> {
744 match self.split_first() {
745 Some((first, rest)) => {
746 *self = rest;
747 Ok(first.clone())
748 }
749 None => Err(UnexpectedParse::Eoi),
750 }
751 }
752 }
753
/// Stream type which indicates that the stream is partial if end of input is reached
///
/// The wrapper reports `is_partial() == true`; all other stream operations are forwarded to
/// the wrapped stream `S`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct PartialStream<S>(pub S);
757
758 impl<S> From<S> for PartialStream<S> {
from(t: S) -> Self759 fn from(t: S) -> Self {
760 PartialStream(t)
761 }
762 }
763
764 impl<S> Positioned for PartialStream<S>
765 where
766 S: Positioned,
767 {
768 #[inline]
position(&self) -> Self::Position769 fn position(&self) -> Self::Position {
770 self.0.position()
771 }
772 }
773
774 impl<S> ResetStream for PartialStream<S>
775 where
776 S: ResetStream,
777 {
778 type Checkpoint = S::Checkpoint;
779
780 #[inline]
checkpoint(&self) -> Self::Checkpoint781 fn checkpoint(&self) -> Self::Checkpoint {
782 self.0.checkpoint()
783 }
784
785 #[inline]
reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error>786 fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
787 self.0.reset(checkpoint)
788 }
789 }
790
791 impl<S> StreamOnce for PartialStream<S>
792 where
793 S: StreamOnce,
794 {
795 type Token = S::Token;
796 type Range = S::Range;
797 type Position = S::Position;
798 type Error = S::Error;
799
800 #[inline]
uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>>801 fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
802 self.0.uncons()
803 }
804
is_partial(&self) -> bool805 fn is_partial(&self) -> bool {
806 true
807 }
808 }
809
810 impl<S> RangeStreamOnce for PartialStream<S>
811 where
812 S: RangeStreamOnce,
813 {
814 #[inline]
uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>815 fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
816 self.0.uncons_range(size)
817 }
818
819 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,820 fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
821 where
822 F: FnMut(Self::Token) -> bool,
823 {
824 self.0.uncons_while(f)
825 }
826
uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,827 fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
828 where
829 F: FnMut(Self::Token) -> bool,
830 {
831 self.0.uncons_while1(f)
832 }
833
834 #[inline]
distance(&self, end: &Self::Checkpoint) -> usize835 fn distance(&self, end: &Self::Checkpoint) -> usize {
836 self.0.distance(end)
837 }
838
839 #[inline]
range(&self) -> Self::Range840 fn range(&self) -> Self::Range {
841 self.0.range()
842 }
843 }
844
/// Stream type which indicates that the stream is complete if end of input is reached
///
/// For most streams this is already the default but this wrapper can be used to override a nested
/// `PartialStream`
///
/// The type is `repr(transparent)`, which the conversion from `&mut S` to
/// `&mut CompleteStream<S>` relies on.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
#[repr(transparent)]
pub struct CompleteStream<S>(pub S);
852
853 impl<S> From<S> for CompleteStream<S> {
from(t: S) -> Self854 fn from(t: S) -> Self {
855 CompleteStream(t)
856 }
857 }
858
// Reinterprets a mutable reference to a stream as a mutable reference to a `CompleteStream`
// around that same stream, without moving or copying it.
impl<'s, S> From<&'s mut S> for &'s mut CompleteStream<S> {
    fn from(t: &'s mut S) -> Self {
        // SAFETY repr(transparent) is specified on CompleteStream, so `CompleteStream<S>` has
        // the same layout as its only field `S`, and the returned reference keeps the lifetime
        // `'s` of the reference it was created from.
        unsafe { &mut *(t as *mut S as *mut CompleteStream<S>) }
    }
}
865
866 impl<S> Positioned for CompleteStream<S>
867 where
868 S: Positioned,
869 {
870 #[inline]
position(&self) -> Self::Position871 fn position(&self) -> Self::Position {
872 self.0.position()
873 }
874 }
875
876 impl<S> ResetStream for CompleteStream<S>
877 where
878 S: ResetStream,
879 {
880 type Checkpoint = S::Checkpoint;
881
882 #[inline]
checkpoint(&self) -> Self::Checkpoint883 fn checkpoint(&self) -> Self::Checkpoint {
884 self.0.checkpoint()
885 }
886
887 #[inline]
reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error>888 fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
889 self.0.reset(checkpoint)
890 }
891 }
892
893 impl<S> StreamOnce for CompleteStream<S>
894 where
895 S: StreamOnce,
896 {
897 type Token = S::Token;
898 type Range = S::Range;
899 type Position = S::Position;
900 type Error = S::Error;
901
902 #[inline]
uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>>903 fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
904 self.0.uncons()
905 }
906
is_partial(&self) -> bool907 fn is_partial(&self) -> bool {
908 false
909 }
910 }
911
912 impl<S> RangeStreamOnce for CompleteStream<S>
913 where
914 S: RangeStreamOnce,
915 {
916 #[inline]
uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>917 fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
918 self.0.uncons_range(size)
919 }
920
921 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,922 fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
923 where
924 F: FnMut(Self::Token) -> bool,
925 {
926 self.0.uncons_while(f)
927 }
928
uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,929 fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
930 where
931 F: FnMut(Self::Token) -> bool,
932 {
933 self.0.uncons_while1(f)
934 }
935
936 #[inline]
distance(&self, end: &Self::Checkpoint) -> usize937 fn distance(&self, end: &Self::Checkpoint) -> usize {
938 self.0.distance(end)
939 }
940
941 #[inline]
range(&self) -> Self::Range942 fn range(&self) -> Self::Range {
943 self.0.range()
944 }
945 }
946
/// Stream wrapper whose partial-ness is chosen at runtime.
///
/// Field `0` is the wrapped stream, to which all stream operations are forwarded. Field `1` is
/// the value returned from `is_partial`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct MaybePartialStream<S>(pub S, pub bool);
949
950 impl<S> Positioned for MaybePartialStream<S>
951 where
952 S: Positioned,
953 {
954 #[inline]
position(&self) -> Self::Position955 fn position(&self) -> Self::Position {
956 self.0.position()
957 }
958 }
959
960 impl<S> ResetStream for MaybePartialStream<S>
961 where
962 S: ResetStream,
963 {
964 type Checkpoint = S::Checkpoint;
965
966 #[inline]
checkpoint(&self) -> Self::Checkpoint967 fn checkpoint(&self) -> Self::Checkpoint {
968 self.0.checkpoint()
969 }
970
971 #[inline]
reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error>972 fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
973 self.0.reset(checkpoint)
974 }
975 }
976
977 impl<S> StreamOnce for MaybePartialStream<S>
978 where
979 S: StreamOnce,
980 {
981 type Token = S::Token;
982 type Range = S::Range;
983 type Position = S::Position;
984 type Error = S::Error;
985
986 #[inline]
uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>>987 fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
988 self.0.uncons()
989 }
990
is_partial(&self) -> bool991 fn is_partial(&self) -> bool {
992 self.1
993 }
994 }
995
996 impl<S> RangeStreamOnce for MaybePartialStream<S>
997 where
998 S: RangeStreamOnce,
999 {
1000 #[inline]
uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>1001 fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
1002 self.0.uncons_range(size)
1003 }
1004
1005 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,1006 fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
1007 where
1008 F: FnMut(Self::Token) -> bool,
1009 {
1010 self.0.uncons_while(f)
1011 }
1012
uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,1013 fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
1014 where
1015 F: FnMut(Self::Token) -> bool,
1016 {
1017 self.0.uncons_while1(f)
1018 }
1019
1020 #[inline]
distance(&self, end: &Self::Checkpoint) -> usize1021 fn distance(&self, end: &Self::Checkpoint) -> usize {
1022 self.0.distance(end)
1023 }
1024
1025 #[inline]
range(&self) -> Self::Range1026 fn range(&self) -> Self::Range {
1027 self.0.range()
1028 }
1029 }
1030
/// Newtype for constructing a stream from a slice where the items in the slice are not copyable.
///
/// Unlike the stream implementation for `&[T]`, which requires `T: Clone` and yields cloned
/// items, this stream yields references (`&'a T`) to the items.
#[derive(Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct SliceStream<'a, T>(pub &'a [T]);
1034
1035 impl<'a, T> Clone for SliceStream<'a, T> {
clone(&self) -> SliceStream<'a, T>1036 fn clone(&self) -> SliceStream<'a, T> {
1037 SliceStream(self.0)
1038 }
1039 }
1040
1041 impl<'a, T> Positioned for SliceStream<'a, T>
1042 where
1043 T: PartialEq + 'a,
1044 {
1045 #[inline]
position(&self) -> Self::Position1046 fn position(&self) -> Self::Position {
1047 PointerOffset::new(self.0.as_ptr() as usize)
1048 }
1049 }
1050
1051 impl<'a, T> StreamOnce for SliceStream<'a, T>
1052 where
1053 T: PartialEq + 'a,
1054 {
1055 type Token = &'a T;
1056 type Range = &'a [T];
1057 type Position = PointerOffset<[T]>;
1058 type Error = UnexpectedParse;
1059
1060 #[inline]
uncons(&mut self) -> Result<&'a T, StreamErrorFor<Self>>1061 fn uncons(&mut self) -> Result<&'a T, StreamErrorFor<Self>> {
1062 match self.0.split_first() {
1063 Some((first, rest)) => {
1064 self.0 = rest;
1065 Ok(first)
1066 }
1067 None => Err(UnexpectedParse::Eoi),
1068 }
1069 }
1070 }
1071
slice_uncons_while_ref<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T] where F: FnMut(&'a T) -> bool,1072 fn slice_uncons_while_ref<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
1073 where
1074 F: FnMut(&'a T) -> bool,
1075 {
1076 let mut i = start as usize;
1077 let len = slice.len();
1078 // SAFETY: We only call this function with `One` if the slice has length >= 1
1079 debug_assert!(len >= i, "");
1080 let mut found = false;
1081
1082 macro_rules! check {
1083 () => {
1084 if !f(unsafe { slice.get_unchecked(i) }) {
1085 found = true;
1086 break;
1087 }
1088 i += 1;
1089 };
1090 }
1091
1092 // SAFETY: ensures we can access at least 8 elements starting at i, making get_unchecked sound.
1093 while len - i >= 8 {
1094 check!();
1095 check!();
1096 check!();
1097 check!();
1098 check!();
1099 check!();
1100 check!();
1101 check!();
1102 }
1103
1104 if !found {
1105 while let Some(c) = slice.get(i) {
1106 if !f(c) {
1107 break;
1108 }
1109 i += 1;
1110 }
1111 }
1112
1113 let (result, remaining) = slice.split_at(i);
1114 *slice = remaining;
1115 result
1116 }
1117
1118 impl<'a, T> RangeStreamOnce for SliceStream<'a, T>
1119 where
1120 T: PartialEq + 'a,
1121 {
1122 #[inline]
uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>>1123 fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
1124 if size <= self.0.len() {
1125 let (range, rest) = self.0.split_at(size);
1126 self.0 = rest;
1127 Ok(range)
1128 } else {
1129 Err(UnexpectedParse::Eoi)
1130 }
1131 }
1132
1133 #[inline]
uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,1134 fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
1135 where
1136 F: FnMut(Self::Token) -> bool,
1137 {
1138 Ok(slice_uncons_while_ref(&mut self.0, UnconsStart::Zero, f))
1139 }
1140
1141 #[inline]
uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>> where F: FnMut(Self::Token) -> bool,1142 fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
1143 where
1144 F: FnMut(Self::Token) -> bool,
1145 {
1146 match self.0.first() {
1147 Some(c) => {
1148 if !f(c) {
1149 return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
1150 }
1151 }
1152 None => return PeekErr(Tracked::from(UnexpectedParse::Eoi)),
1153 }
1154
1155 CommitOk(slice_uncons_while_ref(&mut self.0, UnconsStart::One, f))
1156 }
1157
1158 #[inline]
distance(&self, end: &Self) -> usize1159 fn distance(&self, end: &Self) -> usize {
1160 end.0.len() - self.0.len()
1161 }
1162
range(&self) -> Self::Range1163 fn range(&self) -> Self::Range {
1164 self.0
1165 }
1166 }
1167
/// Adapter which lets an iterator be used as a stream.
/// Returned by [`IteratorStream::new`].
#[derive(Copy, Clone, Debug)]
pub struct IteratorStream<Input>(Input);

impl<Input> IteratorStream<Input>
where
    Input: Iterator,
{
    /// Wraps anything convertible into the iterator `Input` in a stream.
    ///
    /// NOTE: This type does not implement `Positioned` and must be wrapped with types
    /// such as `BufferedStreamRef` and `State` to become a `Stream` which can be parsed
    pub fn new<T>(iter: T) -> IteratorStream<Input>
    where
        T: IntoIterator<IntoIter = Input, Item = Input::Item>,
    {
        let inner = iter.into_iter();
        Self(inner)
    }
}
1188
1189 impl<Input> Iterator for IteratorStream<Input>
1190 where
1191 Input: Iterator,
1192 {
1193 type Item = Input::Item;
next(&mut self) -> Option<Input::Item>1194 fn next(&mut self) -> Option<Input::Item> {
1195 self.0.next()
1196 }
1197 }
1198
1199 impl<Input: Iterator> StreamOnce for IteratorStream<Input>
1200 where
1201 Input::Item: Clone + PartialEq,
1202 {
1203 type Token = Input::Item;
1204 type Range = Input::Item;
1205 type Position = ();
1206 type Error = UnexpectedParse;
1207
1208 #[inline]
uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>>1209 fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
1210 match self.next() {
1211 Some(x) => Ok(x),
1212 None => Err(UnexpectedParse::Eoi),
1213 }
1214 }
1215 }
1216
/// Position type for slice streams (`&[T]`/`&str`): a pointer into the stream stored as a
/// plain `usize`.
pub struct PointerOffset<T: ?Sized>(pub usize, PhantomData<T>);

// The trait impls below are written by hand rather than derived so that none of them places a
// bound on `T`; only the stored `usize` takes part in them.

impl<T: ?Sized> Clone for PointerOffset<T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: ?Sized> Copy for PointerOffset<T> {}

impl<T: ?Sized> Default for PointerOffset<T> {
    fn default() -> Self {
        Self(0, PhantomData)
    }
}

impl<T: ?Sized> PartialEq for PointerOffset<T> {
    fn eq(&self, other: &Self) -> bool {
        let (Self(lhs, _), Self(rhs, _)) = (self, other);
        lhs == rhs
    }
}

impl<T: ?Sized> Eq for PointerOffset<T> {}

impl<T: ?Sized> PartialOrd for PointerOffset<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: ?Sized> Ord for PointerOffset<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.0, &other.0)
    }
}

/// The `Debug` output is the same text as the `Display` output.
impl<T> fmt::Debug for PointerOffset<T>
where
    T: ?Sized,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self)
    }
}

/// Prints the stored value formatted as a pointer, e.g. `PointerOffset(0x10)`.
impl<T> fmt::Display for PointerOffset<T>
where
    T: ?Sized,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let address = self.0 as *const ();
        write!(f, "PointerOffset({:?})", address)
    }
}

impl<T> PointerOffset<T>
where
    T: ?Sized,
{
    /// Creates a position from a raw address value.
    pub fn new(offset: usize) -> Self {
        Self(offset, PhantomData)
    }

    /// Turns the pointer-based position into an index relative to the start of `initial_slice`,
    /// by subtracting the address of `initial_slice` from the stored address.
    ///
    /// ```rust
    /// # extern crate combine;
    /// # use combine::*;
    /// # fn main() {
    /// let text = "b";
    /// let err = token('a').easy_parse(text).unwrap_err();
    /// assert_eq!(err.position.0, text.as_ptr() as usize);
    /// assert_eq!(err.map_position(|p| p.translate_position(text)).position, 0);
    /// # }
    /// ```
    pub fn translate_position(self, initial_slice: &T) -> usize {
        let base = initial_slice as *const T as *const () as usize;
        self.0 - base
    }
}
1297
1298 /// Decodes `input` using `parser`.
1299 ///
1300 /// Return `Ok(Some(token), committed_data)` if there was enough data to finish parsing using
1301 /// `parser`.
1302 /// Returns `Ok(None, committed_data)` if `input` did not contain enough data to finish parsing
1303 /// using `parser`.
1304 ///
1305 /// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
decode<Input, P>( mut parser: P, input: &mut Input, partial_state: &mut P::PartialState, ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error> where P: Parser<Input>, Input: RangeStream,1306 pub fn decode<Input, P>(
1307 mut parser: P,
1308 input: &mut Input,
1309 partial_state: &mut P::PartialState,
1310 ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
1311 where
1312 P: Parser<Input>,
1313 Input: RangeStream,
1314 {
1315 let start = input.checkpoint();
1316 match parser.parse_with_state(input, partial_state) {
1317 Ok(message) => Ok((Some(message), input.distance(&start))),
1318 Err(err) => {
1319 if err.is_unexpected_end_of_input() {
1320 if input.is_partial() {
1321 // The parser expected more input to parse and input is partial, return `None`
1322 // as we did not finish and also return how much may be removed from the stream
1323 Ok((None, input.distance(&start)))
1324 } else {
1325 Err(err)
1326 }
1327 } else {
1328 Err(err)
1329 }
1330 }
1331 }
1332 }
1333
1334 /// Decodes `input` using `parser`. Like `decode` but works directly in both
1335 /// `tokio_util::Decoder::decode` and `tokio_util::Decoder::decode_eof`
1336 ///
1337 /// Return `Ok(Some(token), committed_data)` if there was enough data to finish parsing using
1338 /// `parser`.
1339 /// Returns `Ok(None, committed_data)` if `input` did not contain enough data to finish parsing
1340 /// using `parser`.
1341 /// Returns `Ok(None, 0)` if `input` did not contain enough data to finish parsing
1342 /// using `parser`.
1343 ///
1344 /// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
decode_tokio<Input, P>( mut parser: P, input: &mut Input, partial_state: &mut P::PartialState, ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error> where P: Parser<Input>, Input: RangeStream,1345 pub fn decode_tokio<Input, P>(
1346 mut parser: P,
1347 input: &mut Input,
1348 partial_state: &mut P::PartialState,
1349 ) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
1350 where
1351 P: Parser<Input>,
1352 Input: RangeStream,
1353 {
1354 let start = input.checkpoint();
1355 match parser.parse_with_state(input, partial_state) {
1356 Ok(message) => Ok((Some(message), input.distance(&start))),
1357 Err(err) => {
1358 if err.is_unexpected_end_of_input() {
1359 if input.is_partial() {
1360 // The parser expected more input to parse and input is partial, return `None`
1361 // as we did not finish and also return how much may be removed from the stream
1362 Ok((None, input.distance(&start)))
1363 } else if input_at_eof(input) && input.distance(&start) == 0 {
1364 // We are at eof and the input is empty, return None to indicate that we are
1365 // done
1366 Ok((None, 0))
1367 } else {
1368 Err(err)
1369 }
1370 } else {
1371 Err(err)
1372 }
1373 }
1374 }
1375 }
1376
/// Incrementally parses an instance of `std::io::Read` as a `&[u8]`, so that the entire input
/// never has to be read into memory at once.
///
/// This is a macro rather than a function to work around the lack of Higher Ranked Types. See
/// the example for how to pass a parser to the macro (constructing parts of the parser outside
/// of the `decode!` call is unlikely to work).
///
/// ```
/// use std::{
///     fs::File,
/// };
/// use combine::{decode, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
///
/// let mut read = File::open("README.md").unwrap();
/// let mut decoder = Decoder::new();
/// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
/// assert_eq!(
///     decode!(
///         decoder,
///         read,
///         {
///             let word = many1(satisfy(|b| !is_whitespace(b)));
///             sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///         },
///         |input, _position| combine::easy::Stream::from(input),
///     ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///     Ok(773),
/// );
/// ```
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[macro_export]
macro_rules! decode {
    // Parser only: the buffered input is used as the stream as-is, with no post-processing.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, |input, _position| input, |x| x)
    };

    // Parser plus a closure building the stream from `(input, position)`.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form: `$post_decode` is called with the stream after each decode attempt.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'decode_loop: loop {
                    let (output, consumed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, read);

                        // The stream is partial for as long as the end of input was not reached.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let outcome = $crate::stream::decode($parser, &mut stream, state);
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match outcome {
                            Ok(progress) => progress,
                            Err(err) => {
                                break 'decode_loop Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance(&mut *read, consumed);

                    if let Some(value) = output {
                        break 'decode_loop Ok(value);
                    }

                    // No complete value yet: prepare the decoder for another attempt.
                    match decoder.__before_parse(&mut *read) {
                        Ok(x) => x,
                        Err(error) => {
                            break 'decode_loop Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1463
/// Parses an instance of `futures::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_futures_03!` call is unlikely to work).
///
/// ```
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use async_std::{
///     fs::File,
///     task,
/// };
///
/// use combine::{decode_futures_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
///
/// fn main() {
///     task::block_on(main_());
/// }
///
/// async fn main_() {
///     let mut read = File::open("README.md").await.unwrap();
///     let mut decoder = Decoder::new();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_futures_03!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "futures-io-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-io-03")))]
#[macro_export]
macro_rules! decode_futures_03 {
    // Parser only: the buffered input is used as the stream as-is. The stream constructor is
    // called with `(input, position)`, so the default closure must accept two arguments; the
    // optional trailing comma belongs in the matcher, as in the other arms.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Parser plus a closure building the stream from `(input, position)`.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form: `$post_decode` is called with the stream after each decode attempt.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);

                        // The stream is partial for as long as the end of input was not reached.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => break 'outer Err($crate::stream::decoder::Error::Parse(err)),
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(&mut *read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No complete value yet: prepare the decoder for another attempt.
                    match decoder.__before_parse_async(std::pin::Pin::new(&mut *read)).await {
                        Ok(_) => (),
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                }
            }
        }
    };
}
1560
/// Incrementally parses an instance of `tokio::io::AsyncRead` as a `&[u8]`, so that the entire
/// input never has to be read into memory at once. Available with the `tokio-02` feature.
///
/// This is a macro rather than a function to work around the lack of Higher Ranked Types. See
/// the example for how to pass a parser to the macro (constructing parts of the parser outside
/// of the `decode_tokio_02!` call is unlikely to work).
///
/// ```
/// # use tokio_02_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio_02, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio_02!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio-02")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-02")))]
#[macro_export]
macro_rules! decode_tokio_02 {
    // Parser only: the buffered input is used as the stream as-is.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, |input, _position| input)
    };

    // Parser plus a closure building the stream from `(input, position)`.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form: `$post_decode` is called with the stream after each decode attempt.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'decode_loop: loop {
                    let (output, consumed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is partial for as long as the end of input was not reached.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let outcome = $crate::stream::decode($parser, &mut stream, state);
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match outcome {
                            Ok(progress) => progress,
                            Err(err) => {
                                break 'decode_loop Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), consumed);

                    if let Some(value) = output {
                        break 'decode_loop Ok(value);
                    }

                    // No complete value yet: prepare the decoder for another attempt.
                    match decoder
                        .__before_parse_tokio_02(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'decode_loop Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1656
/// Incrementally parses an instance of `tokio::io::AsyncRead` as a `&[u8]`, so that the entire
/// input never has to be read into memory at once. Available with the `tokio-03` feature.
///
/// This is a macro rather than a function to work around the lack of Higher Ranked Types. See
/// the example for how to pass a parser to the macro (constructing parts of the parser outside
/// of the `decode_tokio_03!` call is unlikely to work).
///
/// ```
/// # use tokio_03_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio_03!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-03")))]
#[macro_export]
macro_rules! decode_tokio_03 {
    // Parser only: the buffered input is used as the stream as-is.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Parser plus a closure building the stream from `(input, position)`.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form: `$post_decode` is called with the stream after each decode attempt.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'decode_loop: loop {
                    let (output, consumed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is partial for as long as the end of input was not reached.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let outcome = $crate::stream::decode($parser, &mut stream, state);
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match outcome {
                            Ok(progress) => progress,
                            Err(err) => {
                                break 'decode_loop Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), consumed);

                    if let Some(value) = output {
                        break 'decode_loop Ok(value);
                    }

                    // No complete value yet: prepare the decoder for another attempt.
                    match decoder
                        .__before_parse_tokio_03(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'decode_loop Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1752
/// Incrementally parses an instance of `tokio::io::AsyncRead` as a `&[u8]`, so that the entire
/// input never has to be read into memory at once. Available with the `tokio` feature.
///
/// This is a macro rather than a function to work around the lack of Higher Ranked Types. See
/// the example for how to pass a parser to the macro (constructing parts of the parser outside
/// of the `decode_tokio!` call is unlikely to work).
///
/// ```
/// # use tokio_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
#[macro_export]
macro_rules! decode_tokio {
    // Parser only: the buffered input is used as the stream as-is.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, |input, _position| input)
    };

    // Parser plus a closure building the stream from `(input, position)`.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, $input_stream, |x| x)
    };

    // Full form: `$post_decode` is called with the stream after each decode attempt.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'decode_loop: loop {
                    let (output, consumed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is partial for as long as the end of input was not reached.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let outcome = $crate::stream::decode($parser, &mut stream, state);
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match outcome {
                            Ok(progress) => progress,
                            Err(err) => {
                                break 'decode_loop Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), consumed);

                    if let Some(value) = output {
                        break 'decode_loop Ok(value);
                    }

                    // No complete value yet: prepare the decoder for another attempt.
                    match decoder
                        .__before_parse_tokio(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'decode_loop Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1848
/// Calls `f` with the two arguments `a` and `b` and returns its result.
///
/// The decode macros in this module pass their `$input_stream` closure through this function.
#[doc(hidden)]
pub fn call_with2<F: FnOnce(A, B) -> R, A, B, R>(a: A, b: B, f: F) -> R {
    f(a, b)
}
1856
/// Calls `f` with the single argument `a` and returns its result.
///
/// The decode macros in this module pass their `$post_decode` closure through this function.
#[doc(hidden)]
pub fn call_with<F: FnOnce(A) -> R, A, R>(a: A, f: F) -> R {
    f(a)
}
1864
#[cfg(test)]
mod tests {

    use super::*;

    /// Taking a range which ends exactly at the end of the input must succeed, including the
    /// zero-length range of an empty input.
    #[test]
    fn uncons_range_at_end() {
        assert_eq!("".uncons_range(0), Ok(""));
        assert_eq!("123".uncons_range(3), Ok("123"));
        assert_eq!((&[1][..]).uncons_range(1), Ok(&[1][..]));
        let s: &[u8] = &[];
        assert_eq!(SliceStream(s).uncons_range(0), Ok(&[][..]));
    }

    /// `distance` counts items rather than bytes, so it must advance by one per `i32` consumed
    /// and return to zero after a reset to the checkpoint.
    #[test]
    fn larger_than_1_byte_items_return_correct_distance() {
        let mut input = &[123i32, 0i32][..];

        let before = input.checkpoint();
        assert_eq!(input.distance(&before), 0);

        input.uncons().unwrap();
        assert_eq!(input.distance(&before), 1);

        input.uncons().unwrap();
        assert_eq!(input.distance(&before), 2);

        input.reset(before).unwrap();
        assert_eq!(input.distance(&before), 0);
    }
}
1897