1 use super::copy::CopyBuffer;
2 
3 use crate::io::{AsyncRead, AsyncWrite};
4 
5 use std::future::poll_fn;
6 use std::io;
7 use std::pin::Pin;
8 use std::task::{ready, Context, Poll};
9 
10 enum TransferState {
11     Running(CopyBuffer),
12     ShuttingDown(u64),
13     Done(u64),
14 }
15 
transfer_one_direction<A, B>( cx: &mut Context<'_>, state: &mut TransferState, r: &mut A, w: &mut B, ) -> Poll<io::Result<u64>> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, B: AsyncRead + AsyncWrite + Unpin + ?Sized,16 fn transfer_one_direction<A, B>(
17     cx: &mut Context<'_>,
18     state: &mut TransferState,
19     r: &mut A,
20     w: &mut B,
21 ) -> Poll<io::Result<u64>>
22 where
23     A: AsyncRead + AsyncWrite + Unpin + ?Sized,
24     B: AsyncRead + AsyncWrite + Unpin + ?Sized,
25 {
26     let mut r = Pin::new(r);
27     let mut w = Pin::new(w);
28 
29     loop {
30         match state {
31             TransferState::Running(buf) => {
32                 let count = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
33                 *state = TransferState::ShuttingDown(count);
34             }
35             TransferState::ShuttingDown(count) => {
36                 ready!(w.as_mut().poll_shutdown(cx))?;
37 
38                 *state = TransferState::Done(*count);
39             }
40             TransferState::Done(count) => return Poll::Ready(Ok(*count)),
41         }
42     }
43 }
44 /// Copies data in both directions between `a` and `b`.
45 ///
46 /// This function returns a future that will read from both streams,
47 /// writing any data read to the opposing stream.
48 /// This happens in both directions concurrently.
49 ///
50 /// If an EOF is observed on one stream, [`shutdown()`] will be invoked on
51 /// the other, and reading from that stream will stop. Copying of data in
52 /// the other direction will continue.
53 ///
54 /// The future will complete successfully once both directions of communication has been shut down.
55 /// A direction is shut down when the reader reports EOF,
56 /// at which point [`shutdown()`] is called on the corresponding writer. When finished,
57 /// it will return a tuple of the number of bytes copied from a to b
58 /// and the number of bytes copied from b to a, in that order.
59 ///
60 /// It uses two 8 KB buffers for transferring bytes between `a` and `b` by default.
61 /// To set your own buffers sizes use [`copy_bidirectional_with_sizes()`].
62 ///
63 /// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown
64 ///
65 /// # Errors
66 ///
67 /// The future will immediately return an error if any IO operation on `a`
68 /// or `b` returns an error. Some data read from either stream may be lost (not
69 /// written to the other stream) in this case.
70 ///
71 /// # Return value
72 ///
73 /// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`.
74 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> io::Result<(u64, u64)> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, B: AsyncRead + AsyncWrite + Unpin + ?Sized,75 pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> io::Result<(u64, u64)>
76 where
77     A: AsyncRead + AsyncWrite + Unpin + ?Sized,
78     B: AsyncRead + AsyncWrite + Unpin + ?Sized,
79 {
80     copy_bidirectional_impl(
81         a,
82         b,
83         CopyBuffer::new(super::DEFAULT_BUF_SIZE),
84         CopyBuffer::new(super::DEFAULT_BUF_SIZE),
85     )
86     .await
87 }
88 
89 /// Copies data in both directions between `a` and `b` using buffers of the specified size.
90 ///
91 /// This method is the same as the [`copy_bidirectional()`], except that it allows you to set the
92 /// size of the internal buffers used when copying data.
93 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
copy_bidirectional_with_sizes<A, B>( a: &mut A, b: &mut B, a_to_b_buf_size: usize, b_to_a_buf_size: usize, ) -> io::Result<(u64, u64)> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, B: AsyncRead + AsyncWrite + Unpin + ?Sized,94 pub async fn copy_bidirectional_with_sizes<A, B>(
95     a: &mut A,
96     b: &mut B,
97     a_to_b_buf_size: usize,
98     b_to_a_buf_size: usize,
99 ) -> io::Result<(u64, u64)>
100 where
101     A: AsyncRead + AsyncWrite + Unpin + ?Sized,
102     B: AsyncRead + AsyncWrite + Unpin + ?Sized,
103 {
104     copy_bidirectional_impl(
105         a,
106         b,
107         CopyBuffer::new(a_to_b_buf_size),
108         CopyBuffer::new(b_to_a_buf_size),
109     )
110     .await
111 }
112 
copy_bidirectional_impl<A, B>( a: &mut A, b: &mut B, a_to_b_buffer: CopyBuffer, b_to_a_buffer: CopyBuffer, ) -> io::Result<(u64, u64)> where A: AsyncRead + AsyncWrite + Unpin + ?Sized, B: AsyncRead + AsyncWrite + Unpin + ?Sized,113 async fn copy_bidirectional_impl<A, B>(
114     a: &mut A,
115     b: &mut B,
116     a_to_b_buffer: CopyBuffer,
117     b_to_a_buffer: CopyBuffer,
118 ) -> io::Result<(u64, u64)>
119 where
120     A: AsyncRead + AsyncWrite + Unpin + ?Sized,
121     B: AsyncRead + AsyncWrite + Unpin + ?Sized,
122 {
123     let mut a_to_b = TransferState::Running(a_to_b_buffer);
124     let mut b_to_a = TransferState::Running(b_to_a_buffer);
125     poll_fn(|cx| {
126         let a_to_b = transfer_one_direction(cx, &mut a_to_b, a, b)?;
127         let b_to_a = transfer_one_direction(cx, &mut b_to_a, b, a)?;
128 
129         // It is not a problem if ready! returns early because transfer_one_direction for the
130         // other direction will keep returning TransferState::Done(count) in future calls to poll
131         let a_to_b = ready!(a_to_b);
132         let b_to_a = ready!(b_to_a);
133 
134         Poll::Ready(Ok((a_to_b, b_to_a)))
135     })
136     .await
137 }
138