1 #![allow(dead_code)]
2 
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5 use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
6 use tokio_stream::Stream;
7 
8 struct UnboundedStream<T> {
9     recv: UnboundedReceiver<T>,
10 }
11 impl<T> Stream for UnboundedStream<T> {
12     type Item = T;
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>13     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
14         Pin::into_inner(self).recv.poll_recv(cx)
15     }
16 }
17 
unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>)18 pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
19     let (tx, rx) = mpsc::unbounded_channel();
20 
21     let stream = UnboundedStream { recv: rx };
22 
23     (tx, stream)
24 }
25 
26 struct BoundedStream<T> {
27     recv: Receiver<T>,
28 }
29 impl<T> Stream for BoundedStream<T> {
30     type Item = T;
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>31     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
32         Pin::into_inner(self).recv.poll_recv(cx)
33     }
34 }
35 
channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>)36 pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) {
37     let (tx, rx) = mpsc::channel(size);
38 
39     let stream = BoundedStream { recv: rx };
40 
41     (tx, stream)
42 }
43