1 cfg_not_wasi! {
2     use crate::net::{to_socket_addrs, ToSocketAddrs};
3     use std::future::poll_fn;
4     use std::time::Duration;
5 }
6 
7 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8 use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9 use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10 
11 use std::fmt;
12 use std::io;
13 use std::net::{Shutdown, SocketAddr};
14 use std::pin::Pin;
15 use std::task::{ready, Context, Poll};
16 
17 cfg_io_util! {
18     use bytes::BufMut;
19 }
20 
21 cfg_net! {
22     /// A TCP stream between a local and a remote socket.
23     ///
24     /// A TCP stream can either be created by connecting to an endpoint, via the
25     /// [`connect`] method, or by [accepting] a connection from a [listener]. A
26     /// TCP stream can also be created via the [`TcpSocket`] type.
27     ///
28     /// Reading and writing to a `TcpStream` is usually done using the
29     /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
30     /// traits.
31     ///
32     /// [`connect`]: method@TcpStream::connect
33     /// [accepting]: method@crate::net::TcpListener::accept
34     /// [listener]: struct@crate::net::TcpListener
35     /// [`TcpSocket`]: struct@crate::net::TcpSocket
36     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
37     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
38     ///
39     /// # Examples
40     ///
41     /// ```no_run
42     /// use tokio::net::TcpStream;
43     /// use tokio::io::AsyncWriteExt;
44     /// use std::error::Error;
45     ///
46     /// #[tokio::main]
47     /// async fn main() -> Result<(), Box<dyn Error>> {
48     ///     // Connect to a peer
49     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
50     ///
51     ///     // Write some data.
52     ///     stream.write_all(b"hello world!").await?;
53     ///
54     ///     Ok(())
55     /// }
56     /// ```
57     ///
58     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
59     ///
60     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
61     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
62     ///
63     /// To shut down the stream in the write direction, you can call the
64     /// [`shutdown()`] method. This will cause the other peer to receive a read of
65     /// length 0, indicating that no more data will be sent. This only closes
66     /// the stream in one direction.
67     ///
68     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
69     pub struct TcpStream {
70         io: PollEvented<mio::net::TcpStream>,
71     }
72 }
73 
74 impl TcpStream {
75     cfg_not_wasi! {
76         /// Opens a TCP connection to a remote host.
77         ///
78         /// `addr` is an address of the remote host. Anything which implements the
79         /// [`ToSocketAddrs`] trait can be supplied as the address.  If `addr`
80         /// yields multiple addresses, connect will be attempted with each of the
81         /// addresses until a connection is successful. If none of the addresses
82         /// result in a successful connection, the error returned from the last
83         /// connection attempt (the last address) is returned.
84         ///
85         /// To configure the socket before connecting, you can use the [`TcpSocket`]
86         /// type.
87         ///
88         /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
89         /// [`TcpSocket`]: struct@crate::net::TcpSocket
90         ///
91         /// # Examples
92         ///
93         /// ```no_run
94         /// use tokio::net::TcpStream;
95         /// use tokio::io::AsyncWriteExt;
96         /// use std::error::Error;
97         ///
98         /// #[tokio::main]
99         /// async fn main() -> Result<(), Box<dyn Error>> {
100         ///     // Connect to a peer
101         ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
102         ///
103         ///     // Write some data.
104         ///     stream.write_all(b"hello world!").await?;
105         ///
106         ///     Ok(())
107         /// }
108         /// ```
109         ///
110         /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111         ///
112         /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
113         /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
114         pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
115             let addrs = to_socket_addrs(addr).await?;
116 
117             let mut last_err = None;
118 
119             for addr in addrs {
120                 match TcpStream::connect_addr(addr).await {
121                     Ok(stream) => return Ok(stream),
122                     Err(e) => last_err = Some(e),
123                 }
124             }
125 
126             Err(last_err.unwrap_or_else(|| {
127                 io::Error::new(
128                     io::ErrorKind::InvalidInput,
129                     "could not resolve to any address",
130                 )
131             }))
132         }
133 
134         /// Establishes a connection to the specified `addr`.
135         async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
136             let sys = mio::net::TcpStream::connect(addr)?;
137             TcpStream::connect_mio(sys).await
138         }
139 
140         pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
141             let stream = TcpStream::new(sys)?;
142 
143             // Once we've connected, wait for the stream to be writable as
144             // that's when the actual connection has been initiated. Once we're
145             // writable we check for `take_socket_error` to see if the connect
146             // actually hit an error or not.
147             //
148             // If all that succeeded then we ship everything on up.
149             poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
150 
151             if let Some(e) = stream.io.take_error()? {
152                 return Err(e);
153             }
154 
155             Ok(stream)
156         }
157     }
158 
new(connected: mio::net::TcpStream) -> io::Result<TcpStream>159     pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
160         let io = PollEvented::new(connected)?;
161         Ok(TcpStream { io })
162     }
163 
164     /// Creates new `TcpStream` from a `std::net::TcpStream`.
165     ///
166     /// This function is intended to be used to wrap a TCP stream from the
167     /// standard library in the Tokio equivalent.
168     ///
169     /// # Notes
170     ///
171     /// The caller is responsible for ensuring that the stream is in
172     /// non-blocking mode. Otherwise all I/O operations on the stream
173     /// will block the thread, which will cause unexpected behavior.
174     /// Non-blocking mode can be set using [`set_nonblocking`].
175     ///
176     /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
177     ///
178     /// # Examples
179     ///
180     /// ```rust,no_run
181     /// use std::error::Error;
182     /// use tokio::net::TcpStream;
183     ///
184     /// #[tokio::main]
185     /// async fn main() -> Result<(), Box<dyn Error>> {
186     ///     let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
187     ///     std_stream.set_nonblocking(true)?;
188     ///     let stream = TcpStream::from_std(std_stream)?;
189     ///     Ok(())
190     /// }
191     /// ```
192     ///
193     /// # Panics
194     ///
195     /// This function panics if it is not called from within a runtime with
196     /// IO enabled.
197     ///
198     /// The runtime is usually set implicitly when this function is called
199     /// from a future driven by a tokio runtime, otherwise runtime can be set
200     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
201     #[track_caller]
from_std(stream: std::net::TcpStream) -> io::Result<TcpStream>202     pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
203         let io = mio::net::TcpStream::from_std(stream);
204         let io = PollEvented::new(io)?;
205         Ok(TcpStream { io })
206     }
207 
208     /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
209     ///
210     /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
211     /// Use [`set_nonblocking`] to change the blocking mode if needed.
212     ///
213     /// # Examples
214     ///
215     /// ```
216     /// use std::error::Error;
217     /// use std::io::Read;
218     /// use tokio::net::TcpListener;
219     /// # use tokio::net::TcpStream;
220     /// # use tokio::io::AsyncWriteExt;
221     ///
222     /// #[tokio::main]
223     /// async fn main() -> Result<(), Box<dyn Error>> {
224     ///     let mut data = [0u8; 12];
225     /// #   if false {
226     ///     let listener = TcpListener::bind("127.0.0.1:34254").await?;
227     /// #   }
228     /// #   let listener = TcpListener::bind("127.0.0.1:0").await?;
229     /// #   let addr = listener.local_addr().unwrap();
230     /// #   let handle = tokio::spawn(async move {
231     /// #       let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap();
232     /// #       stream.write_all(b"Hello world!").await.unwrap();
233     /// #   });
234     ///     let (tokio_tcp_stream, _) = listener.accept().await?;
235     ///     let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
236     /// #   handle.await.expect("The task being joined has panicked");
237     ///     std_tcp_stream.set_nonblocking(false)?;
238     ///     std_tcp_stream.read_exact(&mut data)?;
239     /// #   assert_eq!(b"Hello world!", &data);
240     ///     Ok(())
241     /// }
242     /// ```
243     /// [`tokio::net::TcpStream`]: TcpStream
244     /// [`std::net::TcpStream`]: std::net::TcpStream
245     /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
into_std(self) -> io::Result<std::net::TcpStream>246     pub fn into_std(self) -> io::Result<std::net::TcpStream> {
247         #[cfg(unix)]
248         {
249             use std::os::unix::io::{FromRawFd, IntoRawFd};
250             self.io
251                 .into_inner()
252                 .map(IntoRawFd::into_raw_fd)
253                 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
254         }
255 
256         #[cfg(windows)]
257         {
258             use std::os::windows::io::{FromRawSocket, IntoRawSocket};
259             self.io
260                 .into_inner()
261                 .map(|io| io.into_raw_socket())
262                 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
263         }
264 
265         #[cfg(target_os = "wasi")]
266         {
267             use std::os::wasi::io::{FromRawFd, IntoRawFd};
268             self.io
269                 .into_inner()
270                 .map(|io| io.into_raw_fd())
271                 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
272         }
273     }
274 
275     /// Returns the local address that this stream is bound to.
276     ///
277     /// # Examples
278     ///
279     /// ```no_run
280     /// use tokio::net::TcpStream;
281     ///
282     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
283     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
284     ///
285     /// println!("{:?}", stream.local_addr()?);
286     /// # Ok(())
287     /// # }
288     /// ```
local_addr(&self) -> io::Result<SocketAddr>289     pub fn local_addr(&self) -> io::Result<SocketAddr> {
290         self.io.local_addr()
291     }
292 
293     /// Returns the value of the `SO_ERROR` option.
take_error(&self) -> io::Result<Option<io::Error>>294     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
295         self.io.take_error()
296     }
297 
298     /// Returns the remote address that this stream is connected to.
299     ///
300     /// # Examples
301     ///
302     /// ```no_run
303     /// use tokio::net::TcpStream;
304     ///
305     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
306     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
307     ///
308     /// println!("{:?}", stream.peer_addr()?);
309     /// # Ok(())
310     /// # }
311     /// ```
peer_addr(&self) -> io::Result<SocketAddr>312     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
313         self.io.peer_addr()
314     }
315 
316     /// Attempts to receive data on the socket, without removing that data from
317     /// the queue, registering the current task for wakeup if data is not yet
318     /// available.
319     ///
320     /// Note that on multiple calls to `poll_peek`, `poll_read` or
321     /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
322     /// most recent call is scheduled to receive a wakeup. (However,
323     /// `poll_write` retains a second, independent waker.)
324     ///
325     /// # Return value
326     ///
327     /// The function returns:
328     ///
329     /// * `Poll::Pending` if data is not yet available.
330     /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
331     /// * `Poll::Ready(Err(e))` if an error is encountered.
332     ///
333     /// # Errors
334     ///
335     /// This function may encounter any standard I/O error except `WouldBlock`.
336     ///
337     /// # Examples
338     ///
339     /// ```no_run
340     /// use tokio::io::{self, ReadBuf};
341     /// use tokio::net::TcpStream;
342     ///
343     /// use std::future::poll_fn;
344     ///
345     /// #[tokio::main]
346     /// async fn main() -> io::Result<()> {
347     ///     let stream = TcpStream::connect("127.0.0.1:8000").await?;
348     ///     let mut buf = [0; 10];
349     ///     let mut buf = ReadBuf::new(&mut buf);
350     ///
351     ///     poll_fn(|cx| {
352     ///         stream.poll_peek(cx, &mut buf)
353     ///     }).await?;
354     ///
355     ///     Ok(())
356     /// }
357     /// ```
poll_peek( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<usize>>358     pub fn poll_peek(
359         &self,
360         cx: &mut Context<'_>,
361         buf: &mut ReadBuf<'_>,
362     ) -> Poll<io::Result<usize>> {
363         loop {
364             let ev = ready!(self.io.registration().poll_read_ready(cx))?;
365 
366             let b = unsafe {
367                 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
368             };
369 
370             match self.io.peek(b) {
371                 Ok(ret) => {
372                     unsafe { buf.assume_init(ret) };
373                     buf.advance(ret);
374                     return Poll::Ready(Ok(ret));
375                 }
376                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
377                     self.io.registration().clear_readiness(ev);
378                 }
379                 Err(e) => return Poll::Ready(Err(e)),
380             }
381         }
382     }
383 
384     /// Waits for any of the requested ready states.
385     ///
386     /// This function is usually paired with `try_read()` or `try_write()`. It
387     /// can be used to concurrently read / write to the same socket on a single
388     /// task without splitting the socket.
389     ///
390     /// The function may complete without the socket being ready. This is a
391     /// false-positive and attempting an operation will return with
392     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
393     /// [`Ready`] set, so you should always check the returned value and possibly
394     /// wait again if the requested states are not set.
395     ///
396     /// # Cancel safety
397     ///
398     /// This method is cancel safe. Once a readiness event occurs, the method
399     /// will continue to return immediately until the readiness event is
400     /// consumed by an attempt to read or write that fails with `WouldBlock` or
401     /// `Poll::Pending`.
402     ///
403     /// # Examples
404     ///
405     /// Concurrently read and write to the stream on the same task without
406     /// splitting.
407     ///
408     /// ```no_run
409     /// use tokio::io::Interest;
410     /// use tokio::net::TcpStream;
411     /// use std::error::Error;
412     /// use std::io;
413     ///
414     /// #[tokio::main]
415     /// async fn main() -> Result<(), Box<dyn Error>> {
416     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
417     ///
418     ///     loop {
419     ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
420     ///
421     ///         if ready.is_readable() {
422     ///             let mut data = vec![0; 1024];
423     ///             // Try to read data, this may still fail with `WouldBlock`
424     ///             // if the readiness event is a false positive.
425     ///             match stream.try_read(&mut data) {
426     ///                 Ok(n) => {
427     ///                     println!("read {} bytes", n);
428     ///                 }
429     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
430     ///                     continue;
431     ///                 }
432     ///                 Err(e) => {
433     ///                     return Err(e.into());
434     ///                 }
435     ///             }
436     ///
437     ///         }
438     ///
439     ///         if ready.is_writable() {
440     ///             // Try to write data, this may still fail with `WouldBlock`
441     ///             // if the readiness event is a false positive.
442     ///             match stream.try_write(b"hello world") {
443     ///                 Ok(n) => {
444     ///                     println!("write {} bytes", n);
445     ///                 }
446     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
447     ///                     continue
448     ///                 }
449     ///                 Err(e) => {
450     ///                     return Err(e.into());
451     ///                 }
452     ///             }
453     ///         }
454     ///     }
455     /// }
456     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>457     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
458         let event = self.io.registration().readiness(interest).await?;
459         Ok(event.ready)
460     }
461 
462     /// Waits for the socket to become readable.
463     ///
464     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
465     /// paired with `try_read()`.
466     ///
467     /// # Cancel safety
468     ///
469     /// This method is cancel safe. Once a readiness event occurs, the method
470     /// will continue to return immediately until the readiness event is
471     /// consumed by an attempt to read that fails with `WouldBlock` or
472     /// `Poll::Pending`.
473     ///
474     /// # Examples
475     ///
476     /// ```no_run
477     /// use tokio::net::TcpStream;
478     /// use std::error::Error;
479     /// use std::io;
480     ///
481     /// #[tokio::main]
482     /// async fn main() -> Result<(), Box<dyn Error>> {
483     ///     // Connect to a peer
484     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
485     ///
486     ///     let mut msg = vec![0; 1024];
487     ///
488     ///     loop {
489     ///         // Wait for the socket to be readable
490     ///         stream.readable().await?;
491     ///
492     ///         // Try to read data, this may still fail with `WouldBlock`
493     ///         // if the readiness event is a false positive.
494     ///         match stream.try_read(&mut msg) {
495     ///             Ok(n) => {
496     ///                 msg.truncate(n);
497     ///                 break;
498     ///             }
499     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
500     ///                 continue;
501     ///             }
502     ///             Err(e) => {
503     ///                 return Err(e.into());
504     ///             }
505     ///         }
506     ///     }
507     ///
508     ///     println!("GOT = {:?}", msg);
509     ///     Ok(())
510     /// }
511     /// ```
readable(&self) -> io::Result<()>512     pub async fn readable(&self) -> io::Result<()> {
513         self.ready(Interest::READABLE).await?;
514         Ok(())
515     }
516 
517     /// Polls for read readiness.
518     ///
519     /// If the tcp stream is not currently ready for reading, this method will
520     /// store a clone of the `Waker` from the provided `Context`. When the tcp
521     /// stream becomes ready for reading, `Waker::wake` will be called on the
522     /// waker.
523     ///
524     /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
525     /// `poll_peek`, only the `Waker` from the `Context` passed to the most
526     /// recent call is scheduled to receive a wakeup. (However,
527     /// `poll_write_ready` retains a second, independent waker.)
528     ///
529     /// This function is intended for cases where creating and pinning a future
530     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
531     /// preferred, as this supports polling from multiple tasks at once.
532     ///
533     /// # Return value
534     ///
535     /// The function returns:
536     ///
537     /// * `Poll::Pending` if the tcp stream is not ready for reading.
538     /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
539     /// * `Poll::Ready(Err(e))` if an error is encountered.
540     ///
541     /// # Errors
542     ///
543     /// This function may encounter any standard I/O error except `WouldBlock`.
544     ///
545     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>546     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
547         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
548     }
549 
550     /// Tries to read data from the stream into the provided buffer, returning how
551     /// many bytes were read.
552     ///
553     /// Receives any pending data from the socket but does not wait for new data
554     /// to arrive. On success, returns the number of bytes read. Because
555     /// `try_read()` is non-blocking, the buffer does not have to be stored by
556     /// the async task and can exist entirely on the stack.
557     ///
558     /// Usually, [`readable()`] or [`ready()`] is used with this function.
559     ///
560     /// [`readable()`]: TcpStream::readable()
561     /// [`ready()`]: TcpStream::ready()
562     ///
563     /// # Return
564     ///
565     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
566     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
567     ///
568     /// 1. The stream's read half is closed and will no longer yield data.
569     /// 2. The specified buffer was 0 bytes in length.
570     ///
571     /// If the stream is not ready to read data,
572     /// `Err(io::ErrorKind::WouldBlock)` is returned.
573     ///
574     /// # Examples
575     ///
576     /// ```no_run
577     /// use tokio::net::TcpStream;
578     /// use std::error::Error;
579     /// use std::io;
580     ///
581     /// #[tokio::main]
582     /// async fn main() -> Result<(), Box<dyn Error>> {
583     ///     // Connect to a peer
584     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
585     ///
586     ///     loop {
587     ///         // Wait for the socket to be readable
588     ///         stream.readable().await?;
589     ///
590     ///         // Creating the buffer **after** the `await` prevents it from
591     ///         // being stored in the async task.
592     ///         let mut buf = [0; 4096];
593     ///
594     ///         // Try to read data, this may still fail with `WouldBlock`
595     ///         // if the readiness event is a false positive.
596     ///         match stream.try_read(&mut buf) {
597     ///             Ok(0) => break,
598     ///             Ok(n) => {
599     ///                 println!("read {} bytes", n);
600     ///             }
601     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
602     ///                 continue;
603     ///             }
604     ///             Err(e) => {
605     ///                 return Err(e.into());
606     ///             }
607     ///         }
608     ///     }
609     ///
610     ///     Ok(())
611     /// }
612     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>613     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
614         use std::io::Read;
615 
616         self.io
617             .registration()
618             .try_io(Interest::READABLE, || (&*self.io).read(buf))
619     }
620 
621     /// Tries to read data from the stream into the provided buffers, returning
622     /// how many bytes were read.
623     ///
624     /// Data is copied to fill each buffer in order, with the final buffer
625     /// written to possibly being only partially filled. This method behaves
626     /// equivalently to a single call to [`try_read()`] with concatenated
627     /// buffers.
628     ///
629     /// Receives any pending data from the socket but does not wait for new data
630     /// to arrive. On success, returns the number of bytes read. Because
631     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
632     /// stored by the async task and can exist entirely on the stack.
633     ///
634     /// Usually, [`readable()`] or [`ready()`] is used with this function.
635     ///
636     /// [`try_read()`]: TcpStream::try_read()
637     /// [`readable()`]: TcpStream::readable()
638     /// [`ready()`]: TcpStream::ready()
639     ///
640     /// # Return
641     ///
642     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
643     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
644     /// and will no longer yield data. If the stream is not ready to read data
645     /// `Err(io::ErrorKind::WouldBlock)` is returned.
646     ///
647     /// # Examples
648     ///
649     /// ```no_run
650     /// use tokio::net::TcpStream;
651     /// use std::error::Error;
652     /// use std::io::{self, IoSliceMut};
653     ///
654     /// #[tokio::main]
655     /// async fn main() -> Result<(), Box<dyn Error>> {
656     ///     // Connect to a peer
657     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
658     ///
659     ///     loop {
660     ///         // Wait for the socket to be readable
661     ///         stream.readable().await?;
662     ///
663     ///         // Creating the buffer **after** the `await` prevents it from
664     ///         // being stored in the async task.
665     ///         let mut buf_a = [0; 512];
666     ///         let mut buf_b = [0; 1024];
667     ///         let mut bufs = [
668     ///             IoSliceMut::new(&mut buf_a),
669     ///             IoSliceMut::new(&mut buf_b),
670     ///         ];
671     ///
672     ///         // Try to read data, this may still fail with `WouldBlock`
673     ///         // if the readiness event is a false positive.
674     ///         match stream.try_read_vectored(&mut bufs) {
675     ///             Ok(0) => break,
676     ///             Ok(n) => {
677     ///                 println!("read {} bytes", n);
678     ///             }
679     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
680     ///                 continue;
681     ///             }
682     ///             Err(e) => {
683     ///                 return Err(e.into());
684     ///             }
685     ///         }
686     ///     }
687     ///
688     ///     Ok(())
689     /// }
690     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>691     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
692         use std::io::Read;
693 
694         self.io
695             .registration()
696             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
697     }
698 
699     cfg_io_util! {
700         /// Tries to read data from the stream into the provided buffer, advancing the
701         /// buffer's internal cursor, returning how many bytes were read.
702         ///
703         /// Receives any pending data from the socket but does not wait for new data
704         /// to arrive. On success, returns the number of bytes read. Because
705         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
706         /// the async task and can exist entirely on the stack.
707         ///
708         /// Usually, [`readable()`] or [`ready()`] is used with this function.
709         ///
710         /// [`readable()`]: TcpStream::readable()
711         /// [`ready()`]: TcpStream::ready()
712         ///
713         /// # Return
714         ///
715         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
716         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
717         /// and will no longer yield data. If the stream is not ready to read data
718         /// `Err(io::ErrorKind::WouldBlock)` is returned.
719         ///
720         /// # Examples
721         ///
722         /// ```no_run
723         /// use tokio::net::TcpStream;
724         /// use std::error::Error;
725         /// use std::io;
726         ///
727         /// #[tokio::main]
728         /// async fn main() -> Result<(), Box<dyn Error>> {
729         ///     // Connect to a peer
730         ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
731         ///
732         ///     loop {
733         ///         // Wait for the socket to be readable
734         ///         stream.readable().await?;
735         ///
736         ///         let mut buf = Vec::with_capacity(4096);
737         ///
738         ///         // Try to read data, this may still fail with `WouldBlock`
739         ///         // if the readiness event is a false positive.
740         ///         match stream.try_read_buf(&mut buf) {
741         ///             Ok(0) => break,
742         ///             Ok(n) => {
743         ///                 println!("read {} bytes", n);
744         ///             }
745         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
746         ///                 continue;
747         ///             }
748         ///             Err(e) => {
749         ///                 return Err(e.into());
750         ///             }
751         ///         }
752         ///     }
753         ///
754         ///     Ok(())
755         /// }
756         /// ```
757         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
758             self.io.registration().try_io(Interest::READABLE, || {
759                 use std::io::Read;
760 
761                 let dst = buf.chunk_mut();
762                 let dst =
763                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
764 
765                 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
766                 // buffer.
767                 let n = (&*self.io).read(dst)?;
768 
769                 unsafe {
770                     buf.advance_mut(n);
771                 }
772 
773                 Ok(n)
774             })
775         }
776     }
777 
778     /// Waits for the socket to become writable.
779     ///
780     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
781     /// paired with `try_write()`.
782     ///
783     /// # Cancel safety
784     ///
785     /// This method is cancel safe. Once a readiness event occurs, the method
786     /// will continue to return immediately until the readiness event is
787     /// consumed by an attempt to write that fails with `WouldBlock` or
788     /// `Poll::Pending`.
789     ///
790     /// # Examples
791     ///
792     /// ```no_run
793     /// use tokio::net::TcpStream;
794     /// use std::error::Error;
795     /// use std::io;
796     ///
797     /// #[tokio::main]
798     /// async fn main() -> Result<(), Box<dyn Error>> {
799     ///     // Connect to a peer
800     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
801     ///
802     ///     loop {
803     ///         // Wait for the socket to be writable
804     ///         stream.writable().await?;
805     ///
806     ///         // Try to write data, this may still fail with `WouldBlock`
807     ///         // if the readiness event is a false positive.
808     ///         match stream.try_write(b"hello world") {
809     ///             Ok(n) => {
810     ///                 break;
811     ///             }
812     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
813     ///                 continue;
814     ///             }
815     ///             Err(e) => {
816     ///                 return Err(e.into());
817     ///             }
818     ///         }
819     ///     }
820     ///
821     ///     Ok(())
822     /// }
823     /// ```
writable(&self) -> io::Result<()>824     pub async fn writable(&self) -> io::Result<()> {
825         self.ready(Interest::WRITABLE).await?;
826         Ok(())
827     }
828 
829     /// Polls for write readiness.
830     ///
831     /// If the tcp stream is not currently ready for writing, this method will
832     /// store a clone of the `Waker` from the provided `Context`. When the tcp
833     /// stream becomes ready for writing, `Waker::wake` will be called on the
834     /// waker.
835     ///
836     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
837     /// the `Waker` from the `Context` passed to the most recent call is
838     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
839     /// second, independent waker.)
840     ///
841     /// This function is intended for cases where creating and pinning a future
842     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
843     /// preferred, as this supports polling from multiple tasks at once.
844     ///
845     /// # Return value
846     ///
847     /// The function returns:
848     ///
849     /// * `Poll::Pending` if the tcp stream is not ready for writing.
850     /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
851     /// * `Poll::Ready(Err(e))` if an error is encountered.
852     ///
853     /// # Errors
854     ///
855     /// This function may encounter any standard I/O error except `WouldBlock`.
856     ///
857     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>858     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
859         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
860     }
861 
862     /// Try to write a buffer to the stream, returning how many bytes were
863     /// written.
864     ///
865     /// The function will attempt to write the entire contents of `buf`, but
866     /// only part of the buffer may be written.
867     ///
868     /// This function is usually paired with `writable()`.
869     ///
870     /// # Return
871     ///
872     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
873     /// number of bytes written. If the stream is not ready to write data,
874     /// `Err(io::ErrorKind::WouldBlock)` is returned.
875     ///
876     /// # Examples
877     ///
878     /// ```no_run
879     /// use tokio::net::TcpStream;
880     /// use std::error::Error;
881     /// use std::io;
882     ///
883     /// #[tokio::main]
884     /// async fn main() -> Result<(), Box<dyn Error>> {
885     ///     // Connect to a peer
886     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
887     ///
888     ///     loop {
889     ///         // Wait for the socket to be writable
890     ///         stream.writable().await?;
891     ///
892     ///         // Try to write data, this may still fail with `WouldBlock`
893     ///         // if the readiness event is a false positive.
894     ///         match stream.try_write(b"hello world") {
895     ///             Ok(n) => {
896     ///                 break;
897     ///             }
898     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
899     ///                 continue;
900     ///             }
901     ///             Err(e) => {
902     ///                 return Err(e.into());
903     ///             }
904     ///         }
905     ///     }
906     ///
907     ///     Ok(())
908     /// }
909     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>910     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
911         use std::io::Write;
912 
913         self.io
914             .registration()
915             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
916     }
917 
918     /// Tries to write several buffers to the stream, returning how many bytes
919     /// were written.
920     ///
921     /// Data is written from each buffer in order, with the final buffer read
922     /// from possible being only partially consumed. This method behaves
923     /// equivalently to a single call to [`try_write()`] with concatenated
924     /// buffers.
925     ///
926     /// This function is usually paired with `writable()`.
927     ///
928     /// [`try_write()`]: TcpStream::try_write()
929     ///
930     /// # Return
931     ///
932     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
933     /// number of bytes written. If the stream is not ready to write data,
934     /// `Err(io::ErrorKind::WouldBlock)` is returned.
935     ///
936     /// # Examples
937     ///
938     /// ```no_run
939     /// use tokio::net::TcpStream;
940     /// use std::error::Error;
941     /// use std::io;
942     ///
943     /// #[tokio::main]
944     /// async fn main() -> Result<(), Box<dyn Error>> {
945     ///     // Connect to a peer
946     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
947     ///
948     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
949     ///
950     ///     loop {
951     ///         // Wait for the socket to be writable
952     ///         stream.writable().await?;
953     ///
954     ///         // Try to write data, this may still fail with `WouldBlock`
955     ///         // if the readiness event is a false positive.
956     ///         match stream.try_write_vectored(&bufs) {
957     ///             Ok(n) => {
958     ///                 break;
959     ///             }
960     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
961     ///                 continue;
962     ///             }
963     ///             Err(e) => {
964     ///                 return Err(e.into());
965     ///             }
966     ///         }
967     ///     }
968     ///
969     ///     Ok(())
970     /// }
971     /// ```
try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize>972     pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
973         use std::io::Write;
974 
975         self.io
976             .registration()
977             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
978     }
979 
980     /// Tries to read or write from the socket using a user-provided IO operation.
981     ///
982     /// If the socket is ready, the provided closure is called. The closure
983     /// should attempt to perform IO operation on the socket by manually
984     /// calling the appropriate syscall. If the operation fails because the
985     /// socket is not actually ready, then the closure should return a
986     /// `WouldBlock` error and the readiness flag is cleared. The return value
987     /// of the closure is then returned by `try_io`.
988     ///
989     /// If the socket is not ready, then the closure is not called
990     /// and a `WouldBlock` error is returned.
991     ///
992     /// The closure should only return a `WouldBlock` error if it has performed
993     /// an IO operation on the socket that failed due to the socket not being
994     /// ready. Returning a `WouldBlock` error in any other situation will
995     /// incorrectly clear the readiness flag, which can cause the socket to
996     /// behave incorrectly.
997     ///
998     /// The closure should not perform the IO operation using any of the methods
999     /// defined on the Tokio `TcpStream` type, as this will mess with the
1000     /// readiness flag and can cause the socket to behave incorrectly.
1001     ///
1002     /// This method is not intended to be used with combined interests.
1003     /// The closure should perform only one type of IO operation, so it should not
1004     /// require more than one ready state. This method may panic or sleep forever
1005     /// if it is called with a combined interest.
1006     ///
1007     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1008     ///
1009     /// [`readable()`]: TcpStream::readable()
1010     /// [`writable()`]: TcpStream::writable()
1011     /// [`ready()`]: TcpStream::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1012     pub fn try_io<R>(
1013         &self,
1014         interest: Interest,
1015         f: impl FnOnce() -> io::Result<R>,
1016     ) -> io::Result<R> {
1017         self.io
1018             .registration()
1019             .try_io(interest, || self.io.try_io(f))
1020     }
1021 
1022     /// Reads or writes from the socket using a user-provided IO operation.
1023     ///
1024     /// The readiness of the socket is awaited and when the socket is ready,
1025     /// the provided closure is called. The closure should attempt to perform
1026     /// IO operation on the socket by manually calling the appropriate syscall.
1027     /// If the operation fails because the socket is not actually ready,
1028     /// then the closure should return a `WouldBlock` error. In such case the
1029     /// readiness flag is cleared and the socket readiness is awaited again.
1030     /// This loop is repeated until the closure returns an `Ok` or an error
1031     /// other than `WouldBlock`.
1032     ///
1033     /// The closure should only return a `WouldBlock` error if it has performed
1034     /// an IO operation on the socket that failed due to the socket not being
1035     /// ready. Returning a `WouldBlock` error in any other situation will
1036     /// incorrectly clear the readiness flag, which can cause the socket to
1037     /// behave incorrectly.
1038     ///
1039     /// The closure should not perform the IO operation using any of the methods
1040     /// defined on the Tokio `TcpStream` type, as this will mess with the
1041     /// readiness flag and can cause the socket to behave incorrectly.
1042     ///
1043     /// This method is not intended to be used with combined interests.
1044     /// The closure should perform only one type of IO operation, so it should not
1045     /// require more than one ready state. This method may panic or sleep forever
1046     /// if it is called with a combined interest.
async_io<R>( &self, interest: Interest, mut f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>1047     pub async fn async_io<R>(
1048         &self,
1049         interest: Interest,
1050         mut f: impl FnMut() -> io::Result<R>,
1051     ) -> io::Result<R> {
1052         self.io
1053             .registration()
1054             .async_io(interest, || self.io.try_io(&mut f))
1055             .await
1056     }
1057 
1058     /// Receives data on the socket from the remote address to which it is
1059     /// connected, without removing that data from the queue. On success,
1060     /// returns the number of bytes peeked.
1061     ///
1062     /// Successive calls return the same data. This is accomplished by passing
1063     /// `MSG_PEEK` as a flag to the underlying `recv` system call.
1064     ///
1065     /// # Examples
1066     ///
1067     /// ```no_run
1068     /// use tokio::net::TcpStream;
1069     /// use tokio::io::AsyncReadExt;
1070     /// use std::error::Error;
1071     ///
1072     /// #[tokio::main]
1073     /// async fn main() -> Result<(), Box<dyn Error>> {
1074     ///     // Connect to a peer
1075     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1076     ///
1077     ///     let mut b1 = [0; 10];
1078     ///     let mut b2 = [0; 10];
1079     ///
1080     ///     // Peek at the data
1081     ///     let n = stream.peek(&mut b1).await?;
1082     ///
1083     ///     // Read the data
1084     ///     assert_eq!(n, stream.read(&mut b2[..n]).await?);
1085     ///     assert_eq!(&b1[..n], &b2[..n]);
1086     ///
1087     ///     Ok(())
1088     /// }
1089     /// ```
1090     ///
1091     /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1092     ///
1093     /// [`read`]: fn@crate::io::AsyncReadExt::read
1094     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
peek(&self, buf: &mut [u8]) -> io::Result<usize>1095     pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1096         self.io
1097             .registration()
1098             .async_io(Interest::READABLE, || self.io.peek(buf))
1099             .await
1100     }
1101 
1102     /// Shuts down the read, write, or both halves of this connection.
1103     ///
1104     /// This function will cause all pending and future I/O on the specified
1105     /// portions to return immediately with an appropriate value (see the
1106     /// documentation of `Shutdown`).
shutdown_std(&self, how: Shutdown) -> io::Result<()>1107     pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1108         self.io.shutdown(how)
1109     }
1110 
1111     /// Gets the value of the `TCP_NODELAY` option on this socket.
1112     ///
1113     /// For more information about this option, see [`set_nodelay`].
1114     ///
1115     /// [`set_nodelay`]: TcpStream::set_nodelay
1116     ///
1117     /// # Examples
1118     ///
1119     /// ```no_run
1120     /// use tokio::net::TcpStream;
1121     ///
1122     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1123     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1124     ///
1125     /// println!("{:?}", stream.nodelay()?);
1126     /// # Ok(())
1127     /// # }
1128     /// ```
nodelay(&self) -> io::Result<bool>1129     pub fn nodelay(&self) -> io::Result<bool> {
1130         self.io.nodelay()
1131     }
1132 
1133     /// Sets the value of the `TCP_NODELAY` option on this socket.
1134     ///
1135     /// If set, this option disables the Nagle algorithm. This means that
1136     /// segments are always sent as soon as possible, even if there is only a
1137     /// small amount of data. When not set, data is buffered until there is a
1138     /// sufficient amount to send out, thereby avoiding the frequent sending of
1139     /// small packets.
1140     ///
1141     /// # Examples
1142     ///
1143     /// ```no_run
1144     /// use tokio::net::TcpStream;
1145     ///
1146     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1147     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1148     ///
1149     /// stream.set_nodelay(true)?;
1150     /// # Ok(())
1151     /// # }
1152     /// ```
set_nodelay(&self, nodelay: bool) -> io::Result<()>1153     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1154         self.io.set_nodelay(nodelay)
1155     }
1156 
1157     cfg_not_wasi! {
1158         /// Reads the linger duration for this socket by getting the `SO_LINGER`
1159         /// option.
1160         ///
1161         /// For more information about this option, see [`set_linger`].
1162         ///
1163         /// [`set_linger`]: TcpStream::set_linger
1164         ///
1165         /// # Examples
1166         ///
1167         /// ```no_run
1168         /// use tokio::net::TcpStream;
1169         ///
1170         /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1171         /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1172         ///
1173         /// println!("{:?}", stream.linger()?);
1174         /// # Ok(())
1175         /// # }
1176         /// ```
1177         pub fn linger(&self) -> io::Result<Option<Duration>> {
1178             socket2::SockRef::from(self).linger()
1179         }
1180 
1181         /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
1182         ///
1183         /// This option controls the action taken when a stream has unsent messages and the stream is
1184         /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
1185         /// data or until the time expires.
1186         ///
1187         /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
1188         /// way that allows the process to continue as quickly as possible.
1189         ///
1190         /// # Examples
1191         ///
1192         /// ```no_run
1193         /// use tokio::net::TcpStream;
1194         ///
1195         /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1196         /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1197         ///
1198         /// stream.set_linger(None)?;
1199         /// # Ok(())
1200         /// # }
1201         /// ```
1202         pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1203             socket2::SockRef::from(self).set_linger(dur)
1204         }
1205     }
1206 
1207     /// Gets the value of the `IP_TTL` option for this socket.
1208     ///
1209     /// For more information about this option, see [`set_ttl`].
1210     ///
1211     /// [`set_ttl`]: TcpStream::set_ttl
1212     ///
1213     /// # Examples
1214     ///
1215     /// ```no_run
1216     /// use tokio::net::TcpStream;
1217     ///
1218     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1219     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1220     ///
1221     /// println!("{:?}", stream.ttl()?);
1222     /// # Ok(())
1223     /// # }
1224     /// ```
ttl(&self) -> io::Result<u32>1225     pub fn ttl(&self) -> io::Result<u32> {
1226         self.io.ttl()
1227     }
1228 
1229     /// Sets the value for the `IP_TTL` option on this socket.
1230     ///
1231     /// This value sets the time-to-live field that is used in every packet sent
1232     /// from this socket.
1233     ///
1234     /// # Examples
1235     ///
1236     /// ```no_run
1237     /// use tokio::net::TcpStream;
1238     ///
1239     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1240     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1241     ///
1242     /// stream.set_ttl(123)?;
1243     /// # Ok(())
1244     /// # }
1245     /// ```
set_ttl(&self, ttl: u32) -> io::Result<()>1246     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1247         self.io.set_ttl(ttl)
1248     }
1249 
1250     // These lifetime markers also appear in the generated documentation, and make
1251     // it more clear that this is a *borrowed* split.
1252     #[allow(clippy::needless_lifetimes)]
1253     /// Splits a `TcpStream` into a read half and a write half, which can be used
1254     /// to read and write the stream concurrently.
1255     ///
1256     /// This method is more efficient than [`into_split`], but the halves cannot be
1257     /// moved into independently spawned tasks.
1258     ///
1259     /// [`into_split`]: TcpStream::into_split()
split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)1260     pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1261         split(self)
1262     }
1263 
1264     /// Splits a `TcpStream` into a read half and a write half, which can be used
1265     /// to read and write the stream concurrently.
1266     ///
1267     /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1268     /// this comes at the cost of a heap allocation.
1269     ///
1270     /// **Note:** Dropping the write half will shut down the write half of the TCP
1271     /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1272     ///
1273     /// [`split`]: TcpStream::split()
1274     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)1275     pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1276         split_owned(self)
1277     }
1278 
1279     // == Poll IO functions that takes `&self` ==
1280     //
1281     // To read or write without mutable access to the `TcpStream`, combine the
1282     // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1283     // `try_write` methods.
1284 
poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1285     pub(crate) fn poll_read_priv(
1286         &self,
1287         cx: &mut Context<'_>,
1288         buf: &mut ReadBuf<'_>,
1289     ) -> Poll<io::Result<()>> {
1290         // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1291         unsafe { self.io.poll_read(cx, buf) }
1292     }
1293 
poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1294     pub(super) fn poll_write_priv(
1295         &self,
1296         cx: &mut Context<'_>,
1297         buf: &[u8],
1298     ) -> Poll<io::Result<usize>> {
1299         self.io.poll_write(cx, buf)
1300     }
1301 
poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1302     pub(super) fn poll_write_vectored_priv(
1303         &self,
1304         cx: &mut Context<'_>,
1305         bufs: &[io::IoSlice<'_>],
1306     ) -> Poll<io::Result<usize>> {
1307         self.io.poll_write_vectored(cx, bufs)
1308     }
1309 }
1310 
1311 impl TryFrom<std::net::TcpStream> for TcpStream {
1312     type Error = io::Error;
1313 
1314     /// Consumes stream, returning the tokio I/O object.
1315     ///
1316     /// This is equivalent to
1317     /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error>1318     fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1319         Self::from_std(stream)
1320     }
1321 }
1322 
1323 // ===== impl Read / Write =====
1324 
1325 impl AsyncRead for TcpStream {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1326     fn poll_read(
1327         self: Pin<&mut Self>,
1328         cx: &mut Context<'_>,
1329         buf: &mut ReadBuf<'_>,
1330     ) -> Poll<io::Result<()>> {
1331         self.poll_read_priv(cx, buf)
1332     }
1333 }
1334 
1335 impl AsyncWrite for TcpStream {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1336     fn poll_write(
1337         self: Pin<&mut Self>,
1338         cx: &mut Context<'_>,
1339         buf: &[u8],
1340     ) -> Poll<io::Result<usize>> {
1341         self.poll_write_priv(cx, buf)
1342     }
1343 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1344     fn poll_write_vectored(
1345         self: Pin<&mut Self>,
1346         cx: &mut Context<'_>,
1347         bufs: &[io::IoSlice<'_>],
1348     ) -> Poll<io::Result<usize>> {
1349         self.poll_write_vectored_priv(cx, bufs)
1350     }
1351 
is_write_vectored(&self) -> bool1352     fn is_write_vectored(&self) -> bool {
1353         true
1354     }
1355 
1356     #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1357     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1358         // tcp flush is a no-op
1359         Poll::Ready(Ok(()))
1360     }
1361 
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1362     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1363         self.shutdown_std(std::net::Shutdown::Write)?;
1364         Poll::Ready(Ok(()))
1365     }
1366 }
1367 
1368 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1369     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1370         self.io.fmt(f)
1371     }
1372 }
1373 
1374 #[cfg(unix)]
1375 mod sys {
1376     use super::TcpStream;
1377     use std::os::unix::prelude::*;
1378 
1379     impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd1380         fn as_raw_fd(&self) -> RawFd {
1381             self.io.as_raw_fd()
1382         }
1383     }
1384 
1385     impl AsFd for TcpStream {
as_fd(&self) -> BorrowedFd<'_>1386         fn as_fd(&self) -> BorrowedFd<'_> {
1387             unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1388         }
1389     }
1390 }
1391 
1392 cfg_windows! {
1393     use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1394 
1395     impl AsRawSocket for TcpStream {
1396         fn as_raw_socket(&self) -> RawSocket {
1397             self.io.as_raw_socket()
1398         }
1399     }
1400 
1401     impl AsSocket for TcpStream {
1402         fn as_socket(&self) -> BorrowedSocket<'_> {
1403             unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1404         }
1405     }
1406 }
1407 
1408 #[cfg(all(tokio_unstable, target_os = "wasi"))]
1409 mod sys {
1410     use super::TcpStream;
1411     use std::os::wasi::prelude::*;
1412 
1413     impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd1414         fn as_raw_fd(&self) -> RawFd {
1415             self.io.as_raw_fd()
1416         }
1417     }
1418 
1419     impl AsFd for TcpStream {
as_fd(&self) -> BorrowedFd<'_>1420         fn as_fd(&self) -> BorrowedFd<'_> {
1421             unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1422         }
1423     }
1424 }
1425