1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::ready; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 #[cfg(feature = "sink")] 7 use futures_sink::Sink; 8 use pin_project_lite::pin_project; 9 10 use crate::fns::FnMut1; 11 12 pin_project! { 13 /// Stream for the [`map`](super::StreamExt::map) method. 14 #[must_use = "streams do nothing unless polled"] 15 pub struct Map<St, F> { 16 #[pin] 17 stream: St, 18 f: F, 19 } 20 } 21 22 impl<St, F> fmt::Debug for Map<St, F> 23 where 24 St: fmt::Debug, 25 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 27 f.debug_struct("Map").field("stream", &self.stream).finish() 28 } 29 } 30 31 impl<St, F> Map<St, F> { new(stream: St, f: F) -> Self32 pub(crate) fn new(stream: St, f: F) -> Self { 33 Self { stream, f } 34 } 35 36 delegate_access_inner!(stream, St, ()); 37 } 38 39 impl<St, F> FusedStream for Map<St, F> 40 where 41 St: FusedStream, 42 F: FnMut1<St::Item>, 43 { is_terminated(&self) -> bool44 fn is_terminated(&self) -> bool { 45 self.stream.is_terminated() 46 } 47 } 48 49 impl<St, F> Stream for Map<St, F> 50 where 51 St: Stream, 52 F: FnMut1<St::Item>, 53 { 54 type Item = F::Output; 55 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>56 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 57 let mut this = self.project(); 58 let res = ready!(this.stream.as_mut().poll_next(cx)); 59 Poll::Ready(res.map(|x| this.f.call_mut(x))) 60 } 61 size_hint(&self) -> (usize, Option<usize>)62 fn size_hint(&self) -> (usize, Option<usize>) { 63 self.stream.size_hint() 64 } 65 } 66 67 // Forwarding impl of Sink from the underlying stream 68 #[cfg(feature = "sink")] 69 impl<St, F, Item> Sink<Item> for Map<St, F> 70 where 71 St: Stream + Sink<Item>, 72 F: FnMut1<St::Item>, 73 { 74 type Error = St::Error; 75 76 delegate_sink!(stream, Item); 77 } 78