1 use crate::io::AsyncBufRead;
2 use crate::util::memchr;
3 
4 use pin_project_lite::pin_project;
5 use std::future::Future;
6 use std::io;
7 use std::marker::PhantomPinned;
8 use std::mem;
9 use std::pin::Pin;
10 use std::task::{ready, Context, Poll};
11 
12 pin_project! {
13     /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
14     /// The delimiter is included in the resulting vector.
15     #[derive(Debug)]
16     #[must_use = "futures do nothing unless you `.await` or poll them"]
17     pub struct ReadUntil<'a, R: ?Sized> {
18         reader: &'a mut R,
19         delimiter: u8,
20         buf: &'a mut Vec<u8>,
21         // The number of bytes appended to buf. This can be less than buf.len() if
22         // the buffer was not empty when the operation was started.
23         read: usize,
24         // Make this future `!Unpin` for compatibility with async trait methods.
25         #[pin]
26         _pin: PhantomPinned,
27     }
28 }
29 
read_until<'a, R>( reader: &'a mut R, delimiter: u8, buf: &'a mut Vec<u8>, ) -> ReadUntil<'a, R> where R: AsyncBufRead + ?Sized + Unpin,30 pub(crate) fn read_until<'a, R>(
31     reader: &'a mut R,
32     delimiter: u8,
33     buf: &'a mut Vec<u8>,
34 ) -> ReadUntil<'a, R>
35 where
36     R: AsyncBufRead + ?Sized + Unpin,
37 {
38     ReadUntil {
39         reader,
40         delimiter,
41         buf,
42         read: 0,
43         _pin: PhantomPinned,
44     }
45 }
46 
read_until_internal<R: AsyncBufRead + ?Sized>( mut reader: Pin<&mut R>, cx: &mut Context<'_>, delimiter: u8, buf: &mut Vec<u8>, read: &mut usize, ) -> Poll<io::Result<usize>>47 pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
48     mut reader: Pin<&mut R>,
49     cx: &mut Context<'_>,
50     delimiter: u8,
51     buf: &mut Vec<u8>,
52     read: &mut usize,
53 ) -> Poll<io::Result<usize>> {
54     loop {
55         let (done, used) = {
56             let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
57             if let Some(i) = memchr::memchr(delimiter, available) {
58                 buf.extend_from_slice(&available[..=i]);
59                 (true, i + 1)
60             } else {
61                 buf.extend_from_slice(available);
62                 (false, available.len())
63             }
64         };
65         reader.as_mut().consume(used);
66         *read += used;
67         if done || used == 0 {
68             return Poll::Ready(Ok(mem::replace(read, 0)));
69         }
70     }
71 }
72 
73 impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
74     type Output = io::Result<usize>;
75 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>76     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77         let me = self.project();
78         read_until_internal(Pin::new(*me.reader), cx, *me.delimiter, me.buf, me.read)
79     }
80 }
81