1 //! Tokio wrappers which apply timeouts to IO operations.
2 //!
3 //! These timeouts are analogous to the read and write timeouts on traditional blocking sockets. A timeout countdown is
4 //! initiated when a read/write operation returns [`Poll::Pending`]. If a read/write does not return successfully before
5 //! the countdown expires, an [`io::Error`] with a kind of [`TimedOut`](io::ErrorKind::TimedOut) is returned.
6 #![doc(html_root_url = "https://docs.rs/tokio-io-timeout/1")]
7 #![warn(missing_docs)]
8 
9 use pin_project_lite::pin_project;
10 use std::future::Future;
11 use std::io;
12 use std::io::SeekFrom;
13 use std::pin::Pin;
14 use std::task::{Context, Poll};
15 use std::time::Duration;
16 use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
17 use tokio::time::{sleep_until, Instant, Sleep};
18 
19 pin_project! {
20     #[derive(Debug)]
21     struct TimeoutState {
22         timeout: Option<Duration>,
23         #[pin]
24         cur: Sleep,
25         active: bool,
26     }
27 }
28 
29 impl TimeoutState {
30     #[inline]
new() -> TimeoutState31     fn new() -> TimeoutState {
32         TimeoutState {
33             timeout: None,
34             cur: sleep_until(Instant::now()),
35             active: false,
36         }
37     }
38 
39     #[inline]
timeout(&self) -> Option<Duration>40     fn timeout(&self) -> Option<Duration> {
41         self.timeout
42     }
43 
44     #[inline]
set_timeout(&mut self, timeout: Option<Duration>)45     fn set_timeout(&mut self, timeout: Option<Duration>) {
46         // since this takes &mut self, we can't yet be active
47         self.timeout = timeout;
48     }
49 
50     #[inline]
set_timeout_pinned(mut self: Pin<&mut Self>, timeout: Option<Duration>)51     fn set_timeout_pinned(mut self: Pin<&mut Self>, timeout: Option<Duration>) {
52         *self.as_mut().project().timeout = timeout;
53         self.reset();
54     }
55 
56     #[inline]
reset(self: Pin<&mut Self>)57     fn reset(self: Pin<&mut Self>) {
58         let this = self.project();
59 
60         if *this.active {
61             *this.active = false;
62             this.cur.reset(Instant::now());
63         }
64     }
65 
66     #[inline]
poll_check(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Result<()>67     fn poll_check(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Result<()> {
68         let mut this = self.project();
69 
70         let timeout = match this.timeout {
71             Some(timeout) => *timeout,
72             None => return Ok(()),
73         };
74 
75         if !*this.active {
76             this.cur.as_mut().reset(Instant::now() + timeout);
77             *this.active = true;
78         }
79 
80         match this.cur.poll(cx) {
81             Poll::Ready(()) => Err(io::Error::from(io::ErrorKind::TimedOut)),
82             Poll::Pending => Ok(()),
83         }
84     }
85 }
86 
87 pin_project! {
88     /// An `AsyncRead`er which applies a timeout to read operations.
89     #[derive(Debug)]
90     pub struct TimeoutReader<R> {
91         #[pin]
92         reader: R,
93         #[pin]
94         state: TimeoutState,
95     }
96 }
97 
98 impl<R> TimeoutReader<R>
99 where
100     R: AsyncRead,
101 {
102     /// Returns a new `TimeoutReader` wrapping the specified reader.
103     ///
104     /// There is initially no timeout.
new(reader: R) -> TimeoutReader<R>105     pub fn new(reader: R) -> TimeoutReader<R> {
106         TimeoutReader {
107             reader,
108             state: TimeoutState::new(),
109         }
110     }
111 
112     /// Returns the current read timeout.
timeout(&self) -> Option<Duration>113     pub fn timeout(&self) -> Option<Duration> {
114         self.state.timeout()
115     }
116 
117     /// Sets the read timeout.
118     ///
119     /// This can only be used before the reader is pinned; use [`set_timeout_pinned`](Self::set_timeout_pinned)
120     /// otherwise.
set_timeout(&mut self, timeout: Option<Duration>)121     pub fn set_timeout(&mut self, timeout: Option<Duration>) {
122         self.state.set_timeout(timeout);
123     }
124 
125     /// Sets the read timeout.
126     ///
127     /// This will reset any pending timeout. Use [`set_timeout`](Self::set_timeout) instead if the reader is not yet
128     /// pinned.
set_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>)129     pub fn set_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) {
130         self.project().state.set_timeout_pinned(timeout);
131     }
132 
133     /// Returns a shared reference to the inner reader.
get_ref(&self) -> &R134     pub fn get_ref(&self) -> &R {
135         &self.reader
136     }
137 
138     /// Returns a mutable reference to the inner reader.
get_mut(&mut self) -> &mut R139     pub fn get_mut(&mut self) -> &mut R {
140         &mut self.reader
141     }
142 
143     /// Returns a pinned mutable reference to the inner reader.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R>144     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
145         self.project().reader
146     }
147 
148     /// Consumes the `TimeoutReader`, returning the inner reader.
into_inner(self) -> R149     pub fn into_inner(self) -> R {
150         self.reader
151     }
152 }
153 
154 impl<R> AsyncRead for TimeoutReader<R>
155 where
156     R: AsyncRead,
157 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<Result<(), io::Error>>158     fn poll_read(
159         self: Pin<&mut Self>,
160         cx: &mut Context<'_>,
161         buf: &mut ReadBuf<'_>,
162     ) -> Poll<Result<(), io::Error>> {
163         let this = self.project();
164         let r = this.reader.poll_read(cx, buf);
165         match r {
166             Poll::Pending => this.state.poll_check(cx)?,
167             _ => this.state.reset(),
168         }
169         r
170     }
171 }
172 
173 impl<R> AsyncWrite for TimeoutReader<R>
174 where
175     R: AsyncWrite,
176 {
poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll<Result<usize, io::Error>>177     fn poll_write(
178         self: Pin<&mut Self>,
179         cx: &mut Context,
180         buf: &[u8],
181     ) -> Poll<Result<usize, io::Error>> {
182         self.project().reader.poll_write(cx, buf)
183     }
184 
poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>>185     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
186         self.project().reader.poll_flush(cx)
187     }
188 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>>189     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
190         self.project().reader.poll_shutdown(cx)
191     }
192 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>193     fn poll_write_vectored(
194         self: Pin<&mut Self>,
195         cx: &mut Context<'_>,
196         bufs: &[io::IoSlice<'_>],
197     ) -> Poll<io::Result<usize>> {
198         self.project().reader.poll_write_vectored(cx, bufs)
199     }
200 
is_write_vectored(&self) -> bool201     fn is_write_vectored(&self) -> bool {
202         self.reader.is_write_vectored()
203     }
204 }
205 
206 impl<R> AsyncSeek for TimeoutReader<R>
207 where
208     R: AsyncSeek,
209 {
start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>210     fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
211         self.project().reader.start_seek(position)
212     }
poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>213     fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
214         self.project().reader.poll_complete(cx)
215     }
216 }
217 
218 pin_project! {
219     /// An `AsyncWrite`er which applies a timeout to write operations.
220     #[derive(Debug)]
221     pub struct TimeoutWriter<W> {
222         #[pin]
223         writer: W,
224         #[pin]
225         state: TimeoutState,
226     }
227 }
228 
229 impl<W> TimeoutWriter<W>
230 where
231     W: AsyncWrite,
232 {
233     /// Returns a new `TimeoutReader` wrapping the specified reader.
234     ///
235     /// There is initially no timeout.
new(writer: W) -> TimeoutWriter<W>236     pub fn new(writer: W) -> TimeoutWriter<W> {
237         TimeoutWriter {
238             writer,
239             state: TimeoutState::new(),
240         }
241     }
242 
243     /// Returns the current write timeout.
timeout(&self) -> Option<Duration>244     pub fn timeout(&self) -> Option<Duration> {
245         self.state.timeout()
246     }
247 
248     /// Sets the write timeout.
249     ///
250     /// This can only be used before the writer is pinned; use [`set_timeout_pinned`](Self::set_timeout_pinned)
251     /// otherwise.
set_timeout(&mut self, timeout: Option<Duration>)252     pub fn set_timeout(&mut self, timeout: Option<Duration>) {
253         self.state.set_timeout(timeout);
254     }
255 
256     /// Sets the write timeout.
257     ///
258     /// This will reset any pending timeout. Use [`set_timeout`](Self::set_timeout) instead if the reader is not yet
259     /// pinned.
set_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>)260     pub fn set_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) {
261         self.project().state.set_timeout_pinned(timeout);
262     }
263 
264     /// Returns a shared reference to the inner writer.
get_ref(&self) -> &W265     pub fn get_ref(&self) -> &W {
266         &self.writer
267     }
268 
269     /// Returns a mutable reference to the inner writer.
get_mut(&mut self) -> &mut W270     pub fn get_mut(&mut self) -> &mut W {
271         &mut self.writer
272     }
273 
274     /// Returns a pinned mutable reference to the inner writer.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W>275     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
276         self.project().writer
277     }
278 
279     /// Consumes the `TimeoutWriter`, returning the inner writer.
into_inner(self) -> W280     pub fn into_inner(self) -> W {
281         self.writer
282     }
283 }
284 
285 impl<W> AsyncWrite for TimeoutWriter<W>
286 where
287     W: AsyncWrite,
288 {
poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll<Result<usize, io::Error>>289     fn poll_write(
290         self: Pin<&mut Self>,
291         cx: &mut Context,
292         buf: &[u8],
293     ) -> Poll<Result<usize, io::Error>> {
294         let this = self.project();
295         let r = this.writer.poll_write(cx, buf);
296         match r {
297             Poll::Pending => this.state.poll_check(cx)?,
298             _ => this.state.reset(),
299         }
300         r
301     }
302 
poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>>303     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
304         let this = self.project();
305         let r = this.writer.poll_flush(cx);
306         match r {
307             Poll::Pending => this.state.poll_check(cx)?,
308             _ => this.state.reset(),
309         }
310         r
311     }
312 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>>313     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
314         let this = self.project();
315         let r = this.writer.poll_shutdown(cx);
316         match r {
317             Poll::Pending => this.state.poll_check(cx)?,
318             _ => this.state.reset(),
319         }
320         r
321     }
322 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>323     fn poll_write_vectored(
324         self: Pin<&mut Self>,
325         cx: &mut Context<'_>,
326         bufs: &[io::IoSlice<'_>],
327     ) -> Poll<io::Result<usize>> {
328         let this = self.project();
329         let r = this.writer.poll_write_vectored(cx, bufs);
330         match r {
331             Poll::Pending => this.state.poll_check(cx)?,
332             _ => this.state.reset(),
333         }
334         r
335     }
336 
is_write_vectored(&self) -> bool337     fn is_write_vectored(&self) -> bool {
338         self.writer.is_write_vectored()
339     }
340 }
341 
342 impl<W> AsyncRead for TimeoutWriter<W>
343 where
344     W: AsyncRead,
345 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<Result<(), io::Error>>346     fn poll_read(
347         self: Pin<&mut Self>,
348         cx: &mut Context<'_>,
349         buf: &mut ReadBuf<'_>,
350     ) -> Poll<Result<(), io::Error>> {
351         self.project().writer.poll_read(cx, buf)
352     }
353 }
354 
355 impl<W> AsyncSeek for TimeoutWriter<W>
356 where
357     W: AsyncSeek,
358 {
start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>359     fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
360         self.project().writer.start_seek(position)
361     }
poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>362     fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
363         self.project().writer.poll_complete(cx)
364     }
365 }
366 
367 pin_project! {
368     /// A stream which applies read and write timeouts to an inner stream.
369     #[derive(Debug)]
370     pub struct TimeoutStream<S> {
371         #[pin]
372         stream: TimeoutReader<TimeoutWriter<S>>
373     }
374 }
375 
376 impl<S> TimeoutStream<S>
377 where
378     S: AsyncRead + AsyncWrite,
379 {
380     /// Returns a new `TimeoutStream` wrapping the specified stream.
381     ///
382     /// There is initially no read or write timeout.
new(stream: S) -> TimeoutStream<S>383     pub fn new(stream: S) -> TimeoutStream<S> {
384         let writer = TimeoutWriter::new(stream);
385         let stream = TimeoutReader::new(writer);
386         TimeoutStream { stream }
387     }
388 
389     /// Returns the current read timeout.
read_timeout(&self) -> Option<Duration>390     pub fn read_timeout(&self) -> Option<Duration> {
391         self.stream.timeout()
392     }
393 
394     /// Sets the read timeout.
395     ///
396     /// This can only be used before the stream is pinned; use
397     /// [`set_read_timeout_pinned`](Self::set_read_timeout_pinned) otherwise.
set_read_timeout(&mut self, timeout: Option<Duration>)398     pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
399         self.stream.set_timeout(timeout)
400     }
401 
402     /// Sets the read timeout.
403     ///
404     /// This will reset any pending read timeout. Use [`set_read_timeout`](Self::set_read_timeout) instead if the stream
405     /// has not yet been pinned.
set_read_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>)406     pub fn set_read_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) {
407         self.project().stream.set_timeout_pinned(timeout)
408     }
409 
410     /// Returns the current write timeout.
write_timeout(&self) -> Option<Duration>411     pub fn write_timeout(&self) -> Option<Duration> {
412         self.stream.get_ref().timeout()
413     }
414 
415     /// Sets the write timeout.
416     ///
417     /// This can only be used before the stream is pinned; use
418     /// [`set_write_timeout_pinned`](Self::set_write_timeout_pinned) otherwise.
set_write_timeout(&mut self, timeout: Option<Duration>)419     pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
420         self.stream.get_mut().set_timeout(timeout)
421     }
422 
423     /// Sets the write timeout.
424     ///
425     /// This will reset any pending write timeout. Use [`set_write_timeout`](Self::set_write_timeout) instead if the
426     /// stream has not yet been pinned.
set_write_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>)427     pub fn set_write_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) {
428         self.project()
429             .stream
430             .get_pin_mut()
431             .set_timeout_pinned(timeout)
432     }
433 
434     /// Returns a shared reference to the inner stream.
get_ref(&self) -> &S435     pub fn get_ref(&self) -> &S {
436         self.stream.get_ref().get_ref()
437     }
438 
439     /// Returns a mutable reference to the inner stream.
get_mut(&mut self) -> &mut S440     pub fn get_mut(&mut self) -> &mut S {
441         self.stream.get_mut().get_mut()
442     }
443 
444     /// Returns a pinned mutable reference to the inner stream.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S>445     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
446         self.project().stream.get_pin_mut().get_pin_mut()
447     }
448 
449     /// Consumes the stream, returning the inner stream.
into_inner(self) -> S450     pub fn into_inner(self) -> S {
451         self.stream.into_inner().into_inner()
452     }
453 }
454 
455 impl<S> AsyncRead for TimeoutStream<S>
456 where
457     S: AsyncRead + AsyncWrite,
458 {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<Result<(), io::Error>>459     fn poll_read(
460         self: Pin<&mut Self>,
461         cx: &mut Context<'_>,
462         buf: &mut ReadBuf<'_>,
463     ) -> Poll<Result<(), io::Error>> {
464         self.project().stream.poll_read(cx, buf)
465     }
466 }
467 
468 impl<S> AsyncWrite for TimeoutStream<S>
469 where
470     S: AsyncRead + AsyncWrite,
471 {
poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll<Result<usize, io::Error>>472     fn poll_write(
473         self: Pin<&mut Self>,
474         cx: &mut Context,
475         buf: &[u8],
476     ) -> Poll<Result<usize, io::Error>> {
477         self.project().stream.poll_write(cx, buf)
478     }
479 
poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>>480     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
481         self.project().stream.poll_flush(cx)
482     }
483 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>>484     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
485         self.project().stream.poll_shutdown(cx)
486     }
487 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>488     fn poll_write_vectored(
489         self: Pin<&mut Self>,
490         cx: &mut Context<'_>,
491         bufs: &[io::IoSlice<'_>],
492     ) -> Poll<io::Result<usize>> {
493         self.project().stream.poll_write_vectored(cx, bufs)
494     }
495 
is_write_vectored(&self) -> bool496     fn is_write_vectored(&self) -> bool {
497         self.stream.is_write_vectored()
498     }
499 }
500 
501 #[cfg(test)]
502 mod test {
503     use super::*;
504     use std::io::Write;
505     use std::net::TcpListener;
506     use std::thread;
507     use tokio::io::{AsyncReadExt, AsyncWriteExt};
508     use tokio::net::TcpStream;
509     use tokio::pin;
510 
511     pin_project! {
512         struct DelayStream {
513             #[pin]
514             sleep: Sleep,
515         }
516     }
517 
518     impl DelayStream {
new(until: Instant) -> Self519         fn new(until: Instant) -> Self {
520             DelayStream {
521                 sleep: sleep_until(until),
522             }
523         }
524     }
525 
526     impl AsyncRead for DelayStream {
poll_read( self: Pin<&mut Self>, cx: &mut Context, _buf: &mut ReadBuf, ) -> Poll<Result<(), io::Error>>527         fn poll_read(
528             self: Pin<&mut Self>,
529             cx: &mut Context,
530             _buf: &mut ReadBuf,
531         ) -> Poll<Result<(), io::Error>> {
532             match self.project().sleep.poll(cx) {
533                 Poll::Ready(()) => Poll::Ready(Ok(())),
534                 Poll::Pending => Poll::Pending,
535             }
536         }
537     }
538 
539     impl AsyncWrite for DelayStream {
poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll<Result<usize, io::Error>>540         fn poll_write(
541             self: Pin<&mut Self>,
542             cx: &mut Context,
543             buf: &[u8],
544         ) -> Poll<Result<usize, io::Error>> {
545             match self.project().sleep.poll(cx) {
546                 Poll::Ready(()) => Poll::Ready(Ok(buf.len())),
547                 Poll::Pending => Poll::Pending,
548             }
549         }
550 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>>551         fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>> {
552             Poll::Ready(Ok(()))
553         }
554 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>>555         fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), io::Error>> {
556             Poll::Ready(Ok(()))
557         }
558     }
559 
560     #[tokio::test]
read_timeout()561     async fn read_timeout() {
562         let reader = DelayStream::new(Instant::now() + Duration::from_millis(500));
563         let mut reader = TimeoutReader::new(reader);
564         reader.set_timeout(Some(Duration::from_millis(100)));
565         pin!(reader);
566 
567         let r = reader.read(&mut [0]).await;
568         assert_eq!(r.err().unwrap().kind(), io::ErrorKind::TimedOut);
569     }
570 
571     #[tokio::test]
read_ok()572     async fn read_ok() {
573         let reader = DelayStream::new(Instant::now() + Duration::from_millis(100));
574         let mut reader = TimeoutReader::new(reader);
575         reader.set_timeout(Some(Duration::from_millis(500)));
576         pin!(reader);
577 
578         reader.read(&mut [0]).await.unwrap();
579     }
580 
581     #[tokio::test]
write_timeout()582     async fn write_timeout() {
583         let writer = DelayStream::new(Instant::now() + Duration::from_millis(500));
584         let mut writer = TimeoutWriter::new(writer);
585         writer.set_timeout(Some(Duration::from_millis(100)));
586         pin!(writer);
587 
588         let r = writer.write(&[0]).await;
589         assert_eq!(r.err().unwrap().kind(), io::ErrorKind::TimedOut);
590     }
591 
592     #[tokio::test]
write_ok()593     async fn write_ok() {
594         let writer = DelayStream::new(Instant::now() + Duration::from_millis(100));
595         let mut writer = TimeoutWriter::new(writer);
596         writer.set_timeout(Some(Duration::from_millis(500)));
597         pin!(writer);
598 
599         writer.write(&[0]).await.unwrap();
600     }
601 
602     #[tokio::test]
tcp_read()603     async fn tcp_read() {
604         let listener = TcpListener::bind("127.0.0.1:0").unwrap();
605         let addr = listener.local_addr().unwrap();
606 
607         thread::spawn(move || {
608             let mut socket = listener.accept().unwrap().0;
609             thread::sleep(Duration::from_millis(10));
610             socket.write_all(b"f").unwrap();
611             thread::sleep(Duration::from_millis(500));
612             let _ = socket.write_all(b"f"); // this may hit an eof
613         });
614 
615         let s = TcpStream::connect(&addr).await.unwrap();
616         let mut s = TimeoutStream::new(s);
617         s.set_read_timeout(Some(Duration::from_millis(100)));
618         pin!(s);
619         s.read(&mut [0]).await.unwrap();
620         let r = s.read(&mut [0]).await;
621 
622         match r {
623             Ok(_) => panic!("unexpected success"),
624             Err(ref e) if e.kind() == io::ErrorKind::TimedOut => (),
625             Err(e) => panic!("{:?}", e),
626         }
627     }
628 }
629