1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream}; 3 use futures_core::task::{Context, Poll}; 4 use futures_sink::Sink; 5 use pin_project_lite::pin_project; 6 7 pin_project! { 8 /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. 9 #[derive(Debug, Clone)] 10 #[must_use = "sinks do nothing unless polled"] 11 pub struct SinkMapErr<Si, F> { 12 #[pin] 13 sink: Si, 14 f: Option<F>, 15 } 16 } 17 18 impl<Si, F> SinkMapErr<Si, F> { new(sink: Si, f: F) -> Self19 pub(super) fn new(sink: Si, f: F) -> Self { 20 Self { sink, f: Some(f) } 21 } 22 23 delegate_access_inner!(sink, Si, ()); 24 take_f(self: Pin<&mut Self>) -> F25 fn take_f(self: Pin<&mut Self>) -> F { 26 self.project().f.take().expect("polled MapErr after completion") 27 } 28 } 29 30 impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F> 31 where 32 Si: Sink<Item>, 33 F: FnOnce(Si::Error) -> E, 34 { 35 type Error = E; 36 poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>37 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 38 self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e)) 39 } 40 start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>41 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 42 self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e)) 43 } 44 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>45 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 46 self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e)) 47 } 48 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>49 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 50 self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e)) 51 } 52 } 53 54 // Forwarding impl of Stream from the underlying sink 55 impl<S: Stream, F> Stream for SinkMapErr<S, F> { 56 type Item = S::Item; 57 58 delegate_stream!(sink); 59 } 60 61 impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> { is_terminated(&self) -> bool62 fn is_terminated(&self) -> bool { 63 self.sink.is_terminated() 64 } 65 } 66