1 use futures_01::executor::{
2 spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, Spawn as Spawn01,
3 UnsafeNotify as UnsafeNotify01,
4 };
5 use futures_01::{Async as Async01, Future as Future01, Stream as Stream01};
6 #[cfg(feature = "sink")]
7 use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01};
8 use futures_core::{future::Future as Future03, stream::Stream as Stream03, task as task03};
9 #[cfg(feature = "sink")]
10 use futures_sink::Sink as Sink03;
11 use std::boxed::Box;
12 use std::pin::Pin;
13 use std::task::Context;
14
15 #[cfg(feature = "io-compat")]
16 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
17 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
18 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
19
20 /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
21 /// object to a futures 0.3-compatible version,
22 #[derive(Debug)]
23 #[must_use = "futures do nothing unless you `.await` or poll them"]
24 pub struct Compat01As03<T> {
25 pub(crate) inner: Spawn01<T>,
26 }
27
28 impl<T> Unpin for Compat01As03<T> {}
29
30 impl<T> Compat01As03<T> {
31 /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
32 /// object in a futures 0.3-compatible wrapper.
new(object: T) -> Self33 pub fn new(object: T) -> Self {
34 Self { inner: spawn01(object) }
35 }
36
in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R37 fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
38 let notify = &WakerToHandle(cx.waker());
39 self.inner.poll_fn_notify(notify, 0, f)
40 }
41
42 /// Get a reference to 0.1 Future, Stream, AsyncRead, or AsyncWrite object contained within.
get_ref(&self) -> &T43 pub fn get_ref(&self) -> &T {
44 self.inner.get_ref()
45 }
46
47 /// Get a mutable reference to 0.1 Future, Stream, AsyncRead or AsyncWrite object contained
48 /// within.
get_mut(&mut self) -> &mut T49 pub fn get_mut(&mut self) -> &mut T {
50 self.inner.get_mut()
51 }
52
53 /// Consume this wrapper to return the underlying 0.1 Future, Stream, AsyncRead, or
54 /// AsyncWrite object.
into_inner(self) -> T55 pub fn into_inner(self) -> T {
56 self.inner.into_inner()
57 }
58 }
59
60 /// Extension trait for futures 0.1 [`Future`](futures_01::future::Future)
61 pub trait Future01CompatExt: Future01 {
62 /// Converts a futures 0.1
63 /// [`Future<Item = T, Error = E>`](futures_01::future::Future)
64 /// into a futures 0.3
65 /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
66 ///
67 /// ```
68 /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
69 /// # futures::executor::block_on(async {
70 /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
71 /// # // feature issues
72 /// use futures_util::compat::Future01CompatExt;
73 ///
74 /// let future = futures_01::future::ok::<u32, ()>(1);
75 /// assert_eq!(future.compat().await, Ok(1));
76 /// # });
77 /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,78 fn compat(self) -> Compat01As03<Self>
79 where
80 Self: Sized,
81 {
82 Compat01As03::new(self)
83 }
84 }
85 impl<Fut: Future01> Future01CompatExt for Fut {}
86
87 /// Extension trait for futures 0.1 [`Stream`](futures_01::stream::Stream)
88 pub trait Stream01CompatExt: Stream01 {
89 /// Converts a futures 0.1
90 /// [`Stream<Item = T, Error = E>`](futures_01::stream::Stream)
91 /// into a futures 0.3
92 /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
93 ///
94 /// ```
95 /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
96 /// # futures::executor::block_on(async {
97 /// use futures::stream::StreamExt;
98 /// use futures_util::compat::Stream01CompatExt;
99 ///
100 /// let stream = futures_01::stream::once::<u32, ()>(Ok(1));
101 /// let mut stream = stream.compat();
102 /// assert_eq!(stream.next().await, Some(Ok(1)));
103 /// assert_eq!(stream.next().await, None);
104 /// # });
105 /// ```
compat(self) -> Compat01As03<Self> where Self: Sized,106 fn compat(self) -> Compat01As03<Self>
107 where
108 Self: Sized,
109 {
110 Compat01As03::new(self)
111 }
112 }
113 impl<St: Stream01> Stream01CompatExt for St {}
114
/// Adds a `sink_compat` adapter to every futures 0.1
/// [`Sink`](futures_01::sink::Sink).
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub trait Sink01CompatExt: Sink01 {
    /// Wraps a futures 0.1
    /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
    /// so that it can be used as a futures 0.3
    /// [`Sink<T, Error = E>`](futures_sink::Sink).
    ///
    /// ```
    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
    /// # futures::executor::block_on(async {
    /// use futures::{sink::SinkExt, stream::StreamExt};
    /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
    ///
    /// let (tx, rx) = futures_01::unsync::mpsc::channel(1);
    /// let (mut tx, mut rx) = (tx.sink_compat(), rx.compat());
    ///
    /// tx.send(1).await.unwrap();
    /// drop(tx);
    /// assert_eq!(rx.next().await, Some(Ok(1)));
    /// assert_eq!(rx.next().await, None);
    /// # });
    /// ```
    fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
    where
        Self: Sized,
    {
        // Same initial state as `Compat01As03Sink::new`: nothing buffered and
        // no close in progress.
        Compat01As03Sink { inner: spawn01(self), buffer: None, close_started: false }
    }
}

// Blanket implementation: every 0.1 sink gets `sink_compat` for free.
#[cfg(feature = "sink")]
impl<Si: Sink01> Sink01CompatExt for Si {}
148
poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>>149 fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
150 match x? {
151 Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
152 Async01::NotReady => task03::Poll::Pending,
153 }
154 }
155
156 impl<Fut: Future01> Future03 for Compat01As03<Fut> {
157 type Output = Result<Fut::Item, Fut::Error>;
158
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output>159 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output> {
160 poll_01_to_03(self.in_notify(cx, Future01::poll))
161 }
162 }
163
164 impl<St: Stream01> Stream03 for Compat01As03<St> {
165 type Item = Result<St::Item, St::Error>;
166
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> task03::Poll<Option<Self::Item>>167 fn poll_next(
168 mut self: Pin<&mut Self>,
169 cx: &mut Context<'_>,
170 ) -> task03::Poll<Option<Self::Item>> {
171 match self.in_notify(cx, Stream01::poll)? {
172 Async01::Ready(Some(t)) => task03::Poll::Ready(Some(Ok(t))),
173 Async01::Ready(None) => task03::Poll::Ready(None),
174 Async01::NotReady => task03::Poll::Pending,
175 }
176 }
177 }
178
/// Adapter that exposes a futures 0.1 Sink object through the futures 0.3
/// `Sink` interface.
///
/// * `inner` holds the 0.1 sink inside a futures 0.1 `Spawn`.
/// * `buffer` holds an item accepted by `start_send` that has not yet been
///   accepted by the 0.1 sink.
/// * `close_started` records that a previous `poll_close` already reached the
///   0.1 sink's `close` call.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct Compat01As03Sink<S, SinkItem> {
    pub(crate) inner: Spawn01<S>,
    pub(crate) buffer: Option<SinkItem>,
    pub(crate) close_started: bool,
}

// Declared `Unpin` for every `S` and `SinkItem`; the poll implementations in
// this file rely on that to reach `&mut self` through `Pin<&mut Self>`.
#[cfg(feature = "sink")]
impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
192
#[cfg(feature = "sink")]
impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
    /// Creates a futures 0.3-compatible wrapper around a futures 0.1 Sink
    /// object, with nothing buffered and no close in progress.
    pub fn new(inner: S) -> Self {
        let spawned = spawn01(inner);
        Self { inner: spawned, buffer: None, close_started: false }
    }

    /// Runs `f` against the wrapped sink while the waker carried by `cx` is
    /// installed as the futures 0.1 notifier, and returns `f`'s result.
    fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R {
        // `NotifyWaker::notify` ignores the id, so a constant `0` is passed.
        self.inner.poll_fn_notify(&WakerToHandle(cx.waker()), 0, f)
    }

    /// Borrows the wrapped 0.1 Sink object.
    pub fn get_ref(&self) -> &S {
        Spawn01::get_ref(&self.inner)
    }

    /// Mutably borrows the wrapped 0.1 Sink object.
    pub fn get_mut(&mut self) -> &mut S {
        Spawn01::get_mut(&mut self.inner)
    }

    /// Discards the wrapper and hands back the underlying 0.1 Sink.
    pub fn into_inner(self) -> S {
        let Self { inner, .. } = self;
        inner.into_inner()
    }
}
220
// When the wrapped object is also a 0.1 stream, the sink wrapper forwards
// stream polling to it as well.
#[cfg(feature = "sink")]
impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
where
    S: Stream01,
{
    type Item = Result<S::Item, S::Error>;

    /// Polls the wrapped 0.1 stream with the waker from `cx` installed as its
    /// notifier. An error is surfaced as a yielded `Some(Err(..))` item.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Option<Self::Item>> {
        let polled = self.in_notify(cx, |stream| Stream01::poll(stream));
        match polled {
            Err(error) => task03::Poll::Ready(Some(Err(error))),
            Ok(Async01::NotReady) => task03::Poll::Pending,
            // `Some(t)` becomes `Some(Ok(t))`; `None` ends the stream.
            Ok(Async01::Ready(next)) => task03::Poll::Ready(next.map(Ok)),
        }
    }
}
239
// futures 0.3 `Sink` on top of a 0.1 sink. `start_send` only stores the item
// in `buffer`; the hand-off to the 0.1 sink happens in the poll methods.
#[cfg(feature = "sink")]
impl<S, SinkItem> Sink03<SinkItem> for Compat01As03Sink<S, SinkItem>
where
    S: Sink01<SinkItem = SinkItem>,
{
    type Error = S::SinkError;

    /// Stores `item` in the buffer. Debug builds assert that the buffer was
    /// empty beforehand.
    fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
        debug_assert!(self.buffer.is_none());
        self.buffer = Some(item);
        Ok(())
    }

    /// Ready as soon as the buffer is empty; a buffered item is first offered
    /// to the 0.1 sink and put back if the sink does not accept it.
    fn poll_ready(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let item = match self.buffer.take() {
            Some(item) => item,
            // Nothing buffered: a new item can be accepted right away.
            None => return task03::Poll::Ready(Ok(())),
        };

        match self.in_notify(cx, |sink| sink.start_send(item)) {
            Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())),
            Ok(AsyncSink01::NotReady(rejected)) => {
                self.buffer = Some(rejected);
                task03::Poll::Pending
            }
            Err(error) => task03::Poll::Ready(Err(error)),
        }
    }

    /// Offers any buffered item to the 0.1 sink and then drives its
    /// `poll_complete`. A rejected item goes back into the buffer.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let pending_item = self.buffer.take();

        // The closure reports the flush state plus whatever item must be
        // put back into the buffer.
        let outcome = self.in_notify(cx, |sink| {
            if let Some(item) = pending_item {
                if let AsyncSink01::NotReady(rejected) = sink.start_send(item)? {
                    return Ok((Async01::NotReady, Some(rejected)));
                }
            }

            let progress = sink.poll_complete()?;
            Ok((progress, None))
        });

        match outcome? {
            (Async01::NotReady, leftover) => {
                self.buffer = leftover;
                task03::Poll::Pending
            }
            (Async01::Ready(()), _) => task03::Poll::Ready(Ok(())),
        }
    }

    /// Sends any buffered item, flushes, and then closes the 0.1 sink. Once
    /// `close` has been reached, later calls go straight to `close` again.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> task03::Poll<Result<(), Self::Error>> {
        let buffered = self.buffer.take();
        let already_closing = self.close_started;

        // The closure reports (close state, item to re-buffer, whether
        // `close` was reached during this call).
        let outcome = self.in_notify(cx, |sink| {
            if !already_closing {
                if let Some(item) = buffered {
                    match sink.start_send(item)? {
                        AsyncSink01::Ready => {}
                        AsyncSink01::NotReady(rejected) => {
                            return Ok((Async01::NotReady, Some(rejected), false));
                        }
                    }
                }

                match sink.poll_complete()? {
                    Async01::Ready(()) => {}
                    Async01::NotReady => return Ok((Async01::NotReady, None, false)),
                }
            }

            let closed = <S as Sink01>::close(sink)?;
            Ok((closed, None, true))
        });

        let (state, leftover, closing) = outcome?;
        if let Async01::NotReady = state {
            self.buffer = leftover;
            self.close_started = closing;
            return task03::Poll::Pending;
        }
        task03::Poll::Ready(Ok(()))
    }
}
322
323 struct NotifyWaker(task03::Waker);
324
325 #[allow(missing_debug_implementations)] // false positive: this is private type
326 #[derive(Clone)]
327 struct WakerToHandle<'a>(&'a task03::Waker);
328
329 impl From<WakerToHandle<'_>> for NotifyHandle01 {
from(handle: WakerToHandle<'_>) -> Self330 fn from(handle: WakerToHandle<'_>) -> Self {
331 let ptr = Box::new(NotifyWaker(handle.0.clone()));
332
333 unsafe { Self::new(Box::into_raw(ptr)) }
334 }
335 }
336
337 impl Notify01 for NotifyWaker {
notify(&self, _: usize)338 fn notify(&self, _: usize) {
339 self.0.wake_by_ref();
340 }
341 }
342
343 unsafe impl UnsafeNotify01 for NotifyWaker {
clone_raw(&self) -> NotifyHandle01344 unsafe fn clone_raw(&self) -> NotifyHandle01 {
345 WakerToHandle(&self.0).into()
346 }
347
drop_raw(&self)348 unsafe fn drop_raw(&self) {
349 let ptr: *const dyn UnsafeNotify01 = self;
350 drop(unsafe { Box::from_raw(ptr as *mut dyn UnsafeNotify01) });
351 }
352 }
353
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
    use super::*;
    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
    use std::io::Error;
    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

    /// Adds a `compat` adapter to every tokio-io
    /// [`AsyncRead`](tokio_io::AsyncRead).
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
    pub trait AsyncRead01CompatExt: AsyncRead01 {
        /// Wraps a tokio-io [`AsyncRead`](tokio_io::AsyncRead) so that it can be
        /// used as a futures-io 0.3 [`AsyncRead`](futures_io::AsyncRead).
        ///
        /// ```
        /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncReadExt;
        /// use futures_util::compat::AsyncRead01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let reader /* : impl tokio_io::AsyncRead */ = std::io::Cursor::new(input);
        /// let mut reader /* : impl futures::io::AsyncRead + Unpin */ = reader.compat();
        ///
        /// let mut output = Vec::with_capacity(12);
        /// reader.read_to_end(&mut output).await.unwrap();
        /// assert_eq!(output, input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::<Self>::new(self)
        }
    }

    // Blanket implementation: every tokio-io reader gets `compat` for free.
    impl<R: AsyncRead01> AsyncRead01CompatExt for R {}

    /// Adds a `compat` adapter to every tokio-io
    /// [`AsyncWrite`](tokio_io::AsyncWrite).
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
    pub trait AsyncWrite01CompatExt: AsyncWrite01 {
        /// Wraps a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) so that it can be
        /// used as a futures-io 0.3 [`AsyncWrite`](futures_io::AsyncWrite).
        ///
        /// ```
        /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
        /// # futures::executor::block_on(async {
        /// use futures::io::AsyncWriteExt;
        /// use futures_util::compat::AsyncWrite01CompatExt;
        ///
        /// let input = b"Hello World!";
        /// let mut cursor = std::io::Cursor::new(Vec::with_capacity(12));
        ///
        /// let mut writer = (&mut cursor).compat();
        /// writer.write_all(input).await.unwrap();
        ///
        /// assert_eq!(cursor.into_inner(), input);
        /// # });
        /// ```
        fn compat(self) -> Compat01As03<Self>
        where
            Self: Sized,
        {
            Compat01As03::<Self>::new(self)
        }
    }

    // Blanket implementation: every tokio-io writer gets `compat` for free.
    impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}

    impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
        /// Forwards to the wrapped reader's `poll_read` with the waker from
        /// `cx` installed as the futures 0.1 notifier.
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |reader| reader.poll_read(buf));
            poll_01_to_03(polled)
        }
    }

    impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
        /// Forwards to the wrapped writer's `poll_write` with the waker from
        /// `cx` installed as the futures 0.1 notifier.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> task03::Poll<Result<usize, Error>> {
            let polled = self.in_notify(cx, |writer| writer.poll_write(buf));
            poll_01_to_03(polled)
        }

        /// Forwards to the wrapped writer's `poll_flush`.
        fn poll_flush(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, |writer| AsyncWrite01::poll_flush(writer));
            poll_01_to_03(polled)
        }

        /// Closing is mapped onto the wrapped writer's `shutdown`.
        fn poll_close(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> task03::Poll<Result<(), Error>> {
            let polled = self.in_notify(cx, |writer| AsyncWrite01::shutdown(writer));
            poll_01_to_03(polled)
        }
    }
}
456