1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2 
3 //! A one-shot channel is used for sending a single message between
4 //! asynchronous tasks. The [`channel`] function is used to create a
5 //! [`Sender`] and [`Receiver`] handle pair that form the channel.
6 //!
7 //! The `Sender` handle is used by the producer to send the value.
8 //! The `Receiver` handle is used by the consumer to receive the value.
9 //!
10 //! Each handle can be used on separate tasks.
11 //!
12 //! Since the `send` method is not async, it can be used anywhere. This includes
13 //! sending between two runtimes, and using it from non-async code.
14 //!
15 //! If the [`Receiver`] is closed before receiving a message which has already
16 //! been sent, the message will remain in the channel until the receiver is
17 //! dropped, at which point the message will be dropped immediately.
18 //!
19 //! # Examples
20 //!
21 //! ```
22 //! use tokio::sync::oneshot;
23 //!
24 //! #[tokio::main]
25 //! async fn main() {
26 //!     let (tx, rx) = oneshot::channel();
27 //!
28 //!     tokio::spawn(async move {
29 //!         if let Err(_) = tx.send(3) {
30 //!             println!("the receiver dropped");
31 //!         }
32 //!     });
33 //!
34 //!     match rx.await {
35 //!         Ok(v) => println!("got = {:?}", v),
36 //!         Err(_) => println!("the sender dropped"),
37 //!     }
38 //! }
39 //! ```
40 //!
41 //! If the sender is dropped without sending, the receiver will fail with
42 //! [`error::RecvError`]:
43 //!
44 //! ```
45 //! use tokio::sync::oneshot;
46 //!
47 //! #[tokio::main]
48 //! async fn main() {
49 //!     let (tx, rx) = oneshot::channel::<u32>();
50 //!
51 //!     tokio::spawn(async move {
52 //!         drop(tx);
53 //!     });
54 //!
55 //!     match rx.await {
56 //!         Ok(_) => panic!("This doesn't happen"),
57 //!         Err(_) => println!("the sender dropped"),
58 //!     }
59 //! }
60 //! ```
61 //!
62 //! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63 //! the channel.
64 //!
65 //! ```
66 //! use tokio::sync::oneshot;
67 //! use tokio::time::{interval, sleep, Duration};
68 //!
69 //! #[tokio::main]
70 //! # async fn _doc() {}
71 //! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72 //! async fn main() {
73 //!     let (send, mut recv) = oneshot::channel();
74 //!     let mut interval = interval(Duration::from_millis(100));
75 //!
76 //!     # let handle =
77 //!     tokio::spawn(async move {
78 //!         sleep(Duration::from_secs(1)).await;
79 //!         send.send("shut down").unwrap();
80 //!     });
81 //!
82 //!     loop {
83 //!         tokio::select! {
84 //!             _ = interval.tick() => println!("Another 100ms"),
85 //!             msg = &mut recv => {
86 //!                 println!("Got message: {}", msg.unwrap());
87 //!                 break;
88 //!             }
89 //!         }
90 //!     }
91 //!     # handle.await.unwrap();
92 //! }
93 //! ```
94 //!
95 //! To use a `Sender` from a destructor, put it in an [`Option`] and call
96 //! [`Option::take`].
97 //!
98 //! ```
99 //! use tokio::sync::oneshot;
100 //!
101 //! struct SendOnDrop {
102 //!     sender: Option<oneshot::Sender<&'static str>>,
103 //! }
104 //! impl Drop for SendOnDrop {
105 //!     fn drop(&mut self) {
106 //!         if let Some(sender) = self.sender.take() {
107 //!             // Using `let _ =` to ignore send errors.
108 //!             let _ = sender.send("I got dropped!");
109 //!         }
110 //!     }
111 //! }
112 //!
113 //! #[tokio::main]
114 //! # async fn _doc() {}
115 //! # #[tokio::main(flavor = "current_thread")]
116 //! async fn main() {
117 //!     let (send, recv) = oneshot::channel();
118 //!
119 //!     let send_on_drop = SendOnDrop { sender: Some(send) };
120 //!     drop(send_on_drop);
121 //!
122 //!     assert_eq!(recv.await, Ok("I got dropped!"));
123 //! }
124 //! ```
125 
126 use crate::loom::cell::UnsafeCell;
127 use crate::loom::sync::atomic::AtomicUsize;
128 use crate::loom::sync::Arc;
129 #[cfg(all(tokio_unstable, feature = "tracing"))]
130 use crate::util::trace;
131 
132 use std::fmt;
133 use std::future::Future;
134 use std::mem::MaybeUninit;
135 use std::pin::Pin;
136 use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137 use std::task::Poll::{Pending, Ready};
138 use std::task::{ready, Context, Poll, Waker};
139 
140 /// Sends a value to the associated [`Receiver`].
141 ///
142 /// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
143 /// [`channel`](fn@channel) function.
144 ///
145 /// # Examples
146 ///
147 /// ```
148 /// use tokio::sync::oneshot;
149 ///
150 /// #[tokio::main]
151 /// async fn main() {
152 ///     let (tx, rx) = oneshot::channel();
153 ///
154 ///     tokio::spawn(async move {
155 ///         if let Err(_) = tx.send(3) {
156 ///             println!("the receiver dropped");
157 ///         }
158 ///     });
159 ///
160 ///     match rx.await {
161 ///         Ok(v) => println!("got = {:?}", v),
162 ///         Err(_) => println!("the sender dropped"),
163 ///     }
164 /// }
165 /// ```
166 ///
167 /// If the sender is dropped without sending, the receiver will fail with
168 /// [`error::RecvError`]:
169 ///
170 /// ```
171 /// use tokio::sync::oneshot;
172 ///
173 /// #[tokio::main]
174 /// async fn main() {
175 ///     let (tx, rx) = oneshot::channel::<u32>();
176 ///
177 ///     tokio::spawn(async move {
178 ///         drop(tx);
179 ///     });
180 ///
181 ///     match rx.await {
182 ///         Ok(_) => panic!("This doesn't happen"),
183 ///         Err(_) => println!("the sender dropped"),
184 ///     }
185 /// }
186 /// ```
187 ///
188 /// To use a `Sender` from a destructor, put it in an [`Option`] and call
189 /// [`Option::take`].
190 ///
191 /// ```
192 /// use tokio::sync::oneshot;
193 ///
194 /// struct SendOnDrop {
195 ///     sender: Option<oneshot::Sender<&'static str>>,
196 /// }
197 /// impl Drop for SendOnDrop {
198 ///     fn drop(&mut self) {
199 ///         if let Some(sender) = self.sender.take() {
200 ///             // Using `let _ =` to ignore send errors.
201 ///             let _ = sender.send("I got dropped!");
202 ///         }
203 ///     }
204 /// }
205 ///
206 /// #[tokio::main]
207 /// # async fn _doc() {}
208 /// # #[tokio::main(flavor = "current_thread")]
209 /// async fn main() {
210 ///     let (send, recv) = oneshot::channel();
211 ///
212 ///     let send_on_drop = SendOnDrop { sender: Some(send) };
213 ///     drop(send_on_drop);
214 ///
215 ///     assert_eq!(recv.await, Ok("I got dropped!"));
216 /// }
217 /// ```
218 ///
219 /// [`Option`]: std::option::Option
220 /// [`Option::take`]: std::option::Option::take
221 #[derive(Debug)]
222 pub struct Sender<T> {
223     inner: Option<Arc<Inner<T>>>,
224     #[cfg(all(tokio_unstable, feature = "tracing"))]
225     resource_span: tracing::Span,
226 }
227 
228 /// Receives a value from the associated [`Sender`].
229 ///
230 /// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
231 /// [`channel`](fn@channel) function.
232 ///
233 /// This channel has no `recv` method because the receiver itself implements the
234 /// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235 ///
236 /// The `poll` method on the `Future` trait is allowed to spuriously return
237 /// `Poll::Pending` even if the message has been sent. If such a spurious
238 /// failure happens, then the caller will be woken when the spurious failure has
239 /// been resolved so that the caller can attempt to receive the message again.
240 /// Note that receiving such a wakeup does not guarantee that the next call will
241 /// succeed — it could fail with another spurious failure. (A spurious failure
242 /// does not mean that the message is lost. It is just delayed.)
243 ///
244 /// [`Future`]: trait@std::future::Future
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use tokio::sync::oneshot;
250 ///
251 /// #[tokio::main]
252 /// async fn main() {
253 ///     let (tx, rx) = oneshot::channel();
254 ///
255 ///     tokio::spawn(async move {
256 ///         if let Err(_) = tx.send(3) {
257 ///             println!("the receiver dropped");
258 ///         }
259 ///     });
260 ///
261 ///     match rx.await {
262 ///         Ok(v) => println!("got = {:?}", v),
263 ///         Err(_) => println!("the sender dropped"),
264 ///     }
265 /// }
266 /// ```
267 ///
268 /// If the sender is dropped without sending, the receiver will fail with
269 /// [`error::RecvError`]:
270 ///
271 /// ```
272 /// use tokio::sync::oneshot;
273 ///
274 /// #[tokio::main]
275 /// async fn main() {
276 ///     let (tx, rx) = oneshot::channel::<u32>();
277 ///
278 ///     tokio::spawn(async move {
279 ///         drop(tx);
280 ///     });
281 ///
282 ///     match rx.await {
283 ///         Ok(_) => panic!("This doesn't happen"),
284 ///         Err(_) => println!("the sender dropped"),
285 ///     }
286 /// }
287 /// ```
288 ///
289 /// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290 /// channel.
291 ///
292 /// ```
293 /// use tokio::sync::oneshot;
294 /// use tokio::time::{interval, sleep, Duration};
295 ///
296 /// #[tokio::main]
297 /// # async fn _doc() {}
298 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299 /// async fn main() {
300 ///     let (send, mut recv) = oneshot::channel();
301 ///     let mut interval = interval(Duration::from_millis(100));
302 ///
303 ///     # let handle =
304 ///     tokio::spawn(async move {
305 ///         sleep(Duration::from_secs(1)).await;
306 ///         send.send("shut down").unwrap();
307 ///     });
308 ///
309 ///     loop {
310 ///         tokio::select! {
311 ///             _ = interval.tick() => println!("Another 100ms"),
312 ///             msg = &mut recv => {
313 ///                 println!("Got message: {}", msg.unwrap());
314 ///                 break;
315 ///             }
316 ///         }
317 ///     }
318 ///     # handle.await.unwrap();
319 /// }
320 /// ```
321 #[derive(Debug)]
322 pub struct Receiver<T> {
323     inner: Option<Arc<Inner<T>>>,
324     #[cfg(all(tokio_unstable, feature = "tracing"))]
325     resource_span: tracing::Span,
326     #[cfg(all(tokio_unstable, feature = "tracing"))]
327     async_op_span: tracing::Span,
328     #[cfg(all(tokio_unstable, feature = "tracing"))]
329     async_op_poll_span: tracing::Span,
330 }
331 
332 pub mod error {
333     //! `Oneshot` error types.
334 
335     use std::fmt;
336 
337     /// Error returned by the `Future` implementation for `Receiver`.
338     ///
339     /// This error is returned by the receiver when the sender is dropped without sending.
340     #[derive(Debug, Eq, PartialEq, Clone)]
341     pub struct RecvError(pub(super) ());
342 
343     /// Error returned by the `try_recv` function on `Receiver`.
344     #[derive(Debug, Eq, PartialEq, Clone)]
345     pub enum TryRecvError {
346         /// The send half of the channel has not yet sent a value.
347         Empty,
348 
349         /// The send half of the channel was dropped without sending a value.
350         Closed,
351     }
352 
353     // ===== impl RecvError =====
354 
355     impl fmt::Display for RecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result356         fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
357             write!(fmt, "channel closed")
358         }
359     }
360 
361     impl std::error::Error for RecvError {}
362 
363     // ===== impl TryRecvError =====
364 
365     impl fmt::Display for TryRecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result366         fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
367             match self {
368                 TryRecvError::Empty => write!(fmt, "channel empty"),
369                 TryRecvError::Closed => write!(fmt, "channel closed"),
370             }
371         }
372     }
373 
374     impl std::error::Error for TryRecvError {}
375 }
376 
377 use self::error::*;
378 
379 struct Inner<T> {
380     /// Manages the state of the inner cell.
381     state: AtomicUsize,
382 
383     /// The value. This is set by `Sender` and read by `Receiver`. The state of
384     /// the cell is tracked by `state`.
385     value: UnsafeCell<Option<T>>,
386 
387     /// The task to notify when the receiver drops without consuming the value.
388     ///
389     /// ## Safety
390     ///
391     /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392     /// initialized. If that bit is unset, this field may be uninitialized.
393     tx_task: Task,
394 
395     /// The task to notify when the value is sent.
396     ///
397     /// ## Safety
398     ///
399     /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400     /// initialized. If that bit is unset, this field may be uninitialized.
401     rx_task: Task,
402 }
403 
404 struct Task(UnsafeCell<MaybeUninit<Waker>>);
405 
406 impl Task {
will_wake(&self, cx: &mut Context<'_>) -> bool407     unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
408         self.with_task(|w| w.will_wake(cx.waker()))
409     }
410 
with_task<F, R>(&self, f: F) -> R where F: FnOnce(&Waker) -> R,411     unsafe fn with_task<F, R>(&self, f: F) -> R
412     where
413         F: FnOnce(&Waker) -> R,
414     {
415         self.0.with(|ptr| {
416             let waker: *const Waker = (*ptr).as_ptr();
417             f(&*waker)
418         })
419     }
420 
drop_task(&self)421     unsafe fn drop_task(&self) {
422         self.0.with_mut(|ptr| {
423             let ptr: *mut Waker = (*ptr).as_mut_ptr();
424             ptr.drop_in_place();
425         });
426     }
427 
set_task(&self, cx: &mut Context<'_>)428     unsafe fn set_task(&self, cx: &mut Context<'_>) {
429         self.0.with_mut(|ptr| {
430             let ptr: *mut Waker = (*ptr).as_mut_ptr();
431             ptr.write(cx.waker().clone());
432         });
433     }
434 }
435 
436 #[derive(Clone, Copy)]
437 struct State(usize);
438 
439 /// Creates a new one-shot channel for sending single values across asynchronous
440 /// tasks.
441 ///
442 /// The function returns separate "send" and "receive" handles. The `Sender`
443 /// handle is used by the producer to send the value. The `Receiver` handle is
444 /// used by the consumer to receive the value.
445 ///
446 /// Each handle can be used on separate tasks.
447 ///
448 /// # Examples
449 ///
450 /// ```
451 /// use tokio::sync::oneshot;
452 ///
453 /// #[tokio::main]
454 /// async fn main() {
455 ///     let (tx, rx) = oneshot::channel();
456 ///
457 ///     tokio::spawn(async move {
458 ///         if let Err(_) = tx.send(3) {
459 ///             println!("the receiver dropped");
460 ///         }
461 ///     });
462 ///
463 ///     match rx.await {
464 ///         Ok(v) => println!("got = {:?}", v),
465 ///         Err(_) => println!("the sender dropped"),
466 ///     }
467 /// }
468 /// ```
469 #[track_caller]
channel<T>() -> (Sender<T>, Receiver<T>)470 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
471     #[cfg(all(tokio_unstable, feature = "tracing"))]
472     let resource_span = {
473         let location = std::panic::Location::caller();
474 
475         let resource_span = tracing::trace_span!(
476             parent: None,
477             "runtime.resource",
478             concrete_type = "Sender|Receiver",
479             kind = "Sync",
480             loc.file = location.file(),
481             loc.line = location.line(),
482             loc.col = location.column(),
483         );
484 
485         resource_span.in_scope(|| {
486             tracing::trace!(
487             target: "runtime::resource::state_update",
488             tx_dropped = false,
489             tx_dropped.op = "override",
490             )
491         });
492 
493         resource_span.in_scope(|| {
494             tracing::trace!(
495             target: "runtime::resource::state_update",
496             rx_dropped = false,
497             rx_dropped.op = "override",
498             )
499         });
500 
501         resource_span.in_scope(|| {
502             tracing::trace!(
503             target: "runtime::resource::state_update",
504             value_sent = false,
505             value_sent.op = "override",
506             )
507         });
508 
509         resource_span.in_scope(|| {
510             tracing::trace!(
511             target: "runtime::resource::state_update",
512             value_received = false,
513             value_received.op = "override",
514             )
515         });
516 
517         resource_span
518     };
519 
520     let inner = Arc::new(Inner {
521         state: AtomicUsize::new(State::new().as_usize()),
522         value: UnsafeCell::new(None),
523         tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
524         rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
525     });
526 
527     let tx = Sender {
528         inner: Some(inner.clone()),
529         #[cfg(all(tokio_unstable, feature = "tracing"))]
530         resource_span: resource_span.clone(),
531     };
532 
533     #[cfg(all(tokio_unstable, feature = "tracing"))]
534     let async_op_span = resource_span
535         .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
536 
537     #[cfg(all(tokio_unstable, feature = "tracing"))]
538     let async_op_poll_span =
539         async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
540 
541     let rx = Receiver {
542         inner: Some(inner),
543         #[cfg(all(tokio_unstable, feature = "tracing"))]
544         resource_span,
545         #[cfg(all(tokio_unstable, feature = "tracing"))]
546         async_op_span,
547         #[cfg(all(tokio_unstable, feature = "tracing"))]
548         async_op_poll_span,
549     };
550 
551     (tx, rx)
552 }
553 
554 impl<T> Sender<T> {
555     /// Attempts to send a value on this channel, returning it back if it could
556     /// not be sent.
557     ///
558     /// This method consumes `self` as only one value may ever be sent on a `oneshot`
559     /// channel. It is not marked async because sending a message to an `oneshot`
560     /// channel never requires any form of waiting.  Because of this, the `send`
561     /// method can be used in both synchronous and asynchronous code without
562     /// problems.
563     ///
564     /// A successful send occurs when it is determined that the other end of the
565     /// channel has not hung up already. An unsuccessful send would be one where
566     /// the corresponding receiver has already been deallocated. Note that a
567     /// return value of `Err` means that the data will never be received, but
568     /// a return value of `Ok` does *not* mean that the data will be received.
569     /// It is possible for the corresponding receiver to hang up immediately
570     /// after this function returns `Ok`.
571     ///
572     /// # Examples
573     ///
574     /// Send a value to another task
575     ///
576     /// ```
577     /// use tokio::sync::oneshot;
578     ///
579     /// #[tokio::main]
580     /// async fn main() {
581     ///     let (tx, rx) = oneshot::channel();
582     ///
583     ///     tokio::spawn(async move {
584     ///         if let Err(_) = tx.send(3) {
585     ///             println!("the receiver dropped");
586     ///         }
587     ///     });
588     ///
589     ///     match rx.await {
590     ///         Ok(v) => println!("got = {:?}", v),
591     ///         Err(_) => println!("the sender dropped"),
592     ///     }
593     /// }
594     /// ```
send(mut self, t: T) -> Result<(), T>595     pub fn send(mut self, t: T) -> Result<(), T> {
596         let inner = self.inner.take().unwrap();
597 
598         inner.value.with_mut(|ptr| unsafe {
599             // SAFETY: The receiver will not access the `UnsafeCell` unless the
600             // channel has been marked as "complete" (the `VALUE_SENT` state bit
601             // is set).
602             // That bit is only set by the sender later on in this method, and
603             // calling this method consumes `self`. Therefore, if it was possible to
604             // call this method, we know that the `VALUE_SENT` bit is unset, and
605             // the receiver is not currently accessing the `UnsafeCell`.
606             *ptr = Some(t);
607         });
608 
609         if !inner.complete() {
610             unsafe {
611                 // SAFETY: The receiver will not access the `UnsafeCell` unless
612                 // the channel has been marked as "complete". Calling
613                 // `complete()` will return true if this bit is set, and false
614                 // if it is not set. Thus, if `complete()` returned false, it is
615                 // safe for us to access the value, because we know that the
616                 // receiver will not.
617                 return Err(inner.consume_value().unwrap());
618             }
619         }
620 
621         #[cfg(all(tokio_unstable, feature = "tracing"))]
622         self.resource_span.in_scope(|| {
623             tracing::trace!(
624             target: "runtime::resource::state_update",
625             value_sent = true,
626             value_sent.op = "override",
627             )
628         });
629 
630         Ok(())
631     }
632 
633     /// Waits for the associated [`Receiver`] handle to close.
634     ///
635     /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
636     /// [`Receiver`] value is dropped.
637     ///
638     /// This function is useful when paired with `select!` to abort a
639     /// computation when the receiver is no longer interested in the result.
640     ///
641     /// # Return
642     ///
643     /// Returns a `Future` which must be awaited on.
644     ///
645     /// [`Receiver`]: Receiver
646     /// [`close`]: Receiver::close
647     ///
648     /// # Examples
649     ///
650     /// Basic usage
651     ///
652     /// ```
653     /// use tokio::sync::oneshot;
654     ///
655     /// #[tokio::main]
656     /// async fn main() {
657     ///     let (mut tx, rx) = oneshot::channel::<()>();
658     ///
659     ///     tokio::spawn(async move {
660     ///         drop(rx);
661     ///     });
662     ///
663     ///     tx.closed().await;
664     ///     println!("the receiver dropped");
665     /// }
666     /// ```
667     ///
668     /// Paired with select
669     ///
670     /// ```
671     /// use tokio::sync::oneshot;
672     /// use tokio::time::{self, Duration};
673     ///
674     /// async fn compute() -> String {
675     ///     // Complex computation returning a `String`
676     /// # "hello".to_string()
677     /// }
678     ///
679     /// #[tokio::main]
680     /// async fn main() {
681     ///     let (mut tx, rx) = oneshot::channel();
682     ///
683     ///     tokio::spawn(async move {
684     ///         tokio::select! {
685     ///             _ = tx.closed() => {
686     ///                 // The receiver dropped, no need to do any further work
687     ///             }
688     ///             value = compute() => {
689     ///                 // The send can fail if the channel was closed at the exact same
690     ///                 // time as when compute() finished, so just ignore the failure.
691     ///                 let _ = tx.send(value);
692     ///             }
693     ///         }
694     ///     });
695     ///
696     ///     // Wait for up to 10 seconds
697     ///     let _ = time::timeout(Duration::from_secs(10), rx).await;
698     /// }
699     /// ```
closed(&mut self)700     pub async fn closed(&mut self) {
701         use std::future::poll_fn;
702 
703         #[cfg(all(tokio_unstable, feature = "tracing"))]
704         let resource_span = self.resource_span.clone();
705         #[cfg(all(tokio_unstable, feature = "tracing"))]
706         let closed = trace::async_op(
707             || poll_fn(|cx| self.poll_closed(cx)),
708             resource_span,
709             "Sender::closed",
710             "poll_closed",
711             false,
712         );
713         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
714         let closed = poll_fn(|cx| self.poll_closed(cx));
715 
716         closed.await;
717     }
718 
719     /// Returns `true` if the associated [`Receiver`] handle has been dropped.
720     ///
721     /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
722     /// [`Receiver`] value is dropped.
723     ///
724     /// If `true` is returned, a call to `send` will always result in an error.
725     ///
726     /// [`Receiver`]: Receiver
727     /// [`close`]: Receiver::close
728     ///
729     /// # Examples
730     ///
731     /// ```
732     /// use tokio::sync::oneshot;
733     ///
734     /// #[tokio::main]
735     /// async fn main() {
736     ///     let (tx, rx) = oneshot::channel();
737     ///
738     ///     assert!(!tx.is_closed());
739     ///
740     ///     drop(rx);
741     ///
742     ///     assert!(tx.is_closed());
743     ///     assert!(tx.send("never received").is_err());
744     /// }
745     /// ```
is_closed(&self) -> bool746     pub fn is_closed(&self) -> bool {
747         let inner = self.inner.as_ref().unwrap();
748 
749         let state = State::load(&inner.state, Acquire);
750         state.is_closed()
751     }
752 
753     /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
754     /// `Waker` in the provided `Context` to receive a notification when the channel is
755     /// closed.
756     ///
757     /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
758     /// [`Receiver`] value is dropped.
759     ///
760     /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
761     /// to the most recent call will be scheduled to receive a wakeup.
762     ///
763     /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
764     /// [`close`]: fn@crate::sync::oneshot::Receiver::close
765     ///
766     /// # Return value
767     ///
768     /// This function returns:
769     ///
770     ///  * `Poll::Pending` if the channel is still open.
771     ///  * `Poll::Ready(())` if the channel is closed.
772     ///
773     /// # Examples
774     ///
775     /// ```
776     /// use tokio::sync::oneshot;
777     ///
778     /// use std::future::poll_fn;
779     ///
780     /// #[tokio::main]
781     /// async fn main() {
782     ///     let (mut tx, mut rx) = oneshot::channel::<()>();
783     ///
784     ///     tokio::spawn(async move {
785     ///         rx.close();
786     ///     });
787     ///
788     ///     poll_fn(|cx| tx.poll_closed(cx)).await;
789     ///
790     ///     println!("the receiver dropped");
791     /// }
792     /// ```
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>793     pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
794         ready!(crate::trace::trace_leaf(cx));
795 
796         // Keep track of task budget
797         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
798 
799         let inner = self.inner.as_ref().unwrap();
800 
801         let mut state = State::load(&inner.state, Acquire);
802 
803         if state.is_closed() {
804             coop.made_progress();
805             return Ready(());
806         }
807 
808         if state.is_tx_task_set() {
809             let will_notify = unsafe { inner.tx_task.will_wake(cx) };
810 
811             if !will_notify {
812                 state = State::unset_tx_task(&inner.state);
813 
814                 if state.is_closed() {
815                     // Set the flag again so that the waker is released in drop
816                     State::set_tx_task(&inner.state);
817                     coop.made_progress();
818                     return Ready(());
819                 } else {
820                     unsafe { inner.tx_task.drop_task() };
821                 }
822             }
823         }
824 
825         if !state.is_tx_task_set() {
826             // Attempt to set the task
827             unsafe {
828                 inner.tx_task.set_task(cx);
829             }
830 
831             // Update the state
832             state = State::set_tx_task(&inner.state);
833 
834             if state.is_closed() {
835                 coop.made_progress();
836                 return Ready(());
837             }
838         }
839 
840         Pending
841     }
842 }
843 
844 impl<T> Drop for Sender<T> {
drop(&mut self)845     fn drop(&mut self) {
846         if let Some(inner) = self.inner.as_ref() {
847             inner.complete();
848             #[cfg(all(tokio_unstable, feature = "tracing"))]
849             self.resource_span.in_scope(|| {
850                 tracing::trace!(
851                 target: "runtime::resource::state_update",
852                 tx_dropped = true,
853                 tx_dropped.op = "override",
854                 )
855             });
856         }
857     }
858 }
859 
860 impl<T> Receiver<T> {
861     /// Prevents the associated [`Sender`] handle from sending a value.
862     ///
863     /// Any `send` operation which happens after calling `close` is guaranteed
864     /// to fail. After calling `close`, [`try_recv`] should be called to
865     /// receive a value if one was sent **before** the call to `close`
866     /// completed.
867     ///
868     /// This function is useful to perform a graceful shutdown and ensure that a
869     /// value will not be sent into the channel and never received.
870     ///
871     /// `close` is no-op if a message is already received or the channel
872     /// is already closed.
873     ///
874     /// [`Sender`]: Sender
875     /// [`try_recv`]: Receiver::try_recv
876     ///
877     /// # Examples
878     ///
879     /// Prevent a value from being sent
880     ///
881     /// ```
882     /// use tokio::sync::oneshot;
883     /// use tokio::sync::oneshot::error::TryRecvError;
884     ///
885     /// #[tokio::main]
886     /// async fn main() {
887     ///     let (tx, mut rx) = oneshot::channel();
888     ///
889     ///     assert!(!tx.is_closed());
890     ///
891     ///     rx.close();
892     ///
893     ///     assert!(tx.is_closed());
894     ///     assert!(tx.send("never received").is_err());
895     ///
896     ///     match rx.try_recv() {
897     ///         Err(TryRecvError::Closed) => {}
898     ///         _ => unreachable!(),
899     ///     }
900     /// }
901     /// ```
902     ///
903     /// Receive a value sent **before** calling `close`
904     ///
905     /// ```
906     /// use tokio::sync::oneshot;
907     ///
908     /// #[tokio::main]
909     /// async fn main() {
910     ///     let (tx, mut rx) = oneshot::channel();
911     ///
912     ///     assert!(tx.send("will receive").is_ok());
913     ///
914     ///     rx.close();
915     ///
916     ///     let msg = rx.try_recv().unwrap();
917     ///     assert_eq!(msg, "will receive");
918     /// }
919     /// ```
close(&mut self)920     pub fn close(&mut self) {
921         if let Some(inner) = self.inner.as_ref() {
922             inner.close();
923             #[cfg(all(tokio_unstable, feature = "tracing"))]
924             self.resource_span.in_scope(|| {
925                 tracing::trace!(
926                 target: "runtime::resource::state_update",
927                 rx_dropped = true,
928                 rx_dropped.op = "override",
929                 )
930             });
931         }
932     }
933 
934     /// Attempts to receive a value.
935     ///
936     /// If a pending value exists in the channel, it is returned. If no value
937     /// has been sent, the current task **will not** be registered for
938     /// future notification.
939     ///
940     /// This function is useful to call from outside the context of an
941     /// asynchronous task.
942     ///
943     /// Note that unlike the `poll` method, the `try_recv` method cannot fail
944     /// spuriously. Any send or close event that happens before this call to
945     /// `try_recv` will be correctly returned to the caller.
946     ///
947     /// # Return
948     ///
949     /// - `Ok(T)` if a value is pending in the channel.
950     /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
951     /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
952     ///   a value, or if the message has already been received.
953     ///
954     /// # Examples
955     ///
956     /// `try_recv` before a value is sent, then after.
957     ///
958     /// ```
959     /// use tokio::sync::oneshot;
960     /// use tokio::sync::oneshot::error::TryRecvError;
961     ///
962     /// #[tokio::main]
963     /// async fn main() {
964     ///     let (tx, mut rx) = oneshot::channel();
965     ///
966     ///     match rx.try_recv() {
967     ///         // The channel is currently empty
968     ///         Err(TryRecvError::Empty) => {}
969     ///         _ => unreachable!(),
970     ///     }
971     ///
972     ///     // Send a value
973     ///     tx.send("hello").unwrap();
974     ///
975     ///     match rx.try_recv() {
976     ///         Ok(value) => assert_eq!(value, "hello"),
977     ///         _ => unreachable!(),
978     ///     }
979     /// }
980     /// ```
981     ///
982     /// `try_recv` when the sender dropped before sending a value
983     ///
984     /// ```
985     /// use tokio::sync::oneshot;
986     /// use tokio::sync::oneshot::error::TryRecvError;
987     ///
988     /// #[tokio::main]
989     /// async fn main() {
990     ///     let (tx, mut rx) = oneshot::channel::<()>();
991     ///
992     ///     drop(tx);
993     ///
994     ///     match rx.try_recv() {
995     ///         // The channel will never receive a value.
996     ///         Err(TryRecvError::Closed) => {}
997     ///         _ => unreachable!(),
998     ///     }
999     /// }
1000     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>1001     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1002         let result = if let Some(inner) = self.inner.as_ref() {
1003             let state = State::load(&inner.state, Acquire);
1004 
1005             if state.is_complete() {
1006                 // SAFETY: If `state.is_complete()` returns true, then the
1007                 // `VALUE_SENT` bit has been set and the sender side of the
1008                 // channel will no longer attempt to access the inner
1009                 // `UnsafeCell`. Therefore, it is now safe for us to access the
1010                 // cell.
1011                 match unsafe { inner.consume_value() } {
1012                     Some(value) => {
1013                         #[cfg(all(tokio_unstable, feature = "tracing"))]
1014                         self.resource_span.in_scope(|| {
1015                             tracing::trace!(
1016                             target: "runtime::resource::state_update",
1017                             value_received = true,
1018                             value_received.op = "override",
1019                             )
1020                         });
1021                         Ok(value)
1022                     }
1023                     None => Err(TryRecvError::Closed),
1024                 }
1025             } else if state.is_closed() {
1026                 Err(TryRecvError::Closed)
1027             } else {
1028                 // Not ready, this does not clear `inner`
1029                 return Err(TryRecvError::Empty);
1030             }
1031         } else {
1032             Err(TryRecvError::Closed)
1033         };
1034 
1035         self.inner = None;
1036         result
1037     }
1038 
1039     /// Blocking receive to call outside of asynchronous contexts.
1040     ///
1041     /// # Panics
1042     ///
1043     /// This function panics if called within an asynchronous execution
1044     /// context.
1045     ///
1046     /// # Examples
1047     ///
1048     /// ```
1049     /// use std::thread;
1050     /// use tokio::sync::oneshot;
1051     ///
1052     /// #[tokio::main]
1053     /// async fn main() {
1054     ///     let (tx, rx) = oneshot::channel::<u8>();
1055     ///
1056     ///     let sync_code = thread::spawn(move || {
1057     ///         assert_eq!(Ok(10), rx.blocking_recv());
1058     ///     });
1059     ///
1060     ///     let _ = tx.send(10);
1061     ///     sync_code.join().unwrap();
1062     /// }
1063     /// ```
1064     #[track_caller]
1065     #[cfg(feature = "sync")]
1066     #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
blocking_recv(self) -> Result<T, RecvError>1067     pub fn blocking_recv(self) -> Result<T, RecvError> {
1068         crate::future::block_on(self)
1069     }
1070 }
1071 
1072 impl<T> Drop for Receiver<T> {
drop(&mut self)1073     fn drop(&mut self) {
1074         if let Some(inner) = self.inner.as_ref() {
1075             let state = inner.close();
1076 
1077             if state.is_complete() {
1078                 // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1079                 // so only the receiver can access the value.
1080                 drop(unsafe { inner.consume_value() });
1081             }
1082 
1083             #[cfg(all(tokio_unstable, feature = "tracing"))]
1084             self.resource_span.in_scope(|| {
1085                 tracing::trace!(
1086                 target: "runtime::resource::state_update",
1087                 rx_dropped = true,
1088                 rx_dropped.op = "override",
1089                 )
1090             });
1091         }
1092     }
1093 }
1094 
1095 impl<T> Future for Receiver<T> {
1096     type Output = Result<T, RecvError>;
1097 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1098     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1099         // If `inner` is `None`, then `poll()` has already completed.
1100         #[cfg(all(tokio_unstable, feature = "tracing"))]
1101         let _res_span = self.resource_span.clone().entered();
1102         #[cfg(all(tokio_unstable, feature = "tracing"))]
1103         let _ao_span = self.async_op_span.clone().entered();
1104         #[cfg(all(tokio_unstable, feature = "tracing"))]
1105         let _ao_poll_span = self.async_op_poll_span.clone().entered();
1106 
1107         let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1108             #[cfg(all(tokio_unstable, feature = "tracing"))]
1109             let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
1110 
1111             #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1112             let res = ready!(inner.poll_recv(cx))?;
1113 
1114             res
1115         } else {
1116             panic!("called after complete");
1117         };
1118 
1119         self.inner = None;
1120         Ready(Ok(ret))
1121     }
1122 }
1123 
1124 impl<T> Inner<T> {
complete(&self) -> bool1125     fn complete(&self) -> bool {
1126         let prev = State::set_complete(&self.state);
1127 
1128         if prev.is_closed() {
1129             return false;
1130         }
1131 
1132         if prev.is_rx_task_set() {
1133             // TODO: Consume waker?
1134             unsafe {
1135                 self.rx_task.with_task(Waker::wake_by_ref);
1136             }
1137         }
1138 
1139         true
1140     }
1141 
poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>1142     fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1143         ready!(crate::trace::trace_leaf(cx));
1144         // Keep track of task budget
1145         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
1146 
1147         // Load the state
1148         let mut state = State::load(&self.state, Acquire);
1149 
1150         if state.is_complete() {
1151             coop.made_progress();
1152             match unsafe { self.consume_value() } {
1153                 Some(value) => Ready(Ok(value)),
1154                 None => Ready(Err(RecvError(()))),
1155             }
1156         } else if state.is_closed() {
1157             coop.made_progress();
1158             Ready(Err(RecvError(())))
1159         } else {
1160             if state.is_rx_task_set() {
1161                 let will_notify = unsafe { self.rx_task.will_wake(cx) };
1162 
1163                 // Check if the task is still the same
1164                 if !will_notify {
1165                     // Unset the task
1166                     state = State::unset_rx_task(&self.state);
1167                     if state.is_complete() {
1168                         // Set the flag again so that the waker is released in drop
1169                         State::set_rx_task(&self.state);
1170 
1171                         coop.made_progress();
1172                         // SAFETY: If `state.is_complete()` returns true, then the
1173                         // `VALUE_SENT` bit has been set and the sender side of the
1174                         // channel will no longer attempt to access the inner
1175                         // `UnsafeCell`. Therefore, it is now safe for us to access the
1176                         // cell.
1177                         return match unsafe { self.consume_value() } {
1178                             Some(value) => Ready(Ok(value)),
1179                             None => Ready(Err(RecvError(()))),
1180                         };
1181                     } else {
1182                         unsafe { self.rx_task.drop_task() };
1183                     }
1184                 }
1185             }
1186 
1187             if !state.is_rx_task_set() {
1188                 // Attempt to set the task
1189                 unsafe {
1190                     self.rx_task.set_task(cx);
1191                 }
1192 
1193                 // Update the state
1194                 state = State::set_rx_task(&self.state);
1195 
1196                 if state.is_complete() {
1197                     coop.made_progress();
1198                     match unsafe { self.consume_value() } {
1199                         Some(value) => Ready(Ok(value)),
1200                         None => Ready(Err(RecvError(()))),
1201                     }
1202                 } else {
1203                     Pending
1204                 }
1205             } else {
1206                 Pending
1207             }
1208         }
1209     }
1210 
1211     /// Called by `Receiver` to indicate that the value will never be received.
close(&self) -> State1212     fn close(&self) -> State {
1213         let prev = State::set_closed(&self.state);
1214 
1215         if prev.is_tx_task_set() && !prev.is_complete() {
1216             unsafe {
1217                 self.tx_task.with_task(Waker::wake_by_ref);
1218             }
1219         }
1220 
1221         prev
1222     }
1223 
1224     /// Consumes the value. This function does not check `state`.
1225     ///
1226     /// # Safety
1227     ///
1228     /// Calling this method concurrently on multiple threads will result in a
1229     /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1230     /// sender *or* the receiver will call this method at a given point in time.
1231     /// If `VALUE_SENT` is not set, then only the sender may call this method;
1232     /// if it is set, then only the receiver may call this method.
consume_value(&self) -> Option<T>1233     unsafe fn consume_value(&self) -> Option<T> {
1234         self.value.with_mut(|ptr| (*ptr).take())
1235     }
1236 }
1237 
1238 unsafe impl<T: Send> Send for Inner<T> {}
1239 unsafe impl<T: Send> Sync for Inner<T> {}
1240 
mut_load(this: &mut AtomicUsize) -> usize1241 fn mut_load(this: &mut AtomicUsize) -> usize {
1242     this.with_mut(|v| *v)
1243 }
1244 
1245 impl<T> Drop for Inner<T> {
drop(&mut self)1246     fn drop(&mut self) {
1247         let state = State(mut_load(&mut self.state));
1248 
1249         if state.is_rx_task_set() {
1250             unsafe {
1251                 self.rx_task.drop_task();
1252             }
1253         }
1254 
1255         if state.is_tx_task_set() {
1256             unsafe {
1257                 self.tx_task.drop_task();
1258             }
1259         }
1260 
1261         // SAFETY: we have `&mut self`, and therefore we have
1262         // exclusive access to the value.
1263         unsafe {
1264             // Note: the assertion holds because if the value has been sent by sender,
1265             // we must ensure that the value must have been consumed by the receiver before
1266             // dropping the `Inner`.
1267             debug_assert!(self.consume_value().is_none());
1268         }
1269     }
1270 }
1271 
1272 impl<T: fmt::Debug> fmt::Debug for Inner<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1273     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1274         use std::sync::atomic::Ordering::Relaxed;
1275 
1276         fmt.debug_struct("Inner")
1277             .field("state", &State::load(&self.state, Relaxed))
1278             .finish()
1279     }
1280 }
1281 
1282 /// Indicates that a waker for the receiving task has been set.
1283 ///
1284 /// # Safety
1285 ///
1286 /// If this bit is not set, the `rx_task` field may be uninitialized.
1287 const RX_TASK_SET: usize = 0b00001;
1288 /// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1289 ///
1290 /// # Safety
1291 ///
1292 /// This bit controls which side of the channel is permitted to access the
1293 /// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1294 /// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1295 /// the sender.
1296 const VALUE_SENT: usize = 0b00010;
1297 const CLOSED: usize = 0b00100;
1298 
1299 /// Indicates that a waker for the sending task has been set.
1300 ///
1301 /// # Safety
1302 ///
1303 /// If this bit is not set, the `tx_task` field may be uninitialized.
1304 const TX_TASK_SET: usize = 0b01000;
1305 
1306 impl State {
new() -> State1307     fn new() -> State {
1308         State(0)
1309     }
1310 
is_complete(self) -> bool1311     fn is_complete(self) -> bool {
1312         self.0 & VALUE_SENT == VALUE_SENT
1313     }
1314 
set_complete(cell: &AtomicUsize) -> State1315     fn set_complete(cell: &AtomicUsize) -> State {
1316         // This method is a compare-and-swap loop rather than a fetch-or like
1317         // other `set_$WHATEVER` methods on `State`. This is because we must
1318         // check if the state has been closed before setting the `VALUE_SENT`
1319         // bit.
1320         //
1321         // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1322         // bit is already set, because `VALUE_SENT` will tell the receiver that
1323         // it's okay to access the inner `UnsafeCell`. Immediately after calling
1324         // `set_complete`, if the channel was closed, the sender will _also_
1325         // access the `UnsafeCell` to take the value back out, so if a
1326         // `poll_recv` or `try_recv` call is occurring concurrently, both
1327         // threads may try to access the `UnsafeCell` if we were to set the
1328         // `VALUE_SENT` bit on a closed channel.
1329         let mut state = cell.load(Ordering::Relaxed);
1330         loop {
1331             if State(state).is_closed() {
1332                 break;
1333             }
1334             // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1335             // the `RX_TASK_SET` flag is set. However, `loom` does not support
1336             // fences yet.
1337             match cell.compare_exchange_weak(
1338                 state,
1339                 state | VALUE_SENT,
1340                 Ordering::AcqRel,
1341                 Ordering::Acquire,
1342             ) {
1343                 Ok(_) => break,
1344                 Err(actual) => state = actual,
1345             }
1346         }
1347         State(state)
1348     }
1349 
is_rx_task_set(self) -> bool1350     fn is_rx_task_set(self) -> bool {
1351         self.0 & RX_TASK_SET == RX_TASK_SET
1352     }
1353 
set_rx_task(cell: &AtomicUsize) -> State1354     fn set_rx_task(cell: &AtomicUsize) -> State {
1355         let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1356         State(val | RX_TASK_SET)
1357     }
1358 
unset_rx_task(cell: &AtomicUsize) -> State1359     fn unset_rx_task(cell: &AtomicUsize) -> State {
1360         let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1361         State(val & !RX_TASK_SET)
1362     }
1363 
is_closed(self) -> bool1364     fn is_closed(self) -> bool {
1365         self.0 & CLOSED == CLOSED
1366     }
1367 
set_closed(cell: &AtomicUsize) -> State1368     fn set_closed(cell: &AtomicUsize) -> State {
1369         // Acquire because we want all later writes (attempting to poll) to be
1370         // ordered after this.
1371         let val = cell.fetch_or(CLOSED, Acquire);
1372         State(val)
1373     }
1374 
set_tx_task(cell: &AtomicUsize) -> State1375     fn set_tx_task(cell: &AtomicUsize) -> State {
1376         let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1377         State(val | TX_TASK_SET)
1378     }
1379 
unset_tx_task(cell: &AtomicUsize) -> State1380     fn unset_tx_task(cell: &AtomicUsize) -> State {
1381         let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1382         State(val & !TX_TASK_SET)
1383     }
1384 
is_tx_task_set(self) -> bool1385     fn is_tx_task_set(self) -> bool {
1386         self.0 & TX_TASK_SET == TX_TASK_SET
1387     }
1388 
as_usize(self) -> usize1389     fn as_usize(self) -> usize {
1390         self.0
1391     }
1392 
load(cell: &AtomicUsize, order: Ordering) -> State1393     fn load(cell: &AtomicUsize, order: Ordering) -> State {
1394         let val = cell.load(order);
1395         State(val)
1396     }
1397 }
1398 
1399 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1400     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1401         fmt.debug_struct("State")
1402             .field("is_complete", &self.is_complete())
1403             .field("is_closed", &self.is_closed())
1404             .field("is_rx_task_set", &self.is_rx_task_set())
1405             .field("is_tx_task_set", &self.is_tx_task_set())
1406             .finish()
1407     }
1408 }
1409