1 #![allow(dead_code)] 2 3 use std::{ 4 io, 5 marker::Unpin, 6 pin::Pin, 7 task::{self, Poll}, 8 }; 9 10 use {futures_03_dep::ready, partial_io::PartialOp}; 11 12 pub struct PartialAsyncRead<R> { 13 inner: R, 14 ops: Box<dyn Iterator<Item = PartialOp> + Send>, 15 } 16 17 impl<R> PartialAsyncRead<R> 18 where 19 R: Unpin, 20 { new<I>(inner: R, ops: I) -> Self where I: IntoIterator<Item = PartialOp>, I::IntoIter: Send + 'static,21 pub fn new<I>(inner: R, ops: I) -> Self 22 where 23 I: IntoIterator<Item = PartialOp>, 24 I::IntoIter: Send + 'static, 25 { 26 PartialAsyncRead { 27 inner, 28 ops: Box::new(ops.into_iter()), 29 } 30 } 31 } 32 33 impl<R> tokio_02_dep::io::AsyncRead for PartialAsyncRead<R> 34 where 35 R: tokio_02_dep::io::AsyncRead + Unpin, 36 { poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>37 fn poll_read( 38 mut self: Pin<&mut Self>, 39 cx: &mut task::Context<'_>, 40 buf: &mut [u8], 41 ) -> Poll<io::Result<usize>> { 42 match self.ops.next() { 43 Some(PartialOp::Limited(n)) => { 44 let len = std::cmp::min(n, buf.len()); 45 Pin::new(&mut self.inner).poll_read(cx, &mut buf[..len]) 46 } 47 Some(PartialOp::Err(err)) => { 48 if err == io::ErrorKind::WouldBlock { 49 cx.waker().wake_by_ref(); 50 Poll::Pending 51 } else { 52 Err(io::Error::new( 53 err, 54 "error during read, generated by partial-io", 55 )) 56 .into() 57 } 58 } 59 Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), 60 } 61 } 62 } 63 64 impl<R> tokio_03_dep::io::AsyncRead for PartialAsyncRead<R> 65 where 66 R: tokio_03_dep::io::AsyncRead + Unpin, 67 { poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut tokio_03_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>68 fn poll_read( 69 mut self: Pin<&mut Self>, 70 cx: &mut task::Context<'_>, 71 buf: &mut tokio_03_dep::io::ReadBuf<'_>, 72 ) -> Poll<io::Result<()>> { 73 match self.ops.next() { 74 Some(PartialOp::Limited(n)) => { 75 let len = std::cmp::min(n, buf.remaining()); 76 buf.initialize_unfilled(); 77 let mut sub_buf = buf.take(len); 78 ready!(Pin::new(&mut self.inner).poll_read(cx, &mut sub_buf))?; 79 let filled = sub_buf.filled().len(); 80 buf.advance(filled); 81 Poll::Ready(Ok(())) 82 } 83 Some(PartialOp::Err(err)) => { 84 if err == io::ErrorKind::WouldBlock { 85 cx.waker().wake_by_ref(); 86 Poll::Pending 87 } else { 88 Err(io::Error::new( 89 err, 90 "error during read, generated by partial-io", 91 )) 92 .into() 93 } 94 } 95 Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), 96 } 97 } 98 } 99 100 impl<R> tokio_dep::io::AsyncRead for PartialAsyncRead<R> 101 where 102 R: tokio_dep::io::AsyncRead + Unpin, 103 { poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut tokio_dep::io::ReadBuf<'_>, ) -> Poll<io::Result<()>>104 fn poll_read( 105 mut self: Pin<&mut Self>, 106 cx: &mut task::Context<'_>, 107 buf: &mut tokio_dep::io::ReadBuf<'_>, 108 ) -> Poll<io::Result<()>> { 109 match self.ops.next() { 110 Some(PartialOp::Limited(n)) => { 111 let len = std::cmp::min(n, buf.remaining()); 112 buf.initialize_unfilled(); 113 let mut sub_buf = buf.take(len); 114 ready!(Pin::new(&mut self.inner).poll_read(cx, &mut sub_buf))?; 115 let filled = sub_buf.filled().len(); 116 buf.advance(filled); 117 Poll::Ready(Ok(())) 118 } 119 Some(PartialOp::Err(err)) => { 120 if err == io::ErrorKind::WouldBlock { 121 cx.waker().wake_by_ref(); 122 Poll::Pending 123 } else { 124 Err(io::Error::new( 125 err, 126 "error during read, generated by partial-io", 127 )) 128 .into() 129 } 130 } 131 Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), 132 } 133 } 134 } 135 136 pub struct FuturesPartialAsyncRead<R> { 137 inner: R, 138 ops: Box<dyn Iterator<Item = PartialOp> + Send>, 139 } 140 141 impl<R> FuturesPartialAsyncRead<R> 142 where 143 R: crate::futures::io::AsyncRead + Unpin, 144 { new<I>(inner: R, ops: I) -> Self where I: IntoIterator<Item = PartialOp>, I::IntoIter: Send + 'static,145 pub fn new<I>(inner: R, ops: I) -> Self 146 where 147 I: IntoIterator<Item = PartialOp>, 148 I::IntoIter: Send + 'static, 149 { 150 FuturesPartialAsyncRead { 151 inner, 152 ops: Box::new(ops.into_iter()), 153 } 154 } 155 } 156 157 impl<R> crate::futures::io::AsyncRead for FuturesPartialAsyncRead<R> 158 where 159 R: crate::futures::io::AsyncRead + Unpin, 160 { poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>>161 fn poll_read( 162 mut self: Pin<&mut Self>, 163 cx: &mut task::Context<'_>, 164 buf: &mut [u8], 165 ) -> Poll<io::Result<usize>> { 166 match self.ops.next() { 167 Some(PartialOp::Limited(n)) => { 168 let len = std::cmp::min(n, buf.len()); 169 Pin::new(&mut self.inner).poll_read(cx, &mut buf[..len]) 170 } 171 Some(PartialOp::Err(err)) => { 172 if err == io::ErrorKind::WouldBlock { 173 cx.waker().wake_by_ref(); 174 Poll::Pending 175 } else { 176 Err(io::Error::new( 177 err, 178 "error during read, generated by partial-io", 179 )) 180 .into() 181 } 182 } 183 Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), 184 } 185 } 186 } 187