1 #![allow(dead_code)]
2 
3 use std::{
4     io,
5     marker::Unpin,
6     pin::Pin,
7     task::{self, Poll},
8 };
9 
10 use {futures_03_dep::ready, partial_io::PartialOp};
11 
12 pub struct PartialAsyncRead<R> {
13     inner: R,
14     ops: Box<dyn Iterator<Item = PartialOp> + Send>,
15 }
16 
17 impl<R> PartialAsyncRead<R>
18 where
19     R: Unpin,
20 {
new<I>(inner: R, ops: I) -> Self where I: IntoIterator<Item = PartialOp>, I::IntoIter: Send + 'static,21     pub fn new<I>(inner: R, ops: I) -> Self
22     where
23         I: IntoIterator<Item = PartialOp>,
24         I::IntoIter: Send + 'static,
25     {
26         PartialAsyncRead {
27             inner,
28             ops: Box::new(ops.into_iter()),
29         }
30     }
31 }
32 
33 impl<R> tokio_02_dep::io::AsyncRead for PartialAsyncRead<R>
34 where
35     R: tokio_02_dep::io::AsyncRead + Unpin,
36 {
poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>37     fn poll_read(
38         mut self: Pin<&mut Self>,
39         cx: &mut task::Context<'_>,
40         buf: &mut [u8],
41     ) -> Poll<io::Result<usize>> {
42         match self.ops.next() {
43             Some(PartialOp::Limited(n)) => {
44                 let len = std::cmp::min(n, buf.len());
45                 Pin::new(&mut self.inner).poll_read(cx, &mut buf[..len])
46             }
47             Some(PartialOp::Err(err)) => {
48                 if err == io::ErrorKind::WouldBlock {
49                     cx.waker().wake_by_ref();
50                     Poll::Pending
51                 } else {
52                     Err(io::Error::new(
53                         err,
54                         "error during read, generated by partial-io",
55                     ))
56                     .into()
57                 }
58             }
59             Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf),
60         }
61     }
62 }
63 
64 impl<R> tokio_03_dep::io::AsyncRead for PartialAsyncRead<R>
65 where
66     R: tokio_03_dep::io::AsyncRead + Unpin,
67 {
poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut tokio_03_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>68     fn poll_read(
69         mut self: Pin<&mut Self>,
70         cx: &mut task::Context<'_>,
71         buf: &mut tokio_03_dep::io::ReadBuf<'_>,
72     ) -> Poll<io::Result<()>> {
73         match self.ops.next() {
74             Some(PartialOp::Limited(n)) => {
75                 let len = std::cmp::min(n, buf.remaining());
76                 buf.initialize_unfilled();
77                 let mut sub_buf = buf.take(len);
78                 ready!(Pin::new(&mut self.inner).poll_read(cx, &mut sub_buf))?;
79                 let filled = sub_buf.filled().len();
80                 buf.advance(filled);
81                 Poll::Ready(Ok(()))
82             }
83             Some(PartialOp::Err(err)) => {
84                 if err == io::ErrorKind::WouldBlock {
85                     cx.waker().wake_by_ref();
86                     Poll::Pending
87                 } else {
88                     Err(io::Error::new(
89                         err,
90                         "error during read, generated by partial-io",
91                     ))
92                     .into()
93                 }
94             }
95             Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf),
96         }
97     }
98 }
99 
100 impl<R> tokio_dep::io::AsyncRead for PartialAsyncRead<R>
101 where
102     R: tokio_dep::io::AsyncRead + Unpin,
103 {
poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut tokio_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>104     fn poll_read(
105         mut self: Pin<&mut Self>,
106         cx: &mut task::Context<'_>,
107         buf: &mut tokio_dep::io::ReadBuf<'_>,
108     ) -> Poll<io::Result<()>> {
109         match self.ops.next() {
110             Some(PartialOp::Limited(n)) => {
111                 let len = std::cmp::min(n, buf.remaining());
112                 buf.initialize_unfilled();
113                 let mut sub_buf = buf.take(len);
114                 ready!(Pin::new(&mut self.inner).poll_read(cx, &mut sub_buf))?;
115                 let filled = sub_buf.filled().len();
116                 buf.advance(filled);
117                 Poll::Ready(Ok(()))
118             }
119             Some(PartialOp::Err(err)) => {
120                 if err == io::ErrorKind::WouldBlock {
121                     cx.waker().wake_by_ref();
122                     Poll::Pending
123                 } else {
124                     Err(io::Error::new(
125                         err,
126                         "error during read, generated by partial-io",
127                     ))
128                     .into()
129                 }
130             }
131             Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf),
132         }
133     }
134 }
135 
136 pub struct FuturesPartialAsyncRead<R> {
137     inner: R,
138     ops: Box<dyn Iterator<Item = PartialOp> + Send>,
139 }
140 
141 impl<R> FuturesPartialAsyncRead<R>
142 where
143     R: crate::futures::io::AsyncRead + Unpin,
144 {
new<I>(inner: R, ops: I) -> Self where I: IntoIterator<Item = PartialOp>, I::IntoIter: Send + 'static,145     pub fn new<I>(inner: R, ops: I) -> Self
146     where
147         I: IntoIterator<Item = PartialOp>,
148         I::IntoIter: Send + 'static,
149     {
150         FuturesPartialAsyncRead {
151             inner,
152             ops: Box::new(ops.into_iter()),
153         }
154     }
155 }
156 
157 impl<R> crate::futures::io::AsyncRead for FuturesPartialAsyncRead<R>
158 where
159     R: crate::futures::io::AsyncRead + Unpin,
160 {
poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>161     fn poll_read(
162         mut self: Pin<&mut Self>,
163         cx: &mut task::Context<'_>,
164         buf: &mut [u8],
165     ) -> Poll<io::Result<usize>> {
166         match self.ops.next() {
167             Some(PartialOp::Limited(n)) => {
168                 let len = std::cmp::min(n, buf.len());
169                 Pin::new(&mut self.inner).poll_read(cx, &mut buf[..len])
170             }
171             Some(PartialOp::Err(err)) => {
172                 if err == io::ErrorKind::WouldBlock {
173                     cx.waker().wake_by_ref();
174                     Poll::Pending
175                 } else {
176                     Err(io::Error::new(
177                         err,
178                         "error during read, generated by partial-io",
179                     ))
180                     .into()
181                 }
182             }
183             Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf),
184         }
185     }
186 }
187