1 #![allow(dead_code)]
2
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5 use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
6 use tokio_stream::Stream;
7
8 struct UnboundedStream<T> {
9 recv: UnboundedReceiver<T>,
10 }
11 impl<T> Stream for UnboundedStream<T> {
12 type Item = T;
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>13 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
14 Pin::into_inner(self).recv.poll_recv(cx)
15 }
16 }
17
unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>)18 pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
19 let (tx, rx) = mpsc::unbounded_channel();
20
21 let stream = UnboundedStream { recv: rx };
22
23 (tx, stream)
24 }
25
26 struct BoundedStream<T> {
27 recv: Receiver<T>,
28 }
29 impl<T> Stream for BoundedStream<T> {
30 type Item = T;
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>31 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
32 Pin::into_inner(self).recv.poll_recv(cx)
33 }
34 }
35
channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>)36 pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) {
37 let (tx, rx) = mpsc::channel(size);
38
39 let stream = BoundedStream { recv: rx };
40
41 (tx, stream)
42 }
43