1 use alloc::collections::VecDeque; 2 use core::pin::Pin; 3 use futures_core::ready; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 use futures_sink::Sink; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Sink for the [`buffer`](super::SinkExt::buffer) method. 11 #[derive(Debug)] 12 #[must_use = "sinks do nothing unless polled"] 13 pub struct Buffer<Si, Item> { 14 #[pin] 15 sink: Si, 16 buf: VecDeque<Item>, 17 18 // Track capacity separately from the `VecDeque`, which may be rounded up 19 capacity: usize, 20 } 21 } 22 23 impl<Si: Sink<Item>, Item> Buffer<Si, Item> { new(sink: Si, capacity: usize) -> Self24 pub(super) fn new(sink: Si, capacity: usize) -> Self { 25 Self { sink, buf: VecDeque::with_capacity(capacity), capacity } 26 } 27 28 delegate_access_inner!(sink, Si, ()); 29 try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>>30 fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { 31 let mut this = self.project(); 32 ready!(this.sink.as_mut().poll_ready(cx))?; 33 while let Some(item) = this.buf.pop_front() { 34 this.sink.as_mut().start_send(item)?; 35 if !this.buf.is_empty() { 36 ready!(this.sink.as_mut().poll_ready(cx))?; 37 } 38 } 39 Poll::Ready(Ok(())) 40 } 41 } 42 43 // Forwarding impl of Stream from the underlying sink 44 impl<S, Item> Stream for Buffer<S, Item> 45 where 46 S: Sink<Item> + Stream, 47 { 48 type Item = S::Item; 49 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { 51 self.project().sink.poll_next(cx) 52 } 53 size_hint(&self) -> (usize, Option<usize>)54 fn size_hint(&self) -> (usize, Option<usize>) { 55 self.sink.size_hint() 56 } 57 } 58 59 impl<S, Item> FusedStream for Buffer<S, Item> 60 where 61 S: Sink<Item> + FusedStream, 62 { is_terminated(&self) -> bool63 fn is_terminated(&self) -> bool { 64 self.sink.is_terminated() 65 } 66 } 67 68 impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { 69 type Error = Si::Error; 70 poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>71 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 72 if self.capacity == 0 { 73 return self.project().sink.poll_ready(cx); 74 } 75 76 let _ = self.as_mut().try_empty_buffer(cx)?; 77 78 if self.buf.len() >= self.capacity { 79 Poll::Pending 80 } else { 81 Poll::Ready(Ok(())) 82 } 83 } 84 start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>85 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { 86 if self.capacity == 0 { 87 self.project().sink.start_send(item) 88 } else { 89 self.project().buf.push_back(item); 90 Ok(()) 91 } 92 } 93 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>94 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 95 ready!(self.as_mut().try_empty_buffer(cx))?; 96 debug_assert!(self.buf.is_empty()); 97 self.project().sink.poll_flush(cx) 98 } 99 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>100 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 101 ready!(self.as_mut().try_empty_buffer(cx))?; 102 debug_assert!(self.buf.is_empty()); 103 self.project().sink.poll_close(cx) 104 } 105 } 106