1 cfg_not_wasi! { 2 use crate::net::{to_socket_addrs, ToSocketAddrs}; 3 use std::future::poll_fn; 4 use std::time::Duration; 5 } 6 7 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; 8 use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; 9 use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; 10 11 use std::fmt; 12 use std::io; 13 use std::net::{Shutdown, SocketAddr}; 14 use std::pin::Pin; 15 use std::task::{ready, Context, Poll}; 16 17 cfg_io_util! { 18 use bytes::BufMut; 19 } 20 21 cfg_net! { 22 /// A TCP stream between a local and a remote socket. 23 /// 24 /// A TCP stream can either be created by connecting to an endpoint, via the 25 /// [`connect`] method, or by [accepting] a connection from a [listener]. A 26 /// TCP stream can also be created via the [`TcpSocket`] type. 27 /// 28 /// Reading and writing to a `TcpStream` is usually done using the 29 /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] 30 /// traits. 31 /// 32 /// [`connect`]: method@TcpStream::connect 33 /// [accepting]: method@crate::net::TcpListener::accept 34 /// [listener]: struct@crate::net::TcpListener 35 /// [`TcpSocket`]: struct@crate::net::TcpSocket 36 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt 37 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt 38 /// 39 /// # Examples 40 /// 41 /// ```no_run 42 /// use tokio::net::TcpStream; 43 /// use tokio::io::AsyncWriteExt; 44 /// use std::error::Error; 45 /// 46 /// #[tokio::main] 47 /// async fn main() -> Result<(), Box<dyn Error>> { 48 /// // Connect to a peer 49 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; 50 /// 51 /// // Write some data. 52 /// stream.write_all(b"hello world!").await?; 53 /// 54 /// Ok(()) 55 /// } 56 /// ``` 57 /// 58 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. 59 /// 60 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 61 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt 62 /// 63 /// To shut down the stream in the write direction, you can call the 64 /// [`shutdown()`] method. This will cause the other peer to receive a read of 65 /// length 0, indicating that no more data will be sent. This only closes 66 /// the stream in one direction. 67 /// 68 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown 69 pub struct TcpStream { 70 io: PollEvented<mio::net::TcpStream>, 71 } 72 } 73 74 impl TcpStream { 75 cfg_not_wasi! { 76 /// Opens a TCP connection to a remote host. 77 /// 78 /// `addr` is an address of the remote host. Anything which implements the 79 /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr` 80 /// yields multiple addresses, connect will be attempted with each of the 81 /// addresses until a connection is successful. If none of the addresses 82 /// result in a successful connection, the error returned from the last 83 /// connection attempt (the last address) is returned. 84 /// 85 /// To configure the socket before connecting, you can use the [`TcpSocket`] 86 /// type. 87 /// 88 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs 89 /// [`TcpSocket`]: struct@crate::net::TcpSocket 90 /// 91 /// # Examples 92 /// 93 /// ```no_run 94 /// use tokio::net::TcpStream; 95 /// use tokio::io::AsyncWriteExt; 96 /// use std::error::Error; 97 /// 98 /// #[tokio::main] 99 /// async fn main() -> Result<(), Box<dyn Error>> { 100 /// // Connect to a peer 101 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; 102 /// 103 /// // Write some data. 104 /// stream.write_all(b"hello world!").await?; 105 /// 106 /// Ok(()) 107 /// } 108 /// ``` 109 /// 110 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. 111 /// 112 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 113 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt 114 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> { 115 let addrs = to_socket_addrs(addr).await?; 116 117 let mut last_err = None; 118 119 for addr in addrs { 120 match TcpStream::connect_addr(addr).await { 121 Ok(stream) => return Ok(stream), 122 Err(e) => last_err = Some(e), 123 } 124 } 125 126 Err(last_err.unwrap_or_else(|| { 127 io::Error::new( 128 io::ErrorKind::InvalidInput, 129 "could not resolve to any address", 130 ) 131 })) 132 } 133 134 /// Establishes a connection to the specified `addr`. 135 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> { 136 let sys = mio::net::TcpStream::connect(addr)?; 137 TcpStream::connect_mio(sys).await 138 } 139 140 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> { 141 let stream = TcpStream::new(sys)?; 142 143 // Once we've connected, wait for the stream to be writable as 144 // that's when the actual connection has been initiated. Once we're 145 // writable we check for `take_socket_error` to see if the connect 146 // actually hit an error or not. 147 // 148 // If all that succeeded then we ship everything on up. 149 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; 150 151 if let Some(e) = stream.io.take_error()? { 152 return Err(e); 153 } 154 155 Ok(stream) 156 } 157 } 158 new(connected: mio::net::TcpStream) -> io::Result<TcpStream>159 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> { 160 let io = PollEvented::new(connected)?; 161 Ok(TcpStream { io }) 162 } 163 164 /// Creates new `TcpStream` from a `std::net::TcpStream`. 165 /// 166 /// This function is intended to be used to wrap a TCP stream from the 167 /// standard library in the Tokio equivalent. 168 /// 169 /// # Notes 170 /// 171 /// The caller is responsible for ensuring that the stream is in 172 /// non-blocking mode. Otherwise all I/O operations on the stream 173 /// will block the thread, which will cause unexpected behavior. 174 /// Non-blocking mode can be set using [`set_nonblocking`]. 175 /// 176 /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking 177 /// 178 /// # Examples 179 /// 180 /// ```rust,no_run 181 /// use std::error::Error; 182 /// use tokio::net::TcpStream; 183 /// 184 /// #[tokio::main] 185 /// async fn main() -> Result<(), Box<dyn Error>> { 186 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; 187 /// std_stream.set_nonblocking(true)?; 188 /// let stream = TcpStream::from_std(std_stream)?; 189 /// Ok(()) 190 /// } 191 /// ``` 192 /// 193 /// # Panics 194 /// 195 /// This function panics if it is not called from within a runtime with 196 /// IO enabled. 197 /// 198 /// The runtime is usually set implicitly when this function is called 199 /// from a future driven by a tokio runtime, otherwise runtime can be set 200 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 201 #[track_caller] from_std(stream: std::net::TcpStream) -> io::Result<TcpStream>202 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> { 203 let io = mio::net::TcpStream::from_std(stream); 204 let io = PollEvented::new(io)?; 205 Ok(TcpStream { io }) 206 } 207 208 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. 209 /// 210 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`. 211 /// Use [`set_nonblocking`] to change the blocking mode if needed. 212 /// 213 /// # Examples 214 /// 215 /// ``` 216 /// use std::error::Error; 217 /// use std::io::Read; 218 /// use tokio::net::TcpListener; 219 /// # use tokio::net::TcpStream; 220 /// # use tokio::io::AsyncWriteExt; 221 /// 222 /// #[tokio::main] 223 /// async fn main() -> Result<(), Box<dyn Error>> { 224 /// let mut data = [0u8; 12]; 225 /// # if false { 226 /// let listener = TcpListener::bind("127.0.0.1:34254").await?; 227 /// # } 228 /// # let listener = TcpListener::bind("127.0.0.1:0").await?; 229 /// # let addr = listener.local_addr().unwrap(); 230 /// # let handle = tokio::spawn(async move { 231 /// # let mut stream: TcpStream = TcpStream::connect(addr).await.unwrap(); 232 /// # stream.write_all(b"Hello world!").await.unwrap(); 233 /// # }); 234 /// let (tokio_tcp_stream, _) = listener.accept().await?; 235 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?; 236 /// # handle.await.expect("The task being joined has panicked"); 237 /// std_tcp_stream.set_nonblocking(false)?; 238 /// std_tcp_stream.read_exact(&mut data)?; 239 /// # assert_eq!(b"Hello world!", &data); 240 /// Ok(()) 241 /// } 242 /// ``` 243 /// [`tokio::net::TcpStream`]: TcpStream 244 /// [`std::net::TcpStream`]: std::net::TcpStream 245 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking into_std(self) -> io::Result<std::net::TcpStream>246 pub fn into_std(self) -> io::Result<std::net::TcpStream> { 247 #[cfg(unix)] 248 { 249 use std::os::unix::io::{FromRawFd, IntoRawFd}; 250 self.io 251 .into_inner() 252 .map(IntoRawFd::into_raw_fd) 253 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }) 254 } 255 256 #[cfg(windows)] 257 { 258 use std::os::windows::io::{FromRawSocket, IntoRawSocket}; 259 self.io 260 .into_inner() 261 .map(|io| io.into_raw_socket()) 262 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) }) 263 } 264 265 #[cfg(target_os = "wasi")] 266 { 267 use std::os::wasi::io::{FromRawFd, IntoRawFd}; 268 self.io 269 .into_inner() 270 .map(|io| io.into_raw_fd()) 271 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }) 272 } 273 } 274 275 /// Returns the local address that this stream is bound to. 276 /// 277 /// # Examples 278 /// 279 /// ```no_run 280 /// use tokio::net::TcpStream; 281 /// 282 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 283 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 284 /// 285 /// println!("{:?}", stream.local_addr()?); 286 /// # Ok(()) 287 /// # } 288 /// ``` local_addr(&self) -> io::Result<SocketAddr>289 pub fn local_addr(&self) -> io::Result<SocketAddr> { 290 self.io.local_addr() 291 } 292 293 /// Returns the value of the `SO_ERROR` option. take_error(&self) -> io::Result<Option<io::Error>>294 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 295 self.io.take_error() 296 } 297 298 /// Returns the remote address that this stream is connected to. 299 /// 300 /// # Examples 301 /// 302 /// ```no_run 303 /// use tokio::net::TcpStream; 304 /// 305 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 306 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 307 /// 308 /// println!("{:?}", stream.peer_addr()?); 309 /// # Ok(()) 310 /// # } 311 /// ``` peer_addr(&self) -> io::Result<SocketAddr>312 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 313 self.io.peer_addr() 314 } 315 316 /// Attempts to receive data on the socket, without removing that data from 317 /// the queue, registering the current task for wakeup if data is not yet 318 /// available. 319 /// 320 /// Note that on multiple calls to `poll_peek`, `poll_read` or 321 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the 322 /// most recent call is scheduled to receive a wakeup. (However, 323 /// `poll_write` retains a second, independent waker.) 324 /// 325 /// # Return value 326 /// 327 /// The function returns: 328 /// 329 /// * `Poll::Pending` if data is not yet available. 330 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked. 331 /// * `Poll::Ready(Err(e))` if an error is encountered. 332 /// 333 /// # Errors 334 /// 335 /// This function may encounter any standard I/O error except `WouldBlock`. 336 /// 337 /// # Examples 338 /// 339 /// ```no_run 340 /// use tokio::io::{self, ReadBuf}; 341 /// use tokio::net::TcpStream; 342 /// 343 /// use std::future::poll_fn; 344 /// 345 /// #[tokio::main] 346 /// async fn main() -> io::Result<()> { 347 /// let stream = TcpStream::connect("127.0.0.1:8000").await?; 348 /// let mut buf = [0; 10]; 349 /// let mut buf = ReadBuf::new(&mut buf); 350 /// 351 /// poll_fn(|cx| { 352 /// stream.poll_peek(cx, &mut buf) 353 /// }).await?; 354 /// 355 /// Ok(()) 356 /// } 357 /// ``` poll_peek( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<usize>>358 pub fn poll_peek( 359 &self, 360 cx: &mut Context<'_>, 361 buf: &mut ReadBuf<'_>, 362 ) -> Poll<io::Result<usize>> { 363 loop { 364 let ev = ready!(self.io.registration().poll_read_ready(cx))?; 365 366 let b = unsafe { 367 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 368 }; 369 370 match self.io.peek(b) { 371 Ok(ret) => { 372 unsafe { buf.assume_init(ret) }; 373 buf.advance(ret); 374 return Poll::Ready(Ok(ret)); 375 } 376 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 377 self.io.registration().clear_readiness(ev); 378 } 379 Err(e) => return Poll::Ready(Err(e)), 380 } 381 } 382 } 383 384 /// Waits for any of the requested ready states. 385 /// 386 /// This function is usually paired with `try_read()` or `try_write()`. It 387 /// can be used to concurrently read / write to the same socket on a single 388 /// task without splitting the socket. 389 /// 390 /// The function may complete without the socket being ready. This is a 391 /// false-positive and attempting an operation will return with 392 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty 393 /// [`Ready`] set, so you should always check the returned value and possibly 394 /// wait again if the requested states are not set. 395 /// 396 /// # Cancel safety 397 /// 398 /// This method is cancel safe. Once a readiness event occurs, the method 399 /// will continue to return immediately until the readiness event is 400 /// consumed by an attempt to read or write that fails with `WouldBlock` or 401 /// `Poll::Pending`. 402 /// 403 /// # Examples 404 /// 405 /// Concurrently read and write to the stream on the same task without 406 /// splitting. 407 /// 408 /// ```no_run 409 /// use tokio::io::Interest; 410 /// use tokio::net::TcpStream; 411 /// use std::error::Error; 412 /// use std::io; 413 /// 414 /// #[tokio::main] 415 /// async fn main() -> Result<(), Box<dyn Error>> { 416 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 417 /// 418 /// loop { 419 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; 420 /// 421 /// if ready.is_readable() { 422 /// let mut data = vec![0; 1024]; 423 /// // Try to read data, this may still fail with `WouldBlock` 424 /// // if the readiness event is a false positive. 425 /// match stream.try_read(&mut data) { 426 /// Ok(n) => { 427 /// println!("read {} bytes", n); 428 /// } 429 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 430 /// continue; 431 /// } 432 /// Err(e) => { 433 /// return Err(e.into()); 434 /// } 435 /// } 436 /// 437 /// } 438 /// 439 /// if ready.is_writable() { 440 /// // Try to write data, this may still fail with `WouldBlock` 441 /// // if the readiness event is a false positive. 442 /// match stream.try_write(b"hello world") { 443 /// Ok(n) => { 444 /// println!("write {} bytes", n); 445 /// } 446 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 447 /// continue 448 /// } 449 /// Err(e) => { 450 /// return Err(e.into()); 451 /// } 452 /// } 453 /// } 454 /// } 455 /// } 456 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>457 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 458 let event = self.io.registration().readiness(interest).await?; 459 Ok(event.ready) 460 } 461 462 /// Waits for the socket to become readable. 463 /// 464 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 465 /// paired with `try_read()`. 466 /// 467 /// # Cancel safety 468 /// 469 /// This method is cancel safe. Once a readiness event occurs, the method 470 /// will continue to return immediately until the readiness event is 471 /// consumed by an attempt to read that fails with `WouldBlock` or 472 /// `Poll::Pending`. 473 /// 474 /// # Examples 475 /// 476 /// ```no_run 477 /// use tokio::net::TcpStream; 478 /// use std::error::Error; 479 /// use std::io; 480 /// 481 /// #[tokio::main] 482 /// async fn main() -> Result<(), Box<dyn Error>> { 483 /// // Connect to a peer 484 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 485 /// 486 /// let mut msg = vec![0; 1024]; 487 /// 488 /// loop { 489 /// // Wait for the socket to be readable 490 /// stream.readable().await?; 491 /// 492 /// // Try to read data, this may still fail with `WouldBlock` 493 /// // if the readiness event is a false positive. 494 /// match stream.try_read(&mut msg) { 495 /// Ok(n) => { 496 /// msg.truncate(n); 497 /// break; 498 /// } 499 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 500 /// continue; 501 /// } 502 /// Err(e) => { 503 /// return Err(e.into()); 504 /// } 505 /// } 506 /// } 507 /// 508 /// println!("GOT = {:?}", msg); 509 /// Ok(()) 510 /// } 511 /// ``` readable(&self) -> io::Result<()>512 pub async fn readable(&self) -> io::Result<()> { 513 self.ready(Interest::READABLE).await?; 514 Ok(()) 515 } 516 517 /// Polls for read readiness. 518 /// 519 /// If the tcp stream is not currently ready for reading, this method will 520 /// store a clone of the `Waker` from the provided `Context`. When the tcp 521 /// stream becomes ready for reading, `Waker::wake` will be called on the 522 /// waker. 523 /// 524 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or 525 /// `poll_peek`, only the `Waker` from the `Context` passed to the most 526 /// recent call is scheduled to receive a wakeup. (However, 527 /// `poll_write_ready` retains a second, independent waker.) 528 /// 529 /// This function is intended for cases where creating and pinning a future 530 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 531 /// preferred, as this supports polling from multiple tasks at once. 532 /// 533 /// # Return value 534 /// 535 /// The function returns: 536 /// 537 /// * `Poll::Pending` if the tcp stream is not ready for reading. 538 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading. 539 /// * `Poll::Ready(Err(e))` if an error is encountered. 540 /// 541 /// # Errors 542 /// 543 /// This function may encounter any standard I/O error except `WouldBlock`. 544 /// 545 /// [`readable`]: method@Self::readable poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>546 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 547 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 548 } 549 550 /// Tries to read data from the stream into the provided buffer, returning how 551 /// many bytes were read. 552 /// 553 /// Receives any pending data from the socket but does not wait for new data 554 /// to arrive. On success, returns the number of bytes read. Because 555 /// `try_read()` is non-blocking, the buffer does not have to be stored by 556 /// the async task and can exist entirely on the stack. 557 /// 558 /// Usually, [`readable()`] or [`ready()`] is used with this function. 559 /// 560 /// [`readable()`]: TcpStream::readable() 561 /// [`ready()`]: TcpStream::ready() 562 /// 563 /// # Return 564 /// 565 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 566 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: 567 /// 568 /// 1. The stream's read half is closed and will no longer yield data. 569 /// 2. The specified buffer was 0 bytes in length. 570 /// 571 /// If the stream is not ready to read data, 572 /// `Err(io::ErrorKind::WouldBlock)` is returned. 573 /// 574 /// # Examples 575 /// 576 /// ```no_run 577 /// use tokio::net::TcpStream; 578 /// use std::error::Error; 579 /// use std::io; 580 /// 581 /// #[tokio::main] 582 /// async fn main() -> Result<(), Box<dyn Error>> { 583 /// // Connect to a peer 584 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 585 /// 586 /// loop { 587 /// // Wait for the socket to be readable 588 /// stream.readable().await?; 589 /// 590 /// // Creating the buffer **after** the `await` prevents it from 591 /// // being stored in the async task. 592 /// let mut buf = [0; 4096]; 593 /// 594 /// // Try to read data, this may still fail with `WouldBlock` 595 /// // if the readiness event is a false positive. 596 /// match stream.try_read(&mut buf) { 597 /// Ok(0) => break, 598 /// Ok(n) => { 599 /// println!("read {} bytes", n); 600 /// } 601 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 602 /// continue; 603 /// } 604 /// Err(e) => { 605 /// return Err(e.into()); 606 /// } 607 /// } 608 /// } 609 /// 610 /// Ok(()) 611 /// } 612 /// ``` try_read(&self, buf: &mut [u8]) -> io::Result<usize>613 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { 614 use std::io::Read; 615 616 self.io 617 .registration() 618 .try_io(Interest::READABLE, || (&*self.io).read(buf)) 619 } 620 621 /// Tries to read data from the stream into the provided buffers, returning 622 /// how many bytes were read. 623 /// 624 /// Data is copied to fill each buffer in order, with the final buffer 625 /// written to possibly being only partially filled. This method behaves 626 /// equivalently to a single call to [`try_read()`] with concatenated 627 /// buffers. 628 /// 629 /// Receives any pending data from the socket but does not wait for new data 630 /// to arrive. On success, returns the number of bytes read. Because 631 /// `try_read_vectored()` is non-blocking, the buffer does not have to be 632 /// stored by the async task and can exist entirely on the stack. 633 /// 634 /// Usually, [`readable()`] or [`ready()`] is used with this function. 635 /// 636 /// [`try_read()`]: TcpStream::try_read() 637 /// [`readable()`]: TcpStream::readable() 638 /// [`ready()`]: TcpStream::ready() 639 /// 640 /// # Return 641 /// 642 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 643 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 644 /// and will no longer yield data. If the stream is not ready to read data 645 /// `Err(io::ErrorKind::WouldBlock)` is returned. 646 /// 647 /// # Examples 648 /// 649 /// ```no_run 650 /// use tokio::net::TcpStream; 651 /// use std::error::Error; 652 /// use std::io::{self, IoSliceMut}; 653 /// 654 /// #[tokio::main] 655 /// async fn main() -> Result<(), Box<dyn Error>> { 656 /// // Connect to a peer 657 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 658 /// 659 /// loop { 660 /// // Wait for the socket to be readable 661 /// stream.readable().await?; 662 /// 663 /// // Creating the buffer **after** the `await` prevents it from 664 /// // being stored in the async task. 665 /// let mut buf_a = [0; 512]; 666 /// let mut buf_b = [0; 1024]; 667 /// let mut bufs = [ 668 /// IoSliceMut::new(&mut buf_a), 669 /// IoSliceMut::new(&mut buf_b), 670 /// ]; 671 /// 672 /// // Try to read data, this may still fail with `WouldBlock` 673 /// // if the readiness event is a false positive. 674 /// match stream.try_read_vectored(&mut bufs) { 675 /// Ok(0) => break, 676 /// Ok(n) => { 677 /// println!("read {} bytes", n); 678 /// } 679 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 680 /// continue; 681 /// } 682 /// Err(e) => { 683 /// return Err(e.into()); 684 /// } 685 /// } 686 /// } 687 /// 688 /// Ok(()) 689 /// } 690 /// ``` try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>691 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { 692 use std::io::Read; 693 694 self.io 695 .registration() 696 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) 697 } 698 699 cfg_io_util! { 700 /// Tries to read data from the stream into the provided buffer, advancing the 701 /// buffer's internal cursor, returning how many bytes were read. 702 /// 703 /// Receives any pending data from the socket but does not wait for new data 704 /// to arrive. On success, returns the number of bytes read. Because 705 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by 706 /// the async task and can exist entirely on the stack. 707 /// 708 /// Usually, [`readable()`] or [`ready()`] is used with this function. 709 /// 710 /// [`readable()`]: TcpStream::readable() 711 /// [`ready()`]: TcpStream::ready() 712 /// 713 /// # Return 714 /// 715 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 716 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 717 /// and will no longer yield data. If the stream is not ready to read data 718 /// `Err(io::ErrorKind::WouldBlock)` is returned. 719 /// 720 /// # Examples 721 /// 722 /// ```no_run 723 /// use tokio::net::TcpStream; 724 /// use std::error::Error; 725 /// use std::io; 726 /// 727 /// #[tokio::main] 728 /// async fn main() -> Result<(), Box<dyn Error>> { 729 /// // Connect to a peer 730 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 731 /// 732 /// loop { 733 /// // Wait for the socket to be readable 734 /// stream.readable().await?; 735 /// 736 /// let mut buf = Vec::with_capacity(4096); 737 /// 738 /// // Try to read data, this may still fail with `WouldBlock` 739 /// // if the readiness event is a false positive. 740 /// match stream.try_read_buf(&mut buf) { 741 /// Ok(0) => break, 742 /// Ok(n) => { 743 /// println!("read {} bytes", n); 744 /// } 745 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 746 /// continue; 747 /// } 748 /// Err(e) => { 749 /// return Err(e.into()); 750 /// } 751 /// } 752 /// } 753 /// 754 /// Ok(()) 755 /// } 756 /// ``` 757 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { 758 self.io.registration().try_io(Interest::READABLE, || { 759 use std::io::Read; 760 761 let dst = buf.chunk_mut(); 762 let dst = 763 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 764 765 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the 766 // buffer. 767 let n = (&*self.io).read(dst)?; 768 769 unsafe { 770 buf.advance_mut(n); 771 } 772 773 Ok(n) 774 }) 775 } 776 } 777 778 /// Waits for the socket to become writable. 779 /// 780 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually 781 /// paired with `try_write()`. 782 /// 783 /// # Cancel safety 784 /// 785 /// This method is cancel safe. Once a readiness event occurs, the method 786 /// will continue to return immediately until the readiness event is 787 /// consumed by an attempt to write that fails with `WouldBlock` or 788 /// `Poll::Pending`. 789 /// 790 /// # Examples 791 /// 792 /// ```no_run 793 /// use tokio::net::TcpStream; 794 /// use std::error::Error; 795 /// use std::io; 796 /// 797 /// #[tokio::main] 798 /// async fn main() -> Result<(), Box<dyn Error>> { 799 /// // Connect to a peer 800 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 801 /// 802 /// loop { 803 /// // Wait for the socket to be writable 804 /// stream.writable().await?; 805 /// 806 /// // Try to write data, this may still fail with `WouldBlock` 807 /// // if the readiness event is a false positive. 808 /// match stream.try_write(b"hello world") { 809 /// Ok(n) => { 810 /// break; 811 /// } 812 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 813 /// continue; 814 /// } 815 /// Err(e) => { 816 /// return Err(e.into()); 817 /// } 818 /// } 819 /// } 820 /// 821 /// Ok(()) 822 /// } 823 /// ``` writable(&self) -> io::Result<()>824 pub async fn writable(&self) -> io::Result<()> { 825 self.ready(Interest::WRITABLE).await?; 826 Ok(()) 827 } 828 829 /// Polls for write readiness. 830 /// 831 /// If the tcp stream is not currently ready for writing, this method will 832 /// store a clone of the `Waker` from the provided `Context`. When the tcp 833 /// stream becomes ready for writing, `Waker::wake` will be called on the 834 /// waker. 835 /// 836 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only 837 /// the `Waker` from the `Context` passed to the most recent call is 838 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a 839 /// second, independent waker.) 840 /// 841 /// This function is intended for cases where creating and pinning a future 842 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 843 /// preferred, as this supports polling from multiple tasks at once. 844 /// 845 /// # Return value 846 /// 847 /// The function returns: 848 /// 849 /// * `Poll::Pending` if the tcp stream is not ready for writing. 850 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing. 851 /// * `Poll::Ready(Err(e))` if an error is encountered. 852 /// 853 /// # Errors 854 /// 855 /// This function may encounter any standard I/O error except `WouldBlock`. 856 /// 857 /// [`writable`]: method@Self::writable poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>858 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 859 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 860 } 861 862 /// Try to write a buffer to the stream, returning how many bytes were 863 /// written. 864 /// 865 /// The function will attempt to write the entire contents of `buf`, but 866 /// only part of the buffer may be written. 867 /// 868 /// This function is usually paired with `writable()`. 869 /// 870 /// # Return 871 /// 872 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 873 /// number of bytes written. If the stream is not ready to write data, 874 /// `Err(io::ErrorKind::WouldBlock)` is returned. 875 /// 876 /// # Examples 877 /// 878 /// ```no_run 879 /// use tokio::net::TcpStream; 880 /// use std::error::Error; 881 /// use std::io; 882 /// 883 /// #[tokio::main] 884 /// async fn main() -> Result<(), Box<dyn Error>> { 885 /// // Connect to a peer 886 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 887 /// 888 /// loop { 889 /// // Wait for the socket to be writable 890 /// stream.writable().await?; 891 /// 892 /// // Try to write data, this may still fail with `WouldBlock` 893 /// // if the readiness event is a false positive. 894 /// match stream.try_write(b"hello world") { 895 /// Ok(n) => { 896 /// break; 897 /// } 898 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 899 /// continue; 900 /// } 901 /// Err(e) => { 902 /// return Err(e.into()); 903 /// } 904 /// } 905 /// } 906 /// 907 /// Ok(()) 908 /// } 909 /// ``` try_write(&self, buf: &[u8]) -> io::Result<usize>910 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { 911 use std::io::Write; 912 913 self.io 914 .registration() 915 .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) 916 } 917 918 /// Tries to write several buffers to the stream, returning how many bytes 919 /// were written. 920 /// 921 /// Data is written from each buffer in order, with the final buffer read 922 /// from possible being only partially consumed. This method behaves 923 /// equivalently to a single call to [`try_write()`] with concatenated 924 /// buffers. 925 /// 926 /// This function is usually paired with `writable()`. 927 /// 928 /// [`try_write()`]: TcpStream::try_write() 929 /// 930 /// # Return 931 /// 932 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 933 /// number of bytes written. If the stream is not ready to write data, 934 /// `Err(io::ErrorKind::WouldBlock)` is returned. 935 /// 936 /// # Examples 937 /// 938 /// ```no_run 939 /// use tokio::net::TcpStream; 940 /// use std::error::Error; 941 /// use std::io; 942 /// 943 /// #[tokio::main] 944 /// async fn main() -> Result<(), Box<dyn Error>> { 945 /// // Connect to a peer 946 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 947 /// 948 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; 949 /// 950 /// loop { 951 /// // Wait for the socket to be writable 952 /// stream.writable().await?; 953 /// 954 /// // Try to write data, this may still fail with `WouldBlock` 955 /// // if the readiness event is a false positive. 956 /// match stream.try_write_vectored(&bufs) { 957 /// Ok(n) => { 958 /// break; 959 /// } 960 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 961 /// continue; 962 /// } 963 /// Err(e) => { 964 /// return Err(e.into()); 965 /// } 966 /// } 967 /// } 968 /// 969 /// Ok(()) 970 /// } 971 /// ``` try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize>972 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { 973 use std::io::Write; 974 975 self.io 976 .registration() 977 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs)) 978 } 979 980 /// Tries to read or write from the socket using a user-provided IO operation. 981 /// 982 /// If the socket is ready, the provided closure is called. The closure 983 /// should attempt to perform IO operation on the socket by manually 984 /// calling the appropriate syscall. If the operation fails because the 985 /// socket is not actually ready, then the closure should return a 986 /// `WouldBlock` error and the readiness flag is cleared. The return value 987 /// of the closure is then returned by `try_io`. 988 /// 989 /// If the socket is not ready, then the closure is not called 990 /// and a `WouldBlock` error is returned. 991 /// 992 /// The closure should only return a `WouldBlock` error if it has performed 993 /// an IO operation on the socket that failed due to the socket not being 994 /// ready. Returning a `WouldBlock` error in any other situation will 995 /// incorrectly clear the readiness flag, which can cause the socket to 996 /// behave incorrectly. 997 /// 998 /// The closure should not perform the IO operation using any of the methods 999 /// defined on the Tokio `TcpStream` type, as this will mess with the 1000 /// readiness flag and can cause the socket to behave incorrectly. 1001 /// 1002 /// This method is not intended to be used with combined interests. 1003 /// The closure should perform only one type of IO operation, so it should not 1004 /// require more than one ready state. This method may panic or sleep forever 1005 /// if it is called with a combined interest. 1006 /// 1007 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 1008 /// 1009 /// [`readable()`]: TcpStream::readable() 1010 /// [`writable()`]: TcpStream::writable() 1011 /// [`ready()`]: TcpStream::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1012 pub fn try_io<R>( 1013 &self, 1014 interest: Interest, 1015 f: impl FnOnce() -> io::Result<R>, 1016 ) -> io::Result<R> { 1017 self.io 1018 .registration() 1019 .try_io(interest, || self.io.try_io(f)) 1020 } 1021 1022 /// Reads or writes from the socket using a user-provided IO operation. 1023 /// 1024 /// The readiness of the socket is awaited and when the socket is ready, 1025 /// the provided closure is called. The closure should attempt to perform 1026 /// IO operation on the socket by manually calling the appropriate syscall. 1027 /// If the operation fails because the socket is not actually ready, 1028 /// then the closure should return a `WouldBlock` error. In such case the 1029 /// readiness flag is cleared and the socket readiness is awaited again. 1030 /// This loop is repeated until the closure returns an `Ok` or an error 1031 /// other than `WouldBlock`. 1032 /// 1033 /// The closure should only return a `WouldBlock` error if it has performed 1034 /// an IO operation on the socket that failed due to the socket not being 1035 /// ready. Returning a `WouldBlock` error in any other situation will 1036 /// incorrectly clear the readiness flag, which can cause the socket to 1037 /// behave incorrectly. 1038 /// 1039 /// The closure should not perform the IO operation using any of the methods 1040 /// defined on the Tokio `TcpStream` type, as this will mess with the 1041 /// readiness flag and can cause the socket to behave incorrectly. 1042 /// 1043 /// This method is not intended to be used with combined interests. 1044 /// The closure should perform only one type of IO operation, so it should not 1045 /// require more than one ready state. This method may panic or sleep forever 1046 /// if it is called with a combined interest. async_io<R>( &self, interest: Interest, mut f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>1047 pub async fn async_io<R>( 1048 &self, 1049 interest: Interest, 1050 mut f: impl FnMut() -> io::Result<R>, 1051 ) -> io::Result<R> { 1052 self.io 1053 .registration() 1054 .async_io(interest, || self.io.try_io(&mut f)) 1055 .await 1056 } 1057 1058 /// Receives data on the socket from the remote address to which it is 1059 /// connected, without removing that data from the queue. On success, 1060 /// returns the number of bytes peeked. 1061 /// 1062 /// Successive calls return the same data. This is accomplished by passing 1063 /// `MSG_PEEK` as a flag to the underlying `recv` system call. 1064 /// 1065 /// # Examples 1066 /// 1067 /// ```no_run 1068 /// use tokio::net::TcpStream; 1069 /// use tokio::io::AsyncReadExt; 1070 /// use std::error::Error; 1071 /// 1072 /// #[tokio::main] 1073 /// async fn main() -> Result<(), Box<dyn Error>> { 1074 /// // Connect to a peer 1075 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; 1076 /// 1077 /// let mut b1 = [0; 10]; 1078 /// let mut b2 = [0; 10]; 1079 /// 1080 /// // Peek at the data 1081 /// let n = stream.peek(&mut b1).await?; 1082 /// 1083 /// // Read the data 1084 /// assert_eq!(n, stream.read(&mut b2[..n]).await?); 1085 /// assert_eq!(&b1[..n], &b2[..n]); 1086 /// 1087 /// Ok(()) 1088 /// } 1089 /// ``` 1090 /// 1091 /// The [`read`] method is defined on the [`AsyncReadExt`] trait. 1092 /// 1093 /// [`read`]: fn@crate::io::AsyncReadExt::read 1094 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt peek(&self, buf: &mut [u8]) -> io::Result<usize>1095 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { 1096 self.io 1097 .registration() 1098 .async_io(Interest::READABLE, || self.io.peek(buf)) 1099 .await 1100 } 1101 1102 /// Shuts down the read, write, or both halves of this connection. 1103 /// 1104 /// This function will cause all pending and future I/O on the specified 1105 /// portions to return immediately with an appropriate value (see the 1106 /// documentation of `Shutdown`). shutdown_std(&self, how: Shutdown) -> io::Result<()>1107 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> { 1108 self.io.shutdown(how) 1109 } 1110 1111 /// Gets the value of the `TCP_NODELAY` option on this socket. 1112 /// 1113 /// For more information about this option, see [`set_nodelay`]. 1114 /// 1115 /// [`set_nodelay`]: TcpStream::set_nodelay 1116 /// 1117 /// # Examples 1118 /// 1119 /// ```no_run 1120 /// use tokio::net::TcpStream; 1121 /// 1122 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1123 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1124 /// 1125 /// println!("{:?}", stream.nodelay()?); 1126 /// # Ok(()) 1127 /// # } 1128 /// ``` nodelay(&self) -> io::Result<bool>1129 pub fn nodelay(&self) -> io::Result<bool> { 1130 self.io.nodelay() 1131 } 1132 1133 /// Sets the value of the `TCP_NODELAY` option on this socket. 1134 /// 1135 /// If set, this option disables the Nagle algorithm. This means that 1136 /// segments are always sent as soon as possible, even if there is only a 1137 /// small amount of data. When not set, data is buffered until there is a 1138 /// sufficient amount to send out, thereby avoiding the frequent sending of 1139 /// small packets. 1140 /// 1141 /// # Examples 1142 /// 1143 /// ```no_run 1144 /// use tokio::net::TcpStream; 1145 /// 1146 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1147 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1148 /// 1149 /// stream.set_nodelay(true)?; 1150 /// # Ok(()) 1151 /// # } 1152 /// ``` set_nodelay(&self, nodelay: bool) -> io::Result<()>1153 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { 1154 self.io.set_nodelay(nodelay) 1155 } 1156 1157 cfg_not_wasi! { 1158 /// Reads the linger duration for this socket by getting the `SO_LINGER` 1159 /// option. 1160 /// 1161 /// For more information about this option, see [`set_linger`]. 1162 /// 1163 /// [`set_linger`]: TcpStream::set_linger 1164 /// 1165 /// # Examples 1166 /// 1167 /// ```no_run 1168 /// use tokio::net::TcpStream; 1169 /// 1170 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1171 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1172 /// 1173 /// println!("{:?}", stream.linger()?); 1174 /// # Ok(()) 1175 /// # } 1176 /// ``` 1177 pub fn linger(&self) -> io::Result<Option<Duration>> { 1178 socket2::SockRef::from(self).linger() 1179 } 1180 1181 /// Sets the linger duration of this socket by setting the `SO_LINGER` option. 1182 /// 1183 /// This option controls the action taken when a stream has unsent messages and the stream is 1184 /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the 1185 /// data or until the time expires. 1186 /// 1187 /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a 1188 /// way that allows the process to continue as quickly as possible. 1189 /// 1190 /// # Examples 1191 /// 1192 /// ```no_run 1193 /// use tokio::net::TcpStream; 1194 /// 1195 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1196 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1197 /// 1198 /// stream.set_linger(None)?; 1199 /// # Ok(()) 1200 /// # } 1201 /// ``` 1202 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { 1203 socket2::SockRef::from(self).set_linger(dur) 1204 } 1205 } 1206 1207 /// Gets the value of the `IP_TTL` option for this socket. 1208 /// 1209 /// For more information about this option, see [`set_ttl`]. 1210 /// 1211 /// [`set_ttl`]: TcpStream::set_ttl 1212 /// 1213 /// # Examples 1214 /// 1215 /// ```no_run 1216 /// use tokio::net::TcpStream; 1217 /// 1218 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1219 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1220 /// 1221 /// println!("{:?}", stream.ttl()?); 1222 /// # Ok(()) 1223 /// # } 1224 /// ``` ttl(&self) -> io::Result<u32>1225 pub fn ttl(&self) -> io::Result<u32> { 1226 self.io.ttl() 1227 } 1228 1229 /// Sets the value for the `IP_TTL` option on this socket. 1230 /// 1231 /// This value sets the time-to-live field that is used in every packet sent 1232 /// from this socket. 1233 /// 1234 /// # Examples 1235 /// 1236 /// ```no_run 1237 /// use tokio::net::TcpStream; 1238 /// 1239 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1240 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1241 /// 1242 /// stream.set_ttl(123)?; 1243 /// # Ok(()) 1244 /// # } 1245 /// ``` set_ttl(&self, ttl: u32) -> io::Result<()>1246 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 1247 self.io.set_ttl(ttl) 1248 } 1249 1250 // These lifetime markers also appear in the generated documentation, and make 1251 // it more clear that this is a *borrowed* split. 1252 #[allow(clippy::needless_lifetimes)] 1253 /// Splits a `TcpStream` into a read half and a write half, which can be used 1254 /// to read and write the stream concurrently. 1255 /// 1256 /// This method is more efficient than [`into_split`], but the halves cannot be 1257 /// moved into independently spawned tasks. 1258 /// 1259 /// [`into_split`]: TcpStream::into_split() split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)1260 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { 1261 split(self) 1262 } 1263 1264 /// Splits a `TcpStream` into a read half and a write half, which can be used 1265 /// to read and write the stream concurrently. 1266 /// 1267 /// Unlike [`split`], the owned halves can be moved to separate tasks, however 1268 /// this comes at the cost of a heap allocation. 1269 /// 1270 /// **Note:** Dropping the write half will shut down the write half of the TCP 1271 /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`. 1272 /// 1273 /// [`split`]: TcpStream::split() 1274 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)1275 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { 1276 split_owned(self) 1277 } 1278 1279 // == Poll IO functions that takes `&self` == 1280 // 1281 // To read or write without mutable access to the `TcpStream`, combine the 1282 // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or 1283 // `try_write` methods. 1284 poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1285 pub(crate) fn poll_read_priv( 1286 &self, 1287 cx: &mut Context<'_>, 1288 buf: &mut ReadBuf<'_>, 1289 ) -> Poll<io::Result<()>> { 1290 // Safety: `TcpStream::read` correctly handles reads into uninitialized memory 1291 unsafe { self.io.poll_read(cx, buf) } 1292 } 1293 poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1294 pub(super) fn poll_write_priv( 1295 &self, 1296 cx: &mut Context<'_>, 1297 buf: &[u8], 1298 ) -> Poll<io::Result<usize>> { 1299 self.io.poll_write(cx, buf) 1300 } 1301 poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1302 pub(super) fn poll_write_vectored_priv( 1303 &self, 1304 cx: &mut Context<'_>, 1305 bufs: &[io::IoSlice<'_>], 1306 ) -> Poll<io::Result<usize>> { 1307 self.io.poll_write_vectored(cx, bufs) 1308 } 1309 } 1310 1311 impl TryFrom<std::net::TcpStream> for TcpStream { 1312 type Error = io::Error; 1313 1314 /// Consumes stream, returning the tokio I/O object. 1315 /// 1316 /// This is equivalent to 1317 /// [`TcpStream::from_std(stream)`](TcpStream::from_std). try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error>1318 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> { 1319 Self::from_std(stream) 1320 } 1321 } 1322 1323 // ===== impl Read / Write ===== 1324 1325 impl AsyncRead for TcpStream { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1326 fn poll_read( 1327 self: Pin<&mut Self>, 1328 cx: &mut Context<'_>, 1329 buf: &mut ReadBuf<'_>, 1330 ) -> Poll<io::Result<()>> { 1331 self.poll_read_priv(cx, buf) 1332 } 1333 } 1334 1335 impl AsyncWrite for TcpStream { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1336 fn poll_write( 1337 self: Pin<&mut Self>, 1338 cx: &mut Context<'_>, 1339 buf: &[u8], 1340 ) -> Poll<io::Result<usize>> { 1341 self.poll_write_priv(cx, buf) 1342 } 1343 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1344 fn poll_write_vectored( 1345 self: Pin<&mut Self>, 1346 cx: &mut Context<'_>, 1347 bufs: &[io::IoSlice<'_>], 1348 ) -> Poll<io::Result<usize>> { 1349 self.poll_write_vectored_priv(cx, bufs) 1350 } 1351 is_write_vectored(&self) -> bool1352 fn is_write_vectored(&self) -> bool { 1353 true 1354 } 1355 1356 #[inline] poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1357 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 1358 // tcp flush is a no-op 1359 Poll::Ready(Ok(())) 1360 } 1361 poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1362 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 1363 self.shutdown_std(std::net::Shutdown::Write)?; 1364 Poll::Ready(Ok(())) 1365 } 1366 } 1367 1368 impl fmt::Debug for TcpStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1370 self.io.fmt(f) 1371 } 1372 } 1373 1374 #[cfg(unix)] 1375 mod sys { 1376 use super::TcpStream; 1377 use std::os::unix::prelude::*; 1378 1379 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd1380 fn as_raw_fd(&self) -> RawFd { 1381 self.io.as_raw_fd() 1382 } 1383 } 1384 1385 impl AsFd for TcpStream { as_fd(&self) -> BorrowedFd<'_>1386 fn as_fd(&self) -> BorrowedFd<'_> { 1387 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } 1388 } 1389 } 1390 } 1391 1392 cfg_windows! { 1393 use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket}; 1394 1395 impl AsRawSocket for TcpStream { 1396 fn as_raw_socket(&self) -> RawSocket { 1397 self.io.as_raw_socket() 1398 } 1399 } 1400 1401 impl AsSocket for TcpStream { 1402 fn as_socket(&self) -> BorrowedSocket<'_> { 1403 unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) } 1404 } 1405 } 1406 } 1407 1408 #[cfg(all(tokio_unstable, target_os = "wasi"))] 1409 mod sys { 1410 use super::TcpStream; 1411 use std::os::wasi::prelude::*; 1412 1413 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd1414 fn as_raw_fd(&self) -> RawFd { 1415 self.io.as_raw_fd() 1416 } 1417 } 1418 1419 impl AsFd for TcpStream { as_fd(&self) -> BorrowedFd<'_>1420 fn as_fd(&self) -> BorrowedFd<'_> { 1421 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } 1422 } 1423 } 1424 } 1425