1 use alloc::collections::VecDeque;
2 use core::pin::Pin;
3 use futures_core::ready;
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 use futures_sink::Sink;
7 use pin_project_lite::pin_project;
8 
pin_project! {
    /// Sink for the [`buffer`](super::SinkExt::buffer) method.
    ///
    /// Wraps an inner sink together with a queue of pending items. When
    /// `capacity` is non-zero, `start_send` pushes items onto the queue and
    /// they are handed to the inner sink later, oldest first, as it reports
    /// readiness. When `capacity` is zero, `poll_ready` and `start_send` are
    /// forwarded straight to the inner sink and the queue is never filled.
    #[derive(Debug)]
    #[must_use = "sinks do nothing unless polled"]
    pub struct Buffer<Si, Item> {
        // The wrapped sink; marked `#[pin]` so it can be polled through the projection.
        #[pin]
        sink: Si,
        // Items accepted by `start_send` that have not yet been passed to `sink`.
        buf: VecDeque<Item>,

        // Track capacity separately from the `VecDeque`, which may be rounded up
        capacity: usize,
    }
}
22 
23 impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
new(sink: Si, capacity: usize) -> Self24     pub(super) fn new(sink: Si, capacity: usize) -> Self {
25         Self { sink, buf: VecDeque::with_capacity(capacity), capacity }
26     }
27 
28     delegate_access_inner!(sink, Si, ());
29 
try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>30     fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> {
31         let mut this = self.project();
32         ready!(this.sink.as_mut().poll_ready(cx))?;
33         while let Some(item) = this.buf.pop_front() {
34             this.sink.as_mut().start_send(item)?;
35             if !this.buf.is_empty() {
36                 ready!(this.sink.as_mut().poll_ready(cx))?;
37             }
38         }
39         Poll::Ready(Ok(()))
40     }
41 }
42 
43 // Forwarding impl of Stream from the underlying sink
44 impl<S, Item> Stream for Buffer<S, Item>
45 where
46     S: Sink<Item> + Stream,
47 {
48     type Item = S::Item;
49 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>50     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
51         self.project().sink.poll_next(cx)
52     }
53 
size_hint(&self) -> (usize, Option<usize>)54     fn size_hint(&self) -> (usize, Option<usize>) {
55         self.sink.size_hint()
56     }
57 }
58 
59 impl<S, Item> FusedStream for Buffer<S, Item>
60 where
61     S: Sink<Item> + FusedStream,
62 {
is_terminated(&self) -> bool63     fn is_terminated(&self) -> bool {
64         self.sink.is_terminated()
65     }
66 }
67 
68 impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
69     type Error = Si::Error;
70 
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>71     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72         if self.capacity == 0 {
73             return self.project().sink.poll_ready(cx);
74         }
75 
76         let _ = self.as_mut().try_empty_buffer(cx)?;
77 
78         if self.buf.len() >= self.capacity {
79             Poll::Pending
80         } else {
81             Poll::Ready(Ok(()))
82         }
83     }
84 
start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>85     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
86         if self.capacity == 0 {
87             self.project().sink.start_send(item)
88         } else {
89             self.project().buf.push_back(item);
90             Ok(())
91         }
92     }
93 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>94     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
95         ready!(self.as_mut().try_empty_buffer(cx))?;
96         debug_assert!(self.buf.is_empty());
97         self.project().sink.poll_flush(cx)
98     }
99 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>100     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101         ready!(self.as_mut().try_empty_buffer(cx))?;
102         debug_assert!(self.buf.is_empty());
103         self.project().sink.poll_close(cx)
104     }
105 }
106