1 use futures_core::stream::{FusedStream, Stream}; 2 use futures_core::task::{Context, Poll}; 3 use pin_project_lite::pin_project; 4 use std::any::Any; 5 use std::boxed::Box; 6 use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}; 7 use std::pin::Pin; 8 9 pin_project! { 10 /// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method. 11 #[derive(Debug)] 12 #[must_use = "streams do nothing unless polled"] 13 pub struct CatchUnwind<St> { 14 #[pin] 15 stream: St, 16 caught_unwind: bool, 17 } 18 } 19 20 impl<St: Stream + UnwindSafe> CatchUnwind<St> { new(stream: St) -> Self21 pub(super) fn new(stream: St) -> Self { 22 Self { stream, caught_unwind: false } 23 } 24 25 delegate_access_inner!(stream, St, ()); 26 } 27 28 impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> { 29 type Item = Result<St::Item, Box<dyn Any + Send>>; 30 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>31 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 32 let mut this = self.project(); 33 34 if *this.caught_unwind { 35 Poll::Ready(None) 36 } else { 37 let res = catch_unwind(AssertUnwindSafe(|| this.stream.as_mut().poll_next(cx))); 38 39 match res { 40 Ok(poll) => poll.map(|opt| opt.map(Ok)), 41 Err(e) => { 42 *this.caught_unwind = true; 43 Poll::Ready(Some(Err(e))) 44 } 45 } 46 } 47 } 48 size_hint(&self) -> (usize, Option<usize>)49 fn size_hint(&self) -> (usize, Option<usize>) { 50 if self.caught_unwind { 51 (0, Some(0)) 52 } else { 53 self.stream.size_hint() 54 } 55 } 56 } 57 58 impl<St: FusedStream + UnwindSafe> FusedStream for CatchUnwind<St> { is_terminated(&self) -> bool59 fn is_terminated(&self) -> bool { 60 self.caught_unwind || self.stream.is_terminated() 61 } 62 } 63