1 use crate::io::AsyncBufRead;
2 use crate::util::memchr;
3
4 use pin_project_lite::pin_project;
5 use std::future::Future;
6 use std::io;
7 use std::marker::PhantomPinned;
8 use std::mem;
9 use std::pin::Pin;
10 use std::task::{ready, Context, Poll};
11
pin_project! {
    /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
    ///
    /// The delimiter is included in the resulting vector.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadUntil<'a, R: ?Sized> {
        // The buffered reader that bytes are pulled from.
        reader: &'a mut R,
        // The byte that terminates the read; it is copied into `buf` as well.
        delimiter: u8,
        // The output vector that the bytes read are appended to.
        buf: &'a mut Vec<u8>,
        // The number of bytes appended to buf. This can be less than buf.len() if
        // the buffer was not empty when the operation was started.
        read: usize,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
29
read_until<'a, R>( reader: &'a mut R, delimiter: u8, buf: &'a mut Vec<u8>, ) -> ReadUntil<'a, R> where R: AsyncBufRead + ?Sized + Unpin,30 pub(crate) fn read_until<'a, R>(
31 reader: &'a mut R,
32 delimiter: u8,
33 buf: &'a mut Vec<u8>,
34 ) -> ReadUntil<'a, R>
35 where
36 R: AsyncBufRead + ?Sized + Unpin,
37 {
38 ReadUntil {
39 reader,
40 delimiter,
41 buf,
42 read: 0,
43 _pin: PhantomPinned,
44 }
45 }
46
read_until_internal<R: AsyncBufRead + ?Sized>( mut reader: Pin<&mut R>, cx: &mut Context<'_>, delimiter: u8, buf: &mut Vec<u8>, read: &mut usize, ) -> Poll<io::Result<usize>>47 pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
48 mut reader: Pin<&mut R>,
49 cx: &mut Context<'_>,
50 delimiter: u8,
51 buf: &mut Vec<u8>,
52 read: &mut usize,
53 ) -> Poll<io::Result<usize>> {
54 loop {
55 let (done, used) = {
56 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
57 if let Some(i) = memchr::memchr(delimiter, available) {
58 buf.extend_from_slice(&available[..=i]);
59 (true, i + 1)
60 } else {
61 buf.extend_from_slice(available);
62 (false, available.len())
63 }
64 };
65 reader.as_mut().consume(used);
66 *read += used;
67 if done || used == 0 {
68 return Poll::Ready(Ok(mem::replace(read, 0)));
69 }
70 }
71 }
72
73 impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
74 type Output = io::Result<usize>;
75
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>76 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77 let me = self.project();
78 read_until_internal(Pin::new(*me.reader), cx, *me.delimiter, me.buf, me.read)
79 }
80 }
81