1 use crate::Stream; 2 use pin_project_lite::pin_project; 3 use std::pin::Pin; 4 use std::task::{Context, Poll}; 5 6 pin_project! { 7 /// A `Stream` that wraps the values in an `Option`. 8 /// 9 /// Whenever the wrapped stream yields an item, this stream yields that item 10 /// wrapped in `Some`. When the inner stream ends, then this stream first 11 /// yields a `None` item, and then this stream will also end. 12 /// 13 /// # Example 14 /// 15 /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. 16 /// 17 /// ``` 18 /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; 19 /// 20 /// #[tokio::main] 21 /// async fn main() { 22 /// let mut map = StreamMap::new(); 23 /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); 24 /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); 25 /// map.insert(0, stream); 26 /// map.insert(1, stream2); 27 /// while let Some((key, val)) = map.next().await { 28 /// match val { 29 /// Some(val) => println!("got {val:?} from stream {key:?}"), 30 /// None => println!("stream {key:?} closed"), 31 /// } 32 /// } 33 /// } 34 /// ``` 35 #[must_use = "streams do nothing unless polled"] 36 pub struct StreamNotifyClose<S> { 37 #[pin] 38 inner: Option<S>, 39 } 40 } 41 42 impl<S> StreamNotifyClose<S> { 43 /// Create a new `StreamNotifyClose`. new(stream: S) -> Self44 pub fn new(stream: S) -> Self { 45 Self { 46 inner: Some(stream), 47 } 48 } 49 50 /// Get back the inner `Stream`. 51 /// 52 /// Returns `None` if the stream has reached its end. into_inner(self) -> Option<S>53 pub fn into_inner(self) -> Option<S> { 54 self.inner 55 } 56 } 57 58 impl<S> Stream for StreamNotifyClose<S> 59 where 60 S: Stream, 61 { 62 type Item = Option<S::Item>; 63 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>64 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 65 // We can't invoke poll_next after it ended, so we unset the inner stream as a marker. 66 match self 67 .as_mut() 68 .project() 69 .inner 70 .as_pin_mut() 71 .map(|stream| S::poll_next(stream, cx)) 72 { 73 Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))), 74 Some(Poll::Ready(None)) => { 75 self.project().inner.set(None); 76 Poll::Ready(Some(None)) 77 } 78 Some(Poll::Pending) => Poll::Pending, 79 None => Poll::Ready(None), 80 } 81 } 82 83 #[inline] size_hint(&self) -> (usize, Option<usize>)84 fn size_hint(&self) -> (usize, Option<usize>) { 85 if let Some(inner) = &self.inner { 86 // We always return +1 because when there's stream there's atleast one more item. 87 let (l, u) = inner.size_hint(); 88 (l.saturating_add(1), u.and_then(|u| u.checked_add(1))) 89 } else { 90 (0, Some(0)) 91 } 92 } 93 } 94