1 use crate::Stream;
2 use pin_project_lite::pin_project;
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5 
6 pin_project! {
7     /// A `Stream` that wraps the values in an `Option`.
8     ///
9     /// Whenever the wrapped stream yields an item, this stream yields that item
10     /// wrapped in `Some`. When the inner stream ends, then this stream first
11     /// yields a `None` item, and then this stream will also end.
12     ///
13     /// # Example
14     ///
15     /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
16     ///
17     /// ```
18     /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
19     ///
20     /// #[tokio::main]
21     /// async fn main() {
22     ///     let mut map = StreamMap::new();
23     ///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
24     ///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
25     ///     map.insert(0, stream);
26     ///     map.insert(1, stream2);
27     ///     while let Some((key, val)) = map.next().await {
28     ///         match val {
29     ///             Some(val) => println!("got {val:?} from stream {key:?}"),
30     ///             None => println!("stream {key:?} closed"),
31     ///         }
32     ///     }
33     /// }
34     /// ```
35     #[must_use = "streams do nothing unless polled"]
36     pub struct StreamNotifyClose<S> {
37         #[pin]
38         inner: Option<S>,
39     }
40 }
41 
42 impl<S> StreamNotifyClose<S> {
43     /// Create a new `StreamNotifyClose`.
new(stream: S) -> Self44     pub fn new(stream: S) -> Self {
45         Self {
46             inner: Some(stream),
47         }
48     }
49 
50     /// Get back the inner `Stream`.
51     ///
52     /// Returns `None` if the stream has reached its end.
into_inner(self) -> Option<S>53     pub fn into_inner(self) -> Option<S> {
54         self.inner
55     }
56 }
57 
58 impl<S> Stream for StreamNotifyClose<S>
59 where
60     S: Stream,
61 {
62     type Item = Option<S::Item>;
63 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>64     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65         // We can't invoke poll_next after it ended, so we unset the inner stream as a marker.
66         match self
67             .as_mut()
68             .project()
69             .inner
70             .as_pin_mut()
71             .map(|stream| S::poll_next(stream, cx))
72         {
73             Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))),
74             Some(Poll::Ready(None)) => {
75                 self.project().inner.set(None);
76                 Poll::Ready(Some(None))
77             }
78             Some(Poll::Pending) => Poll::Pending,
79             None => Poll::Ready(None),
80         }
81     }
82 
83     #[inline]
size_hint(&self) -> (usize, Option<usize>)84     fn size_hint(&self) -> (usize, Option<usize>) {
85         if let Some(inner) = &self.inner {
86             // We always return +1 because when there's stream there's atleast one more item.
87             let (l, u) = inner.size_hint();
88             (l.saturating_add(1), u.and_then(|u| u.checked_add(1)))
89         } else {
90             (0, Some(0))
91         }
92     }
93 }
94