1 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; 2 use crate::net::unix::split::{split, ReadHalf, WriteHalf}; 3 use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; 4 use crate::net::unix::ucred::{self, UCred}; 5 use crate::net::unix::SocketAddr; 6 7 use std::fmt; 8 use std::future::poll_fn; 9 use std::io::{self, Read, Write}; 10 use std::net::Shutdown; 11 #[cfg(target_os = "android")] 12 use std::os::android::net::SocketAddrExt; 13 #[cfg(target_os = "linux")] 14 use std::os::linux::net::SocketAddrExt; 15 #[cfg(any(target_os = "linux", target_os = "android"))] 16 use std::os::unix::ffi::OsStrExt; 17 use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; 18 use std::os::unix::net::{self, SocketAddr as StdSocketAddr}; 19 use std::path::Path; 20 use std::pin::Pin; 21 use std::task::{Context, Poll}; 22 23 cfg_io_util! { 24 use bytes::BufMut; 25 } 26 27 cfg_net_unix! { 28 /// A structure representing a connected Unix socket. 29 /// 30 /// This socket can be connected directly with [`UnixStream::connect`] or accepted 31 /// from a listener with [`UnixListener::accept`]. Additionally, a pair of 32 /// anonymous Unix sockets can be created with `UnixStream::pair`. 33 /// 34 /// To shut down the stream in the write direction, you can call the 35 /// [`shutdown()`] method. This will cause the other peer to receive a read of 36 /// length 0, indicating that no more data will be sent. This only closes 37 /// the stream in one direction. 38 /// 39 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown 40 /// [`UnixListener::accept`]: crate::net::UnixListener::accept 41 #[cfg_attr(docsrs, doc(alias = "uds"))] 42 pub struct UnixStream { 43 io: PollEvented<mio::net::UnixStream>, 44 } 45 } 46 47 impl UnixStream { connect_mio(sys: mio::net::UnixStream) -> io::Result<UnixStream>48 pub(crate) async fn connect_mio(sys: mio::net::UnixStream) -> io::Result<UnixStream> { 49 let stream = UnixStream::new(sys)?; 50 51 // Once we've connected, wait for the stream to be writable as 52 // that's when the actual connection has been initiated. Once we're 53 // writable we check for `take_socket_error` to see if the connect 54 // actually hit an error or not. 55 // 56 // If all that succeeded then we ship everything on up. 57 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; 58 59 if let Some(e) = stream.io.take_error()? { 60 return Err(e); 61 } 62 63 Ok(stream) 64 } 65 66 /// Connects to the socket named by `path`. 67 /// 68 /// This function will create a new Unix socket and connect to the path 69 /// specified, associating the returned stream with the default event loop's 70 /// handle. connect<P>(path: P) -> io::Result<UnixStream> where P: AsRef<Path>,71 pub async fn connect<P>(path: P) -> io::Result<UnixStream> 72 where 73 P: AsRef<Path>, 74 { 75 // On linux, abstract socket paths need to be considered. 76 #[cfg(any(target_os = "linux", target_os = "android"))] 77 let addr = { 78 let os_str_bytes = path.as_ref().as_os_str().as_bytes(); 79 if os_str_bytes.starts_with(b"\0") { 80 StdSocketAddr::from_abstract_name(&os_str_bytes[1..])? 81 } else { 82 StdSocketAddr::from_pathname(path)? 83 } 84 }; 85 #[cfg(not(any(target_os = "linux", target_os = "android")))] 86 let addr = StdSocketAddr::from_pathname(path)?; 87 88 let stream = mio::net::UnixStream::connect_addr(&addr)?; 89 let stream = UnixStream::new(stream)?; 90 91 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; 92 93 if let Some(e) = stream.io.take_error()? { 94 return Err(e); 95 } 96 97 Ok(stream) 98 } 99 100 /// Waits for any of the requested ready states. 101 /// 102 /// This function is usually paired with `try_read()` or `try_write()`. It 103 /// can be used to concurrently read / write to the same socket on a single 104 /// task without splitting the socket. 105 /// 106 /// The function may complete without the socket being ready. This is a 107 /// false-positive and attempting an operation will return with 108 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty 109 /// [`Ready`] set, so you should always check the returned value and possibly 110 /// wait again if the requested states are not set. 111 /// 112 /// # Cancel safety 113 /// 114 /// This method is cancel safe. Once a readiness event occurs, the method 115 /// will continue to return immediately until the readiness event is 116 /// consumed by an attempt to read or write that fails with `WouldBlock` or 117 /// `Poll::Pending`. 118 /// 119 /// # Examples 120 /// 121 /// Concurrently read and write to the stream on the same task without 122 /// splitting. 123 /// 124 /// ```no_run 125 /// use tokio::io::Interest; 126 /// use tokio::net::UnixStream; 127 /// use std::error::Error; 128 /// use std::io; 129 /// 130 /// #[tokio::main] 131 /// async fn main() -> Result<(), Box<dyn Error>> { 132 /// let dir = tempfile::tempdir().unwrap(); 133 /// let bind_path = dir.path().join("bind_path"); 134 /// let stream = UnixStream::connect(bind_path).await?; 135 /// 136 /// loop { 137 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; 138 /// 139 /// if ready.is_readable() { 140 /// let mut data = vec![0; 1024]; 141 /// // Try to read data, this may still fail with `WouldBlock` 142 /// // if the readiness event is a false positive. 143 /// match stream.try_read(&mut data) { 144 /// Ok(n) => { 145 /// println!("read {} bytes", n); 146 /// } 147 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 148 /// continue; 149 /// } 150 /// Err(e) => { 151 /// return Err(e.into()); 152 /// } 153 /// } 154 /// 155 /// } 156 /// 157 /// if ready.is_writable() { 158 /// // Try to write data, this may still fail with `WouldBlock` 159 /// // if the readiness event is a false positive. 160 /// match stream.try_write(b"hello world") { 161 /// Ok(n) => { 162 /// println!("write {} bytes", n); 163 /// } 164 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 165 /// continue; 166 /// } 167 /// Err(e) => { 168 /// return Err(e.into()); 169 /// } 170 /// } 171 /// } 172 /// } 173 /// } 174 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>175 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 176 let event = self.io.registration().readiness(interest).await?; 177 Ok(event.ready) 178 } 179 180 /// Waits for the socket to become readable. 181 /// 182 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 183 /// paired with `try_read()`. 184 /// 185 /// # Cancel safety 186 /// 187 /// This method is cancel safe. Once a readiness event occurs, the method 188 /// will continue to return immediately until the readiness event is 189 /// consumed by an attempt to read that fails with `WouldBlock` or 190 /// `Poll::Pending`. 191 /// 192 /// # Examples 193 /// 194 /// ```no_run 195 /// use tokio::net::UnixStream; 196 /// use std::error::Error; 197 /// use std::io; 198 /// 199 /// #[tokio::main] 200 /// async fn main() -> Result<(), Box<dyn Error>> { 201 /// // Connect to a peer 202 /// let dir = tempfile::tempdir().unwrap(); 203 /// let bind_path = dir.path().join("bind_path"); 204 /// let stream = UnixStream::connect(bind_path).await?; 205 /// 206 /// let mut msg = vec![0; 1024]; 207 /// 208 /// loop { 209 /// // Wait for the socket to be readable 210 /// stream.readable().await?; 211 /// 212 /// // Try to read data, this may still fail with `WouldBlock` 213 /// // if the readiness event is a false positive. 214 /// match stream.try_read(&mut msg) { 215 /// Ok(n) => { 216 /// msg.truncate(n); 217 /// break; 218 /// } 219 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 220 /// continue; 221 /// } 222 /// Err(e) => { 223 /// return Err(e.into()); 224 /// } 225 /// } 226 /// } 227 /// 228 /// println!("GOT = {:?}", msg); 229 /// Ok(()) 230 /// } 231 /// ``` readable(&self) -> io::Result<()>232 pub async fn readable(&self) -> io::Result<()> { 233 self.ready(Interest::READABLE).await?; 234 Ok(()) 235 } 236 237 /// Polls for read readiness. 238 /// 239 /// If the unix stream is not currently ready for reading, this method will 240 /// store a clone of the `Waker` from the provided `Context`. When the unix 241 /// stream becomes ready for reading, `Waker::wake` will be called on the 242 /// waker. 243 /// 244 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only 245 /// the `Waker` from the `Context` passed to the most recent call is 246 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a 247 /// second, independent waker.) 248 /// 249 /// This function is intended for cases where creating and pinning a future 250 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 251 /// preferred, as this supports polling from multiple tasks at once. 252 /// 253 /// # Return value 254 /// 255 /// The function returns: 256 /// 257 /// * `Poll::Pending` if the unix stream is not ready for reading. 258 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading. 259 /// * `Poll::Ready(Err(e))` if an error is encountered. 260 /// 261 /// # Errors 262 /// 263 /// This function may encounter any standard I/O error except `WouldBlock`. 264 /// 265 /// [`readable`]: method@Self::readable poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>266 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 267 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 268 } 269 270 /// Try to read data from the stream into the provided buffer, returning how 271 /// many bytes were read. 272 /// 273 /// Receives any pending data from the socket but does not wait for new data 274 /// to arrive. On success, returns the number of bytes read. Because 275 /// `try_read()` is non-blocking, the buffer does not have to be stored by 276 /// the async task and can exist entirely on the stack. 277 /// 278 /// Usually, [`readable()`] or [`ready()`] is used with this function. 279 /// 280 /// [`readable()`]: UnixStream::readable() 281 /// [`ready()`]: UnixStream::ready() 282 /// 283 /// # Return 284 /// 285 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 286 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: 287 /// 288 /// 1. The stream's read half is closed and will no longer yield data. 289 /// 2. The specified buffer was 0 bytes in length. 290 /// 291 /// If the stream is not ready to read data, 292 /// `Err(io::ErrorKind::WouldBlock)` is returned. 293 /// 294 /// # Examples 295 /// 296 /// ```no_run 297 /// use tokio::net::UnixStream; 298 /// use std::error::Error; 299 /// use std::io; 300 /// 301 /// #[tokio::main] 302 /// async fn main() -> Result<(), Box<dyn Error>> { 303 /// // Connect to a peer 304 /// let dir = tempfile::tempdir().unwrap(); 305 /// let bind_path = dir.path().join("bind_path"); 306 /// let stream = UnixStream::connect(bind_path).await?; 307 /// 308 /// loop { 309 /// // Wait for the socket to be readable 310 /// stream.readable().await?; 311 /// 312 /// // Creating the buffer **after** the `await` prevents it from 313 /// // being stored in the async task. 314 /// let mut buf = [0; 4096]; 315 /// 316 /// // Try to read data, this may still fail with `WouldBlock` 317 /// // if the readiness event is a false positive. 318 /// match stream.try_read(&mut buf) { 319 /// Ok(0) => break, 320 /// Ok(n) => { 321 /// println!("read {} bytes", n); 322 /// } 323 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 324 /// continue; 325 /// } 326 /// Err(e) => { 327 /// return Err(e.into()); 328 /// } 329 /// } 330 /// } 331 /// 332 /// Ok(()) 333 /// } 334 /// ``` try_read(&self, buf: &mut [u8]) -> io::Result<usize>335 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { 336 self.io 337 .registration() 338 .try_io(Interest::READABLE, || (&*self.io).read(buf)) 339 } 340 341 /// Tries to read data from the stream into the provided buffers, returning 342 /// how many bytes were read. 343 /// 344 /// Data is copied to fill each buffer in order, with the final buffer 345 /// written to possibly being only partially filled. This method behaves 346 /// equivalently to a single call to [`try_read()`] with concatenated 347 /// buffers. 348 /// 349 /// Receives any pending data from the socket but does not wait for new data 350 /// to arrive. On success, returns the number of bytes read. Because 351 /// `try_read_vectored()` is non-blocking, the buffer does not have to be 352 /// stored by the async task and can exist entirely on the stack. 353 /// 354 /// Usually, [`readable()`] or [`ready()`] is used with this function. 355 /// 356 /// [`try_read()`]: UnixStream::try_read() 357 /// [`readable()`]: UnixStream::readable() 358 /// [`ready()`]: UnixStream::ready() 359 /// 360 /// # Return 361 /// 362 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 363 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 364 /// and will no longer yield data. If the stream is not ready to read data 365 /// `Err(io::ErrorKind::WouldBlock)` is returned. 366 /// 367 /// # Examples 368 /// 369 /// ```no_run 370 /// use tokio::net::UnixStream; 371 /// use std::error::Error; 372 /// use std::io::{self, IoSliceMut}; 373 /// 374 /// #[tokio::main] 375 /// async fn main() -> Result<(), Box<dyn Error>> { 376 /// // Connect to a peer 377 /// let dir = tempfile::tempdir().unwrap(); 378 /// let bind_path = dir.path().join("bind_path"); 379 /// let stream = UnixStream::connect(bind_path).await?; 380 /// 381 /// loop { 382 /// // Wait for the socket to be readable 383 /// stream.readable().await?; 384 /// 385 /// // Creating the buffer **after** the `await` prevents it from 386 /// // being stored in the async task. 387 /// let mut buf_a = [0; 512]; 388 /// let mut buf_b = [0; 1024]; 389 /// let mut bufs = [ 390 /// IoSliceMut::new(&mut buf_a), 391 /// IoSliceMut::new(&mut buf_b), 392 /// ]; 393 /// 394 /// // Try to read data, this may still fail with `WouldBlock` 395 /// // if the readiness event is a false positive. 396 /// match stream.try_read_vectored(&mut bufs) { 397 /// Ok(0) => break, 398 /// Ok(n) => { 399 /// println!("read {} bytes", n); 400 /// } 401 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 402 /// continue; 403 /// } 404 /// Err(e) => { 405 /// return Err(e.into()); 406 /// } 407 /// } 408 /// } 409 /// 410 /// Ok(()) 411 /// } 412 /// ``` try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>413 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { 414 self.io 415 .registration() 416 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) 417 } 418 419 cfg_io_util! { 420 /// Tries to read data from the stream into the provided buffer, advancing the 421 /// buffer's internal cursor, returning how many bytes were read. 422 /// 423 /// Receives any pending data from the socket but does not wait for new data 424 /// to arrive. On success, returns the number of bytes read. Because 425 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by 426 /// the async task and can exist entirely on the stack. 427 /// 428 /// Usually, [`readable()`] or [`ready()`] is used with this function. 429 /// 430 /// [`readable()`]: UnixStream::readable() 431 /// [`ready()`]: UnixStream::ready() 432 /// 433 /// # Return 434 /// 435 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 436 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 437 /// and will no longer yield data. If the stream is not ready to read data 438 /// `Err(io::ErrorKind::WouldBlock)` is returned. 439 /// 440 /// # Examples 441 /// 442 /// ```no_run 443 /// use tokio::net::UnixStream; 444 /// use std::error::Error; 445 /// use std::io; 446 /// 447 /// #[tokio::main] 448 /// async fn main() -> Result<(), Box<dyn Error>> { 449 /// // Connect to a peer 450 /// let dir = tempfile::tempdir().unwrap(); 451 /// let bind_path = dir.path().join("bind_path"); 452 /// let stream = UnixStream::connect(bind_path).await?; 453 /// 454 /// loop { 455 /// // Wait for the socket to be readable 456 /// stream.readable().await?; 457 /// 458 /// let mut buf = Vec::with_capacity(4096); 459 /// 460 /// // Try to read data, this may still fail with `WouldBlock` 461 /// // if the readiness event is a false positive. 462 /// match stream.try_read_buf(&mut buf) { 463 /// Ok(0) => break, 464 /// Ok(n) => { 465 /// println!("read {} bytes", n); 466 /// } 467 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 468 /// continue; 469 /// } 470 /// Err(e) => { 471 /// return Err(e.into()); 472 /// } 473 /// } 474 /// } 475 /// 476 /// Ok(()) 477 /// } 478 /// ``` 479 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { 480 self.io.registration().try_io(Interest::READABLE, || { 481 use std::io::Read; 482 483 let dst = buf.chunk_mut(); 484 let dst = 485 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 486 487 // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the 488 // buffer. 489 let n = (&*self.io).read(dst)?; 490 491 unsafe { 492 buf.advance_mut(n); 493 } 494 495 Ok(n) 496 }) 497 } 498 } 499 500 /// Waits for the socket to become writable. 501 /// 502 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually 503 /// paired with `try_write()`. 504 /// 505 /// # Cancel safety 506 /// 507 /// This method is cancel safe. Once a readiness event occurs, the method 508 /// will continue to return immediately until the readiness event is 509 /// consumed by an attempt to write that fails with `WouldBlock` or 510 /// `Poll::Pending`. 511 /// 512 /// # Examples 513 /// 514 /// ```no_run 515 /// use tokio::net::UnixStream; 516 /// use std::error::Error; 517 /// use std::io; 518 /// 519 /// #[tokio::main] 520 /// async fn main() -> Result<(), Box<dyn Error>> { 521 /// // Connect to a peer 522 /// let dir = tempfile::tempdir().unwrap(); 523 /// let bind_path = dir.path().join("bind_path"); 524 /// let stream = UnixStream::connect(bind_path).await?; 525 /// 526 /// loop { 527 /// // Wait for the socket to be writable 528 /// stream.writable().await?; 529 /// 530 /// // Try to write data, this may still fail with `WouldBlock` 531 /// // if the readiness event is a false positive. 532 /// match stream.try_write(b"hello world") { 533 /// Ok(n) => { 534 /// break; 535 /// } 536 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 537 /// continue; 538 /// } 539 /// Err(e) => { 540 /// return Err(e.into()); 541 /// } 542 /// } 543 /// } 544 /// 545 /// Ok(()) 546 /// } 547 /// ``` writable(&self) -> io::Result<()>548 pub async fn writable(&self) -> io::Result<()> { 549 self.ready(Interest::WRITABLE).await?; 550 Ok(()) 551 } 552 553 /// Polls for write readiness. 554 /// 555 /// If the unix stream is not currently ready for writing, this method will 556 /// store a clone of the `Waker` from the provided `Context`. When the unix 557 /// stream becomes ready for writing, `Waker::wake` will be called on the 558 /// waker. 559 /// 560 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only 561 /// the `Waker` from the `Context` passed to the most recent call is 562 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a 563 /// second, independent waker.) 564 /// 565 /// This function is intended for cases where creating and pinning a future 566 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 567 /// preferred, as this supports polling from multiple tasks at once. 568 /// 569 /// # Return value 570 /// 571 /// The function returns: 572 /// 573 /// * `Poll::Pending` if the unix stream is not ready for writing. 574 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing. 575 /// * `Poll::Ready(Err(e))` if an error is encountered. 576 /// 577 /// # Errors 578 /// 579 /// This function may encounter any standard I/O error except `WouldBlock`. 580 /// 581 /// [`writable`]: method@Self::writable poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>582 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 583 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 584 } 585 586 /// Tries to write a buffer to the stream, returning how many bytes were 587 /// written. 588 /// 589 /// The function will attempt to write the entire contents of `buf`, but 590 /// only part of the buffer may be written. 591 /// 592 /// This function is usually paired with `writable()`. 593 /// 594 /// # Return 595 /// 596 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 597 /// number of bytes written. If the stream is not ready to write data, 598 /// `Err(io::ErrorKind::WouldBlock)` is returned. 599 /// 600 /// # Examples 601 /// 602 /// ```no_run 603 /// use tokio::net::UnixStream; 604 /// use std::error::Error; 605 /// use std::io; 606 /// 607 /// #[tokio::main] 608 /// async fn main() -> Result<(), Box<dyn Error>> { 609 /// // Connect to a peer 610 /// let dir = tempfile::tempdir().unwrap(); 611 /// let bind_path = dir.path().join("bind_path"); 612 /// let stream = UnixStream::connect(bind_path).await?; 613 /// 614 /// loop { 615 /// // Wait for the socket to be writable 616 /// stream.writable().await?; 617 /// 618 /// // Try to write data, this may still fail with `WouldBlock` 619 /// // if the readiness event is a false positive. 620 /// match stream.try_write(b"hello world") { 621 /// Ok(n) => { 622 /// break; 623 /// } 624 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 625 /// continue; 626 /// } 627 /// Err(e) => { 628 /// return Err(e.into()); 629 /// } 630 /// } 631 /// } 632 /// 633 /// Ok(()) 634 /// } 635 /// ``` try_write(&self, buf: &[u8]) -> io::Result<usize>636 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { 637 self.io 638 .registration() 639 .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) 640 } 641 642 /// Tries to write several buffers to the stream, returning how many bytes 643 /// were written. 644 /// 645 /// Data is written from each buffer in order, with the final buffer read 646 /// from possible being only partially consumed. This method behaves 647 /// equivalently to a single call to [`try_write()`] with concatenated 648 /// buffers. 649 /// 650 /// This function is usually paired with `writable()`. 651 /// 652 /// [`try_write()`]: UnixStream::try_write() 653 /// 654 /// # Return 655 /// 656 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 657 /// number of bytes written. If the stream is not ready to write data, 658 /// `Err(io::ErrorKind::WouldBlock)` is returned. 659 /// 660 /// # Examples 661 /// 662 /// ```no_run 663 /// use tokio::net::UnixStream; 664 /// use std::error::Error; 665 /// use std::io; 666 /// 667 /// #[tokio::main] 668 /// async fn main() -> Result<(), Box<dyn Error>> { 669 /// // Connect to a peer 670 /// let dir = tempfile::tempdir().unwrap(); 671 /// let bind_path = dir.path().join("bind_path"); 672 /// let stream = UnixStream::connect(bind_path).await?; 673 /// 674 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; 675 /// 676 /// loop { 677 /// // Wait for the socket to be writable 678 /// stream.writable().await?; 679 /// 680 /// // Try to write data, this may still fail with `WouldBlock` 681 /// // if the readiness event is a false positive. 682 /// match stream.try_write_vectored(&bufs) { 683 /// Ok(n) => { 684 /// break; 685 /// } 686 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 687 /// continue; 688 /// } 689 /// Err(e) => { 690 /// return Err(e.into()); 691 /// } 692 /// } 693 /// } 694 /// 695 /// Ok(()) 696 /// } 697 /// ``` try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>698 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { 699 self.io 700 .registration() 701 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) 702 } 703 704 /// Tries to read or write from the socket using a user-provided IO operation. 705 /// 706 /// If the socket is ready, the provided closure is called. The closure 707 /// should attempt to perform IO operation on the socket by manually 708 /// calling the appropriate syscall. If the operation fails because the 709 /// socket is not actually ready, then the closure should return a 710 /// `WouldBlock` error and the readiness flag is cleared. The return value 711 /// of the closure is then returned by `try_io`. 712 /// 713 /// If the socket is not ready, then the closure is not called 714 /// and a `WouldBlock` error is returned. 715 /// 716 /// The closure should only return a `WouldBlock` error if it has performed 717 /// an IO operation on the socket that failed due to the socket not being 718 /// ready. Returning a `WouldBlock` error in any other situation will 719 /// incorrectly clear the readiness flag, which can cause the socket to 720 /// behave incorrectly. 721 /// 722 /// The closure should not perform the IO operation using any of the methods 723 /// defined on the Tokio `UnixStream` type, as this will mess with the 724 /// readiness flag and can cause the socket to behave incorrectly. 725 /// 726 /// This method is not intended to be used with combined interests. 727 /// The closure should perform only one type of IO operation, so it should not 728 /// require more than one ready state. This method may panic or sleep forever 729 /// if it is called with a combined interest. 730 /// 731 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 732 /// 733 /// [`readable()`]: UnixStream::readable() 734 /// [`writable()`]: UnixStream::writable() 735 /// [`ready()`]: UnixStream::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>736 pub fn try_io<R>( 737 &self, 738 interest: Interest, 739 f: impl FnOnce() -> io::Result<R>, 740 ) -> io::Result<R> { 741 self.io 742 .registration() 743 .try_io(interest, || self.io.try_io(f)) 744 } 745 746 /// Reads or writes from the socket using a user-provided IO operation. 747 /// 748 /// The readiness of the socket is awaited and when the socket is ready, 749 /// the provided closure is called. The closure should attempt to perform 750 /// IO operation on the socket by manually calling the appropriate syscall. 751 /// If the operation fails because the socket is not actually ready, 752 /// then the closure should return a `WouldBlock` error. In such case the 753 /// readiness flag is cleared and the socket readiness is awaited again. 754 /// This loop is repeated until the closure returns an `Ok` or an error 755 /// other than `WouldBlock`. 756 /// 757 /// The closure should only return a `WouldBlock` error if it has performed 758 /// an IO operation on the socket that failed due to the socket not being 759 /// ready. Returning a `WouldBlock` error in any other situation will 760 /// incorrectly clear the readiness flag, which can cause the socket to 761 /// behave incorrectly. 762 /// 763 /// The closure should not perform the IO operation using any of the methods 764 /// defined on the Tokio `UnixStream` type, as this will mess with the 765 /// readiness flag and can cause the socket to behave incorrectly. 766 /// 767 /// This method is not intended to be used with combined interests. 768 /// The closure should perform only one type of IO operation, so it should not 769 /// require more than one ready state. This method may panic or sleep forever 770 /// if it is called with a combined interest. async_io<R>( &self, interest: Interest, mut f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>771 pub async fn async_io<R>( 772 &self, 773 interest: Interest, 774 mut f: impl FnMut() -> io::Result<R>, 775 ) -> io::Result<R> { 776 self.io 777 .registration() 778 .async_io(interest, || self.io.try_io(&mut f)) 779 .await 780 } 781 782 /// Creates new [`UnixStream`] from a [`std::os::unix::net::UnixStream`]. 783 /// 784 /// This function is intended to be used to wrap a `UnixStream` from the 785 /// standard library in the Tokio equivalent. 786 /// 787 /// # Notes 788 /// 789 /// The caller is responsible for ensuring that the stream is in 790 /// non-blocking mode. Otherwise all I/O operations on the stream 791 /// will block the thread, which will cause unexpected behavior. 792 /// Non-blocking mode can be set using [`set_nonblocking`]. 793 /// 794 /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking 795 /// 796 /// # Examples 797 /// 798 /// ```no_run 799 /// use tokio::net::UnixStream; 800 /// use std::os::unix::net::UnixStream as StdUnixStream; 801 /// # use std::error::Error; 802 /// 803 /// # async fn dox() -> Result<(), Box<dyn Error>> { 804 /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?; 805 /// std_stream.set_nonblocking(true)?; 806 /// let stream = UnixStream::from_std(std_stream)?; 807 /// # Ok(()) 808 /// # } 809 /// ``` 810 /// 811 /// # Panics 812 /// 813 /// This function panics if it is not called from within a runtime with 814 /// IO enabled. 815 /// 816 /// The runtime is usually set implicitly when this function is called 817 /// from a future driven by a tokio runtime, otherwise runtime can be set 818 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 819 #[track_caller] from_std(stream: net::UnixStream) -> io::Result<UnixStream>820 pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> { 821 let stream = mio::net::UnixStream::from_std(stream); 822 let io = PollEvented::new(stream)?; 823 824 Ok(UnixStream { io }) 825 } 826 827 /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. 828 /// 829 /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking 830 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking 831 /// mode if needed. 832 /// 833 /// # Examples 834 /// 835 /// ``` 836 /// use std::error::Error; 837 /// use std::io::Read; 838 /// use tokio::net::UnixListener; 839 /// # use tokio::net::UnixStream; 840 /// # use tokio::io::AsyncWriteExt; 841 /// 842 /// #[tokio::main] 843 /// async fn main() -> Result<(), Box<dyn Error>> { 844 /// let dir = tempfile::tempdir().unwrap(); 845 /// let bind_path = dir.path().join("bind_path"); 846 /// 847 /// let mut data = [0u8; 12]; 848 /// let listener = UnixListener::bind(&bind_path)?; 849 /// # let handle = tokio::spawn(async { 850 /// # let mut stream = UnixStream::connect(bind_path).await.unwrap(); 851 /// # stream.write(b"Hello world!").await.unwrap(); 852 /// # }); 853 /// let (tokio_unix_stream, _) = listener.accept().await?; 854 /// let mut std_unix_stream = tokio_unix_stream.into_std()?; 855 /// # handle.await.expect("The task being joined has panicked"); 856 /// std_unix_stream.set_nonblocking(false)?; 857 /// std_unix_stream.read_exact(&mut data)?; 858 /// # assert_eq!(b"Hello world!", &data); 859 /// Ok(()) 860 /// } 861 /// ``` 862 /// [`tokio::net::UnixStream`]: UnixStream 863 /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream 864 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking into_std(self) -> io::Result<std::os::unix::net::UnixStream>865 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> { 866 self.io 867 .into_inner() 868 .map(IntoRawFd::into_raw_fd) 869 .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) }) 870 } 871 872 /// Creates an unnamed pair of connected sockets. 873 /// 874 /// This function will create a pair of interconnected Unix sockets for 875 /// communicating back and forth between one another. Each socket will 876 /// be associated with the default event loop's handle. pair() -> io::Result<(UnixStream, UnixStream)>877 pub fn pair() -> io::Result<(UnixStream, UnixStream)> { 878 let (a, b) = mio::net::UnixStream::pair()?; 879 let a = UnixStream::new(a)?; 880 let b = UnixStream::new(b)?; 881 882 Ok((a, b)) 883 } 884 new(stream: mio::net::UnixStream) -> io::Result<UnixStream>885 pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> { 886 let io = PollEvented::new(stream)?; 887 Ok(UnixStream { io }) 888 } 889 890 /// Returns the socket address of the local half of this connection. 891 /// 892 /// # Examples 893 /// 894 /// ```no_run 895 /// use tokio::net::UnixStream; 896 /// 897 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 898 /// let dir = tempfile::tempdir().unwrap(); 899 /// let bind_path = dir.path().join("bind_path"); 900 /// let stream = UnixStream::connect(bind_path).await?; 901 /// 902 /// println!("{:?}", stream.local_addr()?); 903 /// # Ok(()) 904 /// # } 905 /// ``` local_addr(&self) -> io::Result<SocketAddr>906 pub fn local_addr(&self) -> io::Result<SocketAddr> { 907 self.io.local_addr().map(SocketAddr) 908 } 909 910 /// Returns the socket address of the remote half of this connection. 911 /// 912 /// # Examples 913 /// 914 /// ```no_run 915 /// use tokio::net::UnixStream; 916 /// 917 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 918 /// let dir = tempfile::tempdir().unwrap(); 919 /// let bind_path = dir.path().join("bind_path"); 920 /// let stream = UnixStream::connect(bind_path).await?; 921 /// 922 /// println!("{:?}", stream.peer_addr()?); 923 /// # Ok(()) 924 /// # } 925 /// ``` peer_addr(&self) -> io::Result<SocketAddr>926 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 927 self.io.peer_addr().map(SocketAddr) 928 } 929 930 /// Returns effective credentials of the process which called `connect` or `pair`. peer_cred(&self) -> io::Result<UCred>931 pub fn peer_cred(&self) -> io::Result<UCred> { 932 ucred::get_peer_cred(self) 933 } 934 935 /// Returns the value of the `SO_ERROR` option. take_error(&self) -> io::Result<Option<io::Error>>936 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 937 self.io.take_error() 938 } 939 940 /// Shuts down the read, write, or both halves of this connection. 941 /// 942 /// This function will cause all pending and future I/O calls on the 943 /// specified portions to immediately return with an appropriate value 944 /// (see the documentation of `Shutdown`). shutdown_std(&self, how: Shutdown) -> io::Result<()>945 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> { 946 self.io.shutdown(how) 947 } 948 949 // These lifetime markers also appear in the generated documentation, and make 950 // it more clear that this is a *borrowed* split. 951 #[allow(clippy::needless_lifetimes)] 952 /// Splits a `UnixStream` into a read half and a write half, which can be used 953 /// to read and write the stream concurrently. 954 /// 955 /// This method is more efficient than [`into_split`], but the halves cannot be 956 /// moved into independently spawned tasks. 957 /// 958 /// [`into_split`]: Self::into_split() split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)959 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { 960 split(self) 961 } 962 963 /// Splits a `UnixStream` into a read half and a write half, which can be used 964 /// to read and write the stream concurrently. 965 /// 966 /// Unlike [`split`], the owned halves can be moved to separate tasks, however 967 /// this comes at the cost of a heap allocation. 968 /// 969 /// **Note:** Dropping the write half will shut down the write half of the 970 /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`. 971 /// 972 /// [`split`]: Self::split() 973 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)974 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { 975 split_owned(self) 976 } 977 } 978 979 impl TryFrom<net::UnixStream> for UnixStream { 980 type Error = io::Error; 981 982 /// Consumes stream, returning the tokio I/O object. 983 /// 984 /// This is equivalent to 985 /// [`UnixStream::from_std(stream)`](UnixStream::from_std). try_from(stream: net::UnixStream) -> io::Result<Self>986 fn try_from(stream: net::UnixStream) -> io::Result<Self> { 987 Self::from_std(stream) 988 } 989 } 990 991 impl AsyncRead for UnixStream { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>992 fn poll_read( 993 self: Pin<&mut Self>, 994 cx: &mut Context<'_>, 995 buf: &mut ReadBuf<'_>, 996 ) -> Poll<io::Result<()>> { 997 self.poll_read_priv(cx, buf) 998 } 999 } 1000 1001 impl AsyncWrite for UnixStream { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1002 fn poll_write( 1003 self: Pin<&mut Self>, 1004 cx: &mut Context<'_>, 1005 buf: &[u8], 1006 ) -> Poll<io::Result<usize>> { 1007 self.poll_write_priv(cx, buf) 1008 } 1009 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1010 fn poll_write_vectored( 1011 self: Pin<&mut Self>, 1012 cx: &mut Context<'_>, 1013 bufs: &[io::IoSlice<'_>], 1014 ) -> Poll<io::Result<usize>> { 1015 self.poll_write_vectored_priv(cx, bufs) 1016 } 1017 is_write_vectored(&self) -> bool1018 fn is_write_vectored(&self) -> bool { 1019 true 1020 } 1021 poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1022 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 1023 Poll::Ready(Ok(())) 1024 } 1025 poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1026 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 1027 self.shutdown_std(std::net::Shutdown::Write)?; 1028 Poll::Ready(Ok(())) 1029 } 1030 } 1031 1032 impl UnixStream { 1033 // == Poll IO functions that takes `&self` == 1034 // 1035 // To read or write without mutable access to the `UnixStream`, combine the 1036 // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or 1037 // `try_write` methods. 1038 poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1039 pub(crate) fn poll_read_priv( 1040 &self, 1041 cx: &mut Context<'_>, 1042 buf: &mut ReadBuf<'_>, 1043 ) -> Poll<io::Result<()>> { 1044 // Safety: `UnixStream::read` correctly handles reads into uninitialized memory 1045 unsafe { self.io.poll_read(cx, buf) } 1046 } 1047 poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1048 pub(crate) fn poll_write_priv( 1049 &self, 1050 cx: &mut Context<'_>, 1051 buf: &[u8], 1052 ) -> Poll<io::Result<usize>> { 1053 self.io.poll_write(cx, buf) 1054 } 1055 poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1056 pub(super) fn poll_write_vectored_priv( 1057 &self, 1058 cx: &mut Context<'_>, 1059 bufs: &[io::IoSlice<'_>], 1060 ) -> Poll<io::Result<usize>> { 1061 self.io.poll_write_vectored(cx, bufs) 1062 } 1063 } 1064 1065 impl fmt::Debug for UnixStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1066 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1067 self.io.fmt(f) 1068 } 1069 } 1070 1071 impl AsRawFd for UnixStream { as_raw_fd(&self) -> RawFd1072 fn as_raw_fd(&self) -> RawFd { 1073 self.io.as_raw_fd() 1074 } 1075 } 1076 1077 impl AsFd for UnixStream { as_fd(&self) -> BorrowedFd<'_>1078 fn as_fd(&self) -> BorrowedFd<'_> { 1079 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } 1080 } 1081 } 1082