1 use crate::Stream; 2 use pin_project_lite::pin_project; 3 use std::io; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 use tokio::io::{AsyncBufRead, Lines}; 7 8 pin_project! { 9 /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`]. 10 /// 11 /// [`tokio::io::Lines`]: struct@tokio::io::Lines 12 /// [`Stream`]: trait@crate::Stream 13 #[derive(Debug)] 14 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] 15 pub struct LinesStream<R> { 16 #[pin] 17 inner: Lines<R>, 18 } 19 } 20 21 impl<R> LinesStream<R> { 22 /// Create a new `LinesStream`. new(lines: Lines<R>) -> Self23 pub fn new(lines: Lines<R>) -> Self { 24 Self { inner: lines } 25 } 26 27 /// Get back the inner `Lines`. into_inner(self) -> Lines<R>28 pub fn into_inner(self) -> Lines<R> { 29 self.inner 30 } 31 32 /// Obtain a pinned reference to the inner `Lines<R>`. as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>>33 pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> { 34 self.project().inner 35 } 36 } 37 38 impl<R: AsyncBufRead> Stream for LinesStream<R> { 39 type Item = io::Result<String>; 40 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 42 self.project() 43 .inner 44 .poll_next_line(cx) 45 .map(Result::transpose) 46 } 47 } 48 49 impl<R> AsRef<Lines<R>> for LinesStream<R> { as_ref(&self) -> &Lines<R>50 fn as_ref(&self) -> &Lines<R> { 51 &self.inner 52 } 53 } 54 55 impl<R> AsMut<Lines<R>> for LinesStream<R> { as_mut(&mut self) -> &mut Lines<R>56 fn as_mut(&mut self) -> &mut Lines<R> { 57 &mut self.inner 58 } 59 } 60