1 //! `UnixStream` split support.
2 //!
3 //! A `UnixStream` can be split into a read half and a write half with
4 //! `UnixStream::split`. The read half implements `AsyncRead` while the write
5 //! half implements `AsyncWrite`.
6 //!
7 //! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
8 //! split has no associated overhead and enforces all invariants at the type
9 //! level.
10 
11 use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
12 use crate::net::UnixStream;
13 
14 use crate::net::unix::SocketAddr;
15 use std::io;
16 use std::net::Shutdown;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 
20 cfg_io_util! {
21     use bytes::BufMut;
22 }
23 
24 /// Borrowed read half of a [`UnixStream`], created by [`split`].
25 ///
26 /// Reading from a `ReadHalf` is usually done using the convenience methods found on the
27 /// [`AsyncReadExt`] trait.
28 ///
29 /// [`UnixStream`]: UnixStream
30 /// [`split`]: UnixStream::split()
31 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
32 #[derive(Debug)]
33 pub struct ReadHalf<'a>(&'a UnixStream);
34 
35 /// Borrowed write half of a [`UnixStream`], created by [`split`].
36 ///
37 /// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will
38 /// shut down the [`UnixStream`] stream in the write direction.
39 ///
40 /// Writing to an `WriteHalf` is usually done using the convenience methods found
41 /// on the [`AsyncWriteExt`] trait.
42 ///
43 /// [`UnixStream`]: UnixStream
44 /// [`split`]: UnixStream::split()
45 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
46 /// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
47 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
48 #[derive(Debug)]
49 pub struct WriteHalf<'a>(&'a UnixStream);
50 
split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>)51 pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
52     (ReadHalf(stream), WriteHalf(stream))
53 }
54 
55 impl ReadHalf<'_> {
56     /// Wait for any of the requested ready states.
57     ///
58     /// This function is usually paired with [`try_read()`]. It can be used instead
59     /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
60     /// and [`Ready::READ_CLOSED`] events.
61     ///
62     /// The function may complete without the socket being ready. This is a
63     /// false-positive and attempting an operation will return with
64     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
65     /// [`Ready`] set, so you should always check the returned value and possibly
66     /// wait again if the requested states are not set.
67     ///
68     /// This function is equivalent to [`UnixStream::ready`].
69     ///
70     /// [`try_read()`]: Self::try_read
71     /// [`readable()`]: Self::readable
72     ///
73     /// # Cancel safety
74     ///
75     /// This method is cancel safe. Once a readiness event occurs, the method
76     /// will continue to return immediately until the readiness event is
77     /// consumed by an attempt to read or write that fails with `WouldBlock` or
78     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>79     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
80         self.0.ready(interest).await
81     }
82 
83     /// Waits for the socket to become readable.
84     ///
85     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
86     /// paired with `try_read()`.
87     ///
88     /// # Cancel safety
89     ///
90     /// This method is cancel safe. Once a readiness event occurs, the method
91     /// will continue to return immediately until the readiness event is
92     /// consumed by an attempt to read that fails with `WouldBlock` or
93     /// `Poll::Pending`.
readable(&self) -> io::Result<()>94     pub async fn readable(&self) -> io::Result<()> {
95         self.0.readable().await
96     }
97 
98     /// Tries to read data from the stream into the provided buffer, returning how
99     /// many bytes were read.
100     ///
101     /// Receives any pending data from the socket but does not wait for new data
102     /// to arrive. On success, returns the number of bytes read. Because
103     /// `try_read()` is non-blocking, the buffer does not have to be stored by
104     /// the async task and can exist entirely on the stack.
105     ///
106     /// Usually, [`readable()`] or [`ready()`] is used with this function.
107     ///
108     /// [`readable()`]: Self::readable()
109     /// [`ready()`]: Self::ready()
110     ///
111     /// # Return
112     ///
113     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
114     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
115     ///
116     /// 1. The stream's read half is closed and will no longer yield data.
117     /// 2. The specified buffer was 0 bytes in length.
118     ///
119     /// If the stream is not ready to read data,
120     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read(&self, buf: &mut [u8]) -> io::Result<usize>121     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
122         self.0.try_read(buf)
123     }
124 
125     cfg_io_util! {
126         /// Tries to read data from the stream into the provided buffer, advancing the
127         /// buffer's internal cursor, returning how many bytes were read.
128         ///
129         /// Receives any pending data from the socket but does not wait for new data
130         /// to arrive. On success, returns the number of bytes read. Because
131         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
132         /// the async task and can exist entirely on the stack.
133         ///
134         /// Usually, [`readable()`] or [`ready()`] is used with this function.
135         ///
136         /// [`readable()`]: Self::readable()
137         /// [`ready()`]: Self::ready()
138         ///
139         /// # Return
140         ///
141         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
142         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
143         /// and will no longer yield data. If the stream is not ready to read data
144         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
145             self.0.try_read_buf(buf)
146         }
147     }
148 
149     /// Tries to read data from the stream into the provided buffers, returning
150     /// how many bytes were read.
151     ///
152     /// Data is copied to fill each buffer in order, with the final buffer
153     /// written to possibly being only partially filled. This method behaves
154     /// equivalently to a single call to [`try_read()`] with concatenated
155     /// buffers.
156     ///
157     /// Receives any pending data from the socket but does not wait for new data
158     /// to arrive. On success, returns the number of bytes read. Because
159     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
160     /// stored by the async task and can exist entirely on the stack.
161     ///
162     /// Usually, [`readable()`] or [`ready()`] is used with this function.
163     ///
164     /// [`try_read()`]: Self::try_read()
165     /// [`readable()`]: Self::readable()
166     /// [`ready()`]: Self::ready()
167     ///
168     /// # Return
169     ///
170     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
171     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
172     /// and will no longer yield data. If the stream is not ready to read data
173     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>174     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
175         self.0.try_read_vectored(bufs)
176     }
177 
178     /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>179     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
180         self.0.peer_addr()
181     }
182 
183     /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>184     pub fn local_addr(&self) -> io::Result<SocketAddr> {
185         self.0.local_addr()
186     }
187 }
188 
189 impl WriteHalf<'_> {
190     /// Waits for any of the requested ready states.
191     ///
192     /// This function is usually paired with [`try_write()`]. It can be used instead
193     /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
194     /// and [`Ready::WRITE_CLOSED`] events.
195     ///
196     /// The function may complete without the socket being ready. This is a
197     /// false-positive and attempting an operation will return with
198     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
199     /// [`Ready`] set, so you should always check the returned value and possibly
200     /// wait again if the requested states are not set.
201     ///
202     /// This function is equivalent to [`UnixStream::ready`].
203     ///
204     /// [`try_write()`]: Self::try_write
205     /// [`writable()`]: Self::writable
206     ///
207     /// # Cancel safety
208     ///
209     /// This method is cancel safe. Once a readiness event occurs, the method
210     /// will continue to return immediately until the readiness event is
211     /// consumed by an attempt to read or write that fails with `WouldBlock` or
212     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>213     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
214         self.0.ready(interest).await
215     }
216 
217     /// Waits for the socket to become writable.
218     ///
219     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
220     /// paired with `try_write()`.
221     ///
222     /// # Cancel safety
223     ///
224     /// This method is cancel safe. Once a readiness event occurs, the method
225     /// will continue to return immediately until the readiness event is
226     /// consumed by an attempt to write that fails with `WouldBlock` or
227     /// `Poll::Pending`.
writable(&self) -> io::Result<()>228     pub async fn writable(&self) -> io::Result<()> {
229         self.0.writable().await
230     }
231 
232     /// Tries to write a buffer to the stream, returning how many bytes were
233     /// written.
234     ///
235     /// The function will attempt to write the entire contents of `buf`, but
236     /// only part of the buffer may be written.
237     ///
238     /// This function is usually paired with `writable()`.
239     ///
240     /// # Return
241     ///
242     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
243     /// number of bytes written. If the stream is not ready to write data,
244     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write(&self, buf: &[u8]) -> io::Result<usize>245     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
246         self.0.try_write(buf)
247     }
248 
249     /// Tries to write several buffers to the stream, returning how many bytes
250     /// were written.
251     ///
252     /// Data is written from each buffer in order, with the final buffer read
253     /// from possible being only partially consumed. This method behaves
254     /// equivalently to a single call to [`try_write()`] with concatenated
255     /// buffers.
256     ///
257     /// This function is usually paired with `writable()`.
258     ///
259     /// [`try_write()`]: Self::try_write()
260     ///
261     /// # Return
262     ///
263     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
264     /// number of bytes written. If the stream is not ready to write data,
265     /// `Err(io::ErrorKind::WouldBlock)` is returned.
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>266     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
267         self.0.try_write_vectored(buf)
268     }
269 
270     /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<SocketAddr>271     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
272         self.0.peer_addr()
273     }
274 
275     /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<SocketAddr>276     pub fn local_addr(&self) -> io::Result<SocketAddr> {
277         self.0.local_addr()
278     }
279 }
280 
281 impl AsyncRead for ReadHalf<'_> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>282     fn poll_read(
283         self: Pin<&mut Self>,
284         cx: &mut Context<'_>,
285         buf: &mut ReadBuf<'_>,
286     ) -> Poll<io::Result<()>> {
287         self.0.poll_read_priv(cx, buf)
288     }
289 }
290 
291 impl AsyncWrite for WriteHalf<'_> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>292     fn poll_write(
293         self: Pin<&mut Self>,
294         cx: &mut Context<'_>,
295         buf: &[u8],
296     ) -> Poll<io::Result<usize>> {
297         self.0.poll_write_priv(cx, buf)
298     }
299 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>300     fn poll_write_vectored(
301         self: Pin<&mut Self>,
302         cx: &mut Context<'_>,
303         bufs: &[io::IoSlice<'_>],
304     ) -> Poll<io::Result<usize>> {
305         self.0.poll_write_vectored_priv(cx, bufs)
306     }
307 
is_write_vectored(&self) -> bool308     fn is_write_vectored(&self) -> bool {
309         self.0.is_write_vectored()
310     }
311 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>312     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
313         Poll::Ready(Ok(()))
314     }
315 
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>316     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
317         self.0.shutdown_std(Shutdown::Write).into()
318     }
319 }
320 
321 impl AsRef<UnixStream> for ReadHalf<'_> {
as_ref(&self) -> &UnixStream322     fn as_ref(&self) -> &UnixStream {
323         self.0
324     }
325 }
326 
327 impl AsRef<UnixStream> for WriteHalf<'_> {
as_ref(&self) -> &UnixStream328     fn as_ref(&self) -> &UnixStream {
329         self.0
330     }
331 }
332