1 use futures_01::executor::{
2     spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, Spawn as Spawn01,
3     UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{Async as Async01, Future as Future01, Stream as Stream01};
6 #[cfg(feature = "sink")]
7 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
8 use futures_core::{future::Future as Future03, stream::Stream as Stream03, task as task03};
9 #[cfg(feature = "sink")]
10 use futures_sink::Sink as Sink03;
11 use std::boxed::Box;
12 use std::pin::Pin;
13 use std::task::Context;
14 
15 #[cfg(feature = "io-compat")]
16 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
17 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
18 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
19 
20 /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
21 /// object to a futures 0.3-compatible version,
22 #[derive(Debug)]
23 #[must_use = "futures do nothing unless you `.await` or poll them"]
24 pub struct Compat01As03<T> {
25     pub(crate) inner: Spawn01<T>,
26 }
27 
28 impl<T> Unpin for Compat01As03<T> {}
29 
30 impl<T> Compat01As03<T> {
31     /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
32     /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Self33     pub fn new(object: T) -> Self {
34         Self { inner: spawn01(object) }
35     }
36 
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R37     fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
38         let notify = &WakerToHandle(cx.waker());
39         self.inner.poll_fn_notify(notify, 0, f)
40     }
41 
42     /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T43     pub fn get_ref(&self) -> &T {
44         self.inner.get_ref()
45     }
46 
47     /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
48     /// within.
get_mut(&mut self) -> &mut T49     pub fn get_mut(&mut self) -> &mut T {
50         self.inner.get_mut()
51     }
52 
53     /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
54     /// AsyncWrite object.
into_inner(self) -> T55     pub fn into_inner(self) -> T {
56         self.inner.into_inner()
57     }
58 }
59 
60 /// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
61 pub trait Future01CompatExt: Future01 {
62     /// Converts a futures 0.1
63     /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
64     /// into a futures 0.3
65     /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
66     ///
67     /// ```
68     /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
69     /// # futures::executor::block_on(async {
70     /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
71     /// # // feature issues
72     /// use futures_util::compat::Future01CompatExt;
73     ///
74     /// let future = futures_01::future::ok::<u32, ()>(1);
75     /// assert_eq!(future.compat().await, Ok(1));
76     /// # });
77     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,78     fn compat(self) -> Compat01As03<Self>
79     where
80         Self: Sized,
81     {
82         Compat01As03::new(self)
83     }
84 }
85 impl<Fut: Future01> Future01CompatExt for Fut {}
86 
87 /// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
88 pub trait Stream01CompatExt: Stream01 {
89     /// Converts a futures 0.1
90     /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
91     /// into a futures 0.3
92     /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
93     ///
94     /// ```
95     /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
96     /// # futures::executor::block_on(async {
97     /// use futures::stream::StreamExt;
98     /// use futures_util::compat::Stream01CompatExt;
99     ///
100     /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
101     /// let mut stream = stream.compat();
102     /// assert_eq!(stream.next().await, Some(Ok(1)));
103     /// assert_eq!(stream.next().await, None);
104     /// # });
105     /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,106     fn compat(self) -> Compat01As03<Self>
107     where
108         Self: Sized,
109     {
110         Compat01As03::new(self)
111     }
112 }
113 impl<St: Stream01> Stream01CompatExt for St {}
114 
/// Adds [`sink_compat`](Sink01CompatExt::sink_compat) to every futures 0.1
/// [`Sink`](futures_01::sink::Sink).
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub trait Sink01CompatExt: Sink01 {
    /// Wraps this futures 0.1
    /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
    /// so that it can be used as a futures 0.3
    /// [`Sink<T, Error = E>`](futures_sink::Sink).
    ///
    /// ```
    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
    /// # futures::executor::block_on(async {
    /// use futures::{sink::SinkExt, stream::StreamExt};
    /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
    ///
    /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
    /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
    ///
    /// tx.send(1).await.unwrap();
    /// drop(tx);
    /// assert_eq!(rx.next().await, Some(Ok(1)));
    /// assert_eq!(rx.next().await, None);
    /// # });
    /// ```
    fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
    where
        Self: Sized,
    {
        Compat01As03Sink::<Self, Self::SinkItem>::new(self)
    }
}

// Blanket impl: every futures 0.1 sink gets the extension method.
#[cfg(feature = "sink")]
impl<Si: Sink01> Sink01CompatExt for Si {}
148 
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>149 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
150     match x? {
151         Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
152         Async01::NotReady => task03::Poll::Pending,
153     }
154 }
155 
156 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
157     type Output = Result<Fut::Item, Fut::Error>;
158 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output>159     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output> {
160         poll_01_to_03(self.in_notify(cx, Future01::poll))
161     }
162 }
163 
164 impl<St: Stream01> Stream03 for Compat01As03<St> {
165     type Item = Result<St::Item, St::Error>;
166 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>167     fn poll_next(
168         mut self: Pin<&mut Self>,
169         cx: &mut Context<'_>,
170     ) -> task03::Poll<Option<Self::Item>> {
171         match self.in_notify(cx, Stream01::poll)? {
172             Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
173             Async01::Ready(None) => task03::Poll::Ready(None),
174             Async01::NotReady => task03::Poll::Pending,
175         }
176     }
177 }
178 
/// Adapter that exposes a futures 0.1 Sink object through the futures 0.3
/// `Sink` trait.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Compat01As03Sink<S, SinkItem> {
    /// The wrapped 0.1 sink, kept inside a futures 0.1 `Spawn`.
    pub(crate) inner: Spawn01<S>,
    /// Item accepted by the 0.3 `start_send` that has not yet been taken by
    /// the inner sink.
    pub(crate) buffer: Option<SinkItem>,
    /// Set once `poll_close` has reached the inner sink's `close` and it
    /// reported `NotReady`; later `poll_close` calls go straight to `close`.
    pub(crate) close_started: bool,
}

// The sink/stream poll methods reach `&mut self` through `Pin<&mut Self>`,
// which is only possible for `Unpin` types.
#[cfg(feature = "sink")]
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
192 
#[cfg(feature = "sink")]
impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
    /// Creates a futures 0.3-compatible wrapper around a futures 0.1 Sink
    /// object. The wrapper starts with an empty buffer and close not started.
    pub fn new(inner: S) -> Self {
        let spawned = spawn01(inner);
        Self { inner: spawned, buffer: None, close_started: false }
    }

    /// Runs `f` on the wrapped sink with the waker from `cx` installed as the
    /// futures 0.1 notify target, and returns whatever `f` returns.
    fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R {
        // The notify id is always 0; `NotifyWaker::notify` ignores it.
        self.inner.poll_fn_notify(&WakerToHandle(cx.waker()), 0, f)
    }

    /// Borrows the wrapped 0.1 Sink object.
    pub fn get_ref(&self) -> &S {
        Spawn01::get_ref(&self.inner)
    }

    /// Mutably borrows the wrapped 0.1 Sink.
    pub fn get_mut(&mut self) -> &mut S {
        Spawn01::get_mut(&mut self.inner)
    }

    /// Discards the wrapper and hands back the underlying 0.1 Sink.
    pub fn into_inner(self) -> S {
        Spawn01::into_inner(self.inner)
    }
}
220 
#[cfg(feature = "sink")]
impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
where
    S: Stream01,
{
    type Item = Result<S::Item, S::Error>;

    /// Polls the wrapped object as a 0.1 stream (for objects that are both a
    /// sink and a stream). An error from the 0.1 stream is yielded as a
    /// `Some(Err(_))` item.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Option<Self::Item>> {
        // `Pin::get_mut` is available because the wrapper is `Unpin`.
        let this = Pin::get_mut(self);
        match this.in_notify(cx, Stream01::poll) {
            Ok(Async01::Ready(Some(item))) => task03::Poll::Ready(Some(Ok(item))),
            Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
            Ok(Async01::NotReady) => task03::Poll::Pending,
            Err(err) => task03::Poll::Ready(Some(Err(err))),
        }
    }
}
239 
#[cfg(feature = "sink")]
impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
where
    S: Sink01<SinkItem = SinkItem>,
{
    type Error = S::SinkError;

    /// Stores `item` in the one-slot buffer; it is handed to the inner sink
    /// by a later `poll_ready`, `poll_flush`, or `poll_close`.
    ///
    /// In debug builds this asserts that the buffer is empty.
    fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
        debug_assert!(self.buffer.is_none());
        let slot = &mut self.buffer;
        *slot = Some(item);
        Ok(())
    }

    /// Ready when the buffer is empty, or when the buffered item is accepted
    /// by the inner sink's `start_send`. An item the inner sink refuses is put
    /// back into the buffer and `Pending` is returned.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let this = Pin::get_mut(self);

        // Nothing buffered: the slot is free, so another item can be accepted.
        let staged = match this.buffer.take() {
            Some(item) => item,
            None => return task03::Poll::Ready(Ok(())),
        };

        match this.in_notify(cx, |sink| sink.start_send(staged)) {
            Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())),
            Ok(AsyncSink01::NotReady(rejected)) => {
                this.buffer = Some(rejected);
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }

    /// Sends the buffered item (if any) to the inner sink, then drives the
    /// inner sink's `poll_complete`. An item the inner sink refuses is put
    /// back into the buffer and `Pending` is returned.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let this = Pin::get_mut(self);
        let staged = this.buffer.take();

        // The closure reports the poll state together with any item that has
        // to go back into the buffer.
        let outcome = this.in_notify(cx, |sink| {
            if let Some(item) = staged {
                if let AsyncSink01::NotReady(rejected) = sink.start_send(item)? {
                    return Ok((Async01::NotReady, Some(rejected)));
                }
            }
            sink.poll_complete().map(|state| (state, None))
        });

        match outcome {
            Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover)) => {
                this.buffer = leftover;
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }

    /// Sends the buffered item (if any), drives `poll_complete`, and then
    /// calls the inner sink's `close`. Once `close` has been reached and
    /// reported `NotReady`, later calls skip the send/flush steps.
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let this = Pin::get_mut(self);
        let staged = this.buffer.take();
        let already_closing = this.close_started;

        // The closure reports (poll state, item to re-buffer, whether `close`
        // has been reached).
        let outcome = this.in_notify(cx, |sink| {
            if !already_closing {
                if let Some(item) = staged {
                    match sink.start_send(item)? {
                        AsyncSink01::NotReady(rejected) => {
                            return Ok((Async01::NotReady, Some(rejected), false));
                        }
                        AsyncSink01::Ready => {}
                    }
                }

                match sink.poll_complete()? {
                    Async01::NotReady => return Ok((Async01::NotReady, None, false)),
                    Async01::Ready(_) => {}
                }
            }

            let closed = <S as Sink01>::close(sink)?;
            Ok((closed, None, true))
        });

        match outcome {
            Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
            Ok((Async01::NotReady, leftover, closing)) => {
                this.buffer = leftover;
                this.close_started = closing;
                task03::Poll::Pending
            }
            Err(err) => task03::Poll::Ready(Err(err)),
        }
    }
}
322 
323 struct NotifyWaker(task03::Waker);
324 
325 #[allow(missing_debug_implementations)] // false positive: this is private type
326 #[derive(Clone)]
327 struct WakerToHandle<'a>(&'a task03::Waker);
328 
329 impl From<WakerToHandle<'_>> for NotifyHandle01 {
from(handle: WakerToHandle<'_>) -> Self330     fn from(handle: WakerToHandle<'_>) -> Self {
331         let ptr = Box::new(NotifyWaker(handle.0.clone()));
332 
333         unsafe { Self::new(Box::into_raw(ptr)) }
334     }
335 }
336 
337 impl Notify01 for NotifyWaker {
notify(&self, _: usize)338     fn notify(&self, _: usize) {
339         self.0.wake_by_ref();
340     }
341 }
342 
343 unsafe impl UnsafeNotify01 for NotifyWaker {
clone_raw(&self) -> NotifyHandle01344     unsafe fn clone_raw(&self) -> NotifyHandle01 {
345         WakerToHandle(&self.0).into()
346     }
347 
drop_raw(&self)348     unsafe fn drop_raw(&self) {
349         let ptr: *const dyn UnsafeNotify01 = self;
350         drop(unsafe { Box::from_raw(ptr as *mut dyn UnsafeNotify01) });
351     }
352 }
353 
354 #[cfg(feature = "io-compat")]
355 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
356 mod io {
357     use super::*;
358     use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
359     use std::io::Error;
360     use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
361 
362     /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
363     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
364     pub trait AsyncRead01CompatExt: AsyncRead01 {
365         /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
366         /// [`AsyncRead`](futures_io::AsyncRead).
367         ///
368         /// ```
369         /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
370         /// # futures::executor::block_on(async {
371         /// use futures::io::AsyncReadExt;
372         /// use futures_util::compat::AsyncRead01CompatExt;
373         ///
374         /// let input = b"Hello World!";
375         /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
376         /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
377         ///
378         /// let mut output = Vec::with_capacity(12);
379         /// reader.read_to_end(&mut output).await.unwrap();
380         /// assert_eq!(output, input);
381         /// # });
382         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,383         fn compat(self) -> Compat01As03<Self>
384         where
385             Self: Sized,
386         {
387             Compat01As03::new(self)
388         }
389     }
390     impl<R: AsyncRead01> AsyncRead01CompatExt for R {}
391 
392     /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
393     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
394     pub trait AsyncWrite01CompatExt: AsyncWrite01 {
395         /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
396         /// [`AsyncWrite`](futures_io::AsyncWrite).
397         ///
398         /// ```
399         /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
400         /// # futures::executor::block_on(async {
401         /// use futures::io::AsyncWriteExt;
402         /// use futures_util::compat::AsyncWrite01CompatExt;
403         ///
404         /// let input = b"Hello World!";
405         /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
406         ///
407         /// let mut writer = (&mut cursor).compat();
408         /// writer.write_all(input).await.unwrap();
409         ///
410         /// assert_eq!(cursor.into_inner(), input);
411         /// # });
412         /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,413         fn compat(self) -> Compat01As03<Self>
414         where
415             Self: Sized,
416         {
417             Compat01As03::new(self)
418         }
419     }
420     impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}
421 
422     impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> task03::Poll<Result<usize, Error>>423         fn poll_read(
424             mut self: Pin<&mut Self>,
425             cx: &mut Context<'_>,
426             buf: &mut [u8],
427         ) -> task03::Poll<Result<usize, Error>> {
428             poll_01_to_03(self.in_notify(cx, |x| x.poll_read(buf)))
429         }
430     }
431 
432     impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> task03::Poll<Result<usize, Error>>433         fn poll_write(
434             mut self: Pin<&mut Self>,
435             cx: &mut Context<'_>,
436             buf: &[u8],
437         ) -> task03::Poll<Result<usize, Error>> {
438             poll_01_to_03(self.in_notify(cx, |x| x.poll_write(buf)))
439         }
440 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Error>>441         fn poll_flush(
442             mut self: Pin<&mut Self>,
443             cx: &mut Context<'_>,
444         ) -> task03::Poll<Result<(), Error>> {
445             poll_01_to_03(self.in_notify(cx, AsyncWrite01::poll_flush))
446         }
447 
poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Result<(), Error>>448         fn poll_close(
449             mut self: Pin<&mut Self>,
450             cx: &mut Context<'_>,
451         ) -> task03::Poll<Result<(), Error>> {
452             poll_01_to_03(self.in_notify(cx, AsyncWrite01::shutdown))
453         }
454     }
455 }
456