1 use core::pin::Pin;
2 use futures_core::stream::{FusedStream, Stream};
3 use futures_core::task::{Context, Poll};
4 use futures_sink::Sink;
5 use pin_project_lite::pin_project;
6 
7 pin_project! {
8     /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
9     #[derive(Debug, Clone)]
10     #[must_use = "sinks do nothing unless polled"]
11     pub struct SinkMapErr<Si, F> {
12         #[pin]
13         sink: Si,
14         f: Option<F>,
15     }
16 }
17 
18 impl<Si, F> SinkMapErr<Si, F> {
new(sink: Si, f: F) -> Self19     pub(super) fn new(sink: Si, f: F) -> Self {
20         Self { sink, f: Some(f) }
21     }
22 
23     delegate_access_inner!(sink, Si, ());
24 
take_f(self: Pin<&mut Self>) -> F25     fn take_f(self: Pin<&mut Self>) -> F {
26         self.project().f.take().expect("polled MapErr after completion")
27     }
28 }
29 
30 impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
31 where
32     Si: Sink<Item>,
33     F: FnOnce(Si::Error) -> E,
34 {
35     type Error = E;
36 
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>37     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38         self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e))
39     }
40 
start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>41     fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
42         self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e))
43     }
44 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>45     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46         self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e))
47     }
48 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>49     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50         self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e))
51     }
52 }
53 
54 // Forwarding impl of Stream from the underlying sink
55 impl<S: Stream, F> Stream for SinkMapErr<S, F> {
56     type Item = S::Item;
57 
58     delegate_stream!(sink);
59 }
60 
61 impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
is_terminated(&self) -> bool62     fn is_terminated(&self) -> bool {
63         self.sink.is_terminated()
64     }
65 }
66