1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2 
3 //! A multi-producer, multi-consumer channel that only retains the *last* sent
4 //! value.
5 //!
6 //! This channel is useful for watching for changes to a value from multiple
7 //! points in the code base, for example, changes to configuration values.
8 //!
9 //! # Usage
10 //!
11 //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12 //! and consumer halves of the channel. The channel is created with an initial
13 //! value.
14 //!
15 //! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16 //!
17 //! To access the **current** value stored in the channel and mark it as *seen*
18 //! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19 //!
20 //! To access the current value **without** marking it as *seen*, use
21 //! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22 //! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23 //!
24 //! For more information on when to use these methods, see
25 //! [here](#borrow_and_update-versus-borrow).
26 //!
27 //! ## Change notifications
28 //!
29 //! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30 //! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31 //!
32 //! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
33 //!   `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
34 //! * If the current value is *unseen* when calling [`changed`], then
35 //!   [`changed`] will return immediately. If the current value is *seen*, then
36 //!   it will sleep until either a new message is sent via the [`Sender`] half,
37 //!   or the [`Sender`] is dropped.
38 //! * On completion, the [`changed`] method marks the new value as *seen*.
39 //! * At creation, the initial value is considered *seen*. In other words,
40 //!   [`Receiver::changed()`] will not return until a subsequent value is sent.
41 //! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
42 //!   The current value at the time the [`Receiver`] is created is considered
43 //!   *seen*.
44 //!
45 //! ## `borrow_and_update` versus `borrow`
46 //!
47 //! If the receiver intends to await notifications from [`changed`] in a loop,
48 //! [`Receiver::borrow_and_update()`] should be preferred over
49 //! [`Receiver::borrow()`].  This avoids a potential race where a new value is
50 //! sent between [`changed`] being ready and the value being read. (If
51 //! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
52 //!
53 //! If the receiver is only interested in the current value, and does not intend
54 //! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
55 //! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
56 //! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
57 //! self`.
58 //!
59 //! # Examples
60 //!
61 //! The following example prints `hello! world! `.
62 //!
63 //! ```
64 //! use tokio::sync::watch;
65 //! use tokio::time::{Duration, sleep};
66 //!
67 //! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
68 //! let (tx, mut rx) = watch::channel("hello");
69 //!
70 //! tokio::spawn(async move {
71 //!     // Use the equivalent of a "do-while" loop so the initial value is
72 //!     // processed before awaiting the `changed()` future.
73 //!     loop {
74 //!         println!("{}! ", *rx.borrow_and_update());
75 //!         if rx.changed().await.is_err() {
76 //!             break;
77 //!         }
78 //!     }
79 //! });
80 //!
81 //! sleep(Duration::from_millis(100)).await;
82 //! tx.send("world")?;
83 //! # Ok(())
84 //! # }
85 //! ```
86 //!
87 //! # Closing
88 //!
89 //! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
90 //! when all [`Receiver`] handles have been dropped. This indicates that there
91 //! is no further interest in the values being produced and work can be stopped.
92 //!
93 //! The value in the channel will not be dropped until the sender and all
94 //! receivers have been dropped.
95 //!
96 //! # Thread safety
97 //!
98 //! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
99 //! threads and can be used in a concurrent environment. Clones of [`Receiver`]
100 //! handles may be moved to separate threads and also used concurrently.
101 //!
102 //! [`Sender`]: crate::sync::watch::Sender
103 //! [`Receiver`]: crate::sync::watch::Receiver
104 //! [`changed`]: crate::sync::watch::Receiver::changed
105 //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
106 //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
107 //! [`Receiver::borrow_and_update()`]:
108 //!     crate::sync::watch::Receiver::borrow_and_update
109 //! [`channel`]: crate::sync::watch::channel
110 //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
111 //! [`Sender::closed`]: crate::sync::watch::Sender::closed
112 //! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
113 
114 use crate::runtime::coop::cooperative;
115 use crate::sync::notify::Notify;
116 
117 use crate::loom::sync::atomic::AtomicUsize;
118 use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
119 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
120 use std::fmt;
121 use std::mem;
122 use std::ops;
123 use std::panic;
124 
125 /// Receives values from the associated [`Sender`](struct@Sender).
126 ///
127 /// Instances are created by the [`channel`](fn@channel) function.
128 ///
129 /// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
130 /// wrapper.
131 ///
132 /// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
133 #[derive(Debug)]
134 pub struct Receiver<T> {
135     /// Pointer to the shared state
136     shared: Arc<Shared<T>>,
137 
138     /// Last observed version
139     version: Version,
140 }
141 
142 /// Sends values to the associated [`Receiver`](struct@Receiver).
143 ///
144 /// Instances are created by the [`channel`](fn@channel) function.
145 #[derive(Debug)]
146 pub struct Sender<T> {
147     shared: Arc<Shared<T>>,
148 }
149 
150 impl<T> Clone for Sender<T> {
clone(&self) -> Self151     fn clone(&self) -> Self {
152         self.shared.ref_count_tx.fetch_add(1, Relaxed);
153 
154         Self {
155             shared: self.shared.clone(),
156         }
157     }
158 }
159 
160 impl<T: Default> Default for Sender<T> {
default() -> Self161     fn default() -> Self {
162         Self::new(T::default())
163     }
164 }
165 
166 /// Returns a reference to the inner value.
167 ///
168 /// Outstanding borrows hold a read lock on the inner value. This means that
169 /// long-lived borrows could cause the producer half to block. It is recommended
170 /// to keep the borrow as short-lived as possible. Additionally, if you are
171 /// running in an environment that allows `!Send` futures, you must ensure that
172 /// the returned `Ref` type is never held alive across an `.await` point,
173 /// otherwise, it can lead to a deadlock.
174 ///
175 /// The priority policy of the lock is dependent on the underlying lock
176 /// implementation, and this type does not guarantee that any particular policy
177 /// will be used. In particular, a producer which is waiting to acquire the lock
178 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
179 ///
180 /// <details><summary>Potential deadlock example</summary>
181 ///
182 /// ```text
183 /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
184 /// let _ref1 = rx.borrow();   |
185 ///                            |  // will block
186 ///                            |  let _ = tx.send(());
187 /// // may deadlock            |
188 /// let _ref2 = rx.borrow();   |
189 /// ```
190 /// </details>
191 #[derive(Debug)]
192 pub struct Ref<'a, T> {
193     inner: RwLockReadGuard<'a, T>,
194     has_changed: bool,
195 }
196 
197 impl<'a, T> Ref<'a, T> {
198     /// Indicates if the borrowed value is considered as _changed_ since the last
199     /// time it has been marked as seen.
200     ///
201     /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
202     ///
203     /// When borrowed from the [`Sender`] this function will always return `false`.
204     ///
205     /// # Examples
206     ///
207     /// ```
208     /// use tokio::sync::watch;
209     ///
210     /// #[tokio::main]
211     /// async fn main() {
212     ///     let (tx, mut rx) = watch::channel("hello");
213     ///
214     ///     tx.send("goodbye").unwrap();
215     ///     // The sender does never consider the value as changed.
216     ///     assert!(!tx.borrow().has_changed());
217     ///
218     ///     // Drop the sender immediately, just for testing purposes.
219     ///     drop(tx);
220     ///
221     ///     // Even if the sender has already been dropped...
222     ///     assert!(rx.has_changed().is_err());
223     ///     // ...the modified value is still readable and detected as changed.
224     ///     assert_eq!(*rx.borrow(), "goodbye");
225     ///     assert!(rx.borrow().has_changed());
226     ///
227     ///     // Read the changed value and mark it as seen.
228     ///     {
229     ///         let received = rx.borrow_and_update();
230     ///         assert_eq!(*received, "goodbye");
231     ///         assert!(received.has_changed());
232     ///         // Release the read lock when leaving this scope.
233     ///     }
234     ///
235     ///     // Now the value has already been marked as seen and could
236     ///     // never be modified again (after the sender has been dropped).
237     ///     assert!(!rx.borrow().has_changed());
238     /// }
239     /// ```
has_changed(&self) -> bool240     pub fn has_changed(&self) -> bool {
241         self.has_changed
242     }
243 }
244 
245 struct Shared<T> {
246     /// The most recent value.
247     value: RwLock<T>,
248 
249     /// The current version.
250     ///
251     /// The lowest bit represents a "closed" state. The rest of the bits
252     /// represent the current version.
253     state: AtomicState,
254 
255     /// Tracks the number of `Receiver` instances.
256     ref_count_rx: AtomicUsize,
257 
258     /// Tracks the number of `Sender` instances.
259     ref_count_tx: AtomicUsize,
260 
261     /// Notifies waiting receivers that the value changed.
262     notify_rx: big_notify::BigNotify,
263 
264     /// Notifies any task listening for `Receiver` dropped events.
265     notify_tx: Notify,
266 }
267 
268 impl<T: fmt::Debug> fmt::Debug for Shared<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result269     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270         let state = self.state.load();
271         f.debug_struct("Shared")
272             .field("value", &self.value)
273             .field("version", &state.version())
274             .field("is_closed", &state.is_closed())
275             .field("ref_count_rx", &self.ref_count_rx)
276             .finish()
277     }
278 }
279 
280 pub mod error {
281     //! Watch error types.
282 
283     use std::error::Error;
284     use std::fmt;
285 
286     /// Error produced when sending a value fails.
287     #[derive(PartialEq, Eq, Clone, Copy)]
288     pub struct SendError<T>(pub T);
289 
290     // ===== impl SendError =====
291 
292     impl<T> fmt::Debug for SendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result293         fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294             f.debug_struct("SendError").finish_non_exhaustive()
295         }
296     }
297 
298     impl<T> fmt::Display for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result299         fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
300             write!(fmt, "channel closed")
301         }
302     }
303 
304     impl<T> Error for SendError<T> {}
305 
306     /// Error produced when receiving a change notification.
307     #[derive(Debug, Clone)]
308     pub struct RecvError(pub(super) ());
309 
310     // ===== impl RecvError =====
311 
312     impl fmt::Display for RecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result313         fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
314             write!(fmt, "channel closed")
315         }
316     }
317 
318     impl Error for RecvError {}
319 }
320 
321 mod big_notify {
322     use super::Notify;
323     use crate::sync::notify::Notified;
324 
325     // To avoid contention on the lock inside the `Notify`, we store multiple
326     // copies of it. Then, we use either circular access or randomness to spread
327     // out threads over different `Notify` objects.
328     //
329     // Some simple benchmarks show that randomness performs slightly better than
330     // circular access (probably due to contention on `next`), so we prefer to
331     // use randomness when Tokio is compiled with a random number generator.
332     //
333     // When the random number generator is not available, we fall back to
334     // circular access.
335 
336     pub(super) struct BigNotify {
337         #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
338         next: std::sync::atomic::AtomicUsize,
339         inner: [Notify; 8],
340     }
341 
342     impl BigNotify {
new() -> Self343         pub(super) fn new() -> Self {
344             Self {
345                 #[cfg(not(all(
346                     not(loom),
347                     feature = "sync",
348                     any(feature = "rt", feature = "macros")
349                 )))]
350                 next: std::sync::atomic::AtomicUsize::new(0),
351                 inner: Default::default(),
352             }
353         }
354 
notify_waiters(&self)355         pub(super) fn notify_waiters(&self) {
356             for notify in &self.inner {
357                 notify.notify_waiters();
358             }
359         }
360 
361         /// This function implements the case where randomness is not available.
362         #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
notified(&self) -> Notified<'_>363         pub(super) fn notified(&self) -> Notified<'_> {
364             let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
365             self.inner[i].notified()
366         }
367 
368         /// This function implements the case where randomness is available.
369         #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
notified(&self) -> Notified<'_>370         pub(super) fn notified(&self) -> Notified<'_> {
371             let i = crate::runtime::context::thread_rng_n(8) as usize;
372             self.inner[i].notified()
373         }
374     }
375 }
376 
377 use self::state::{AtomicState, Version};
378 mod state {
379     use crate::loom::sync::atomic::AtomicUsize;
380     use crate::loom::sync::atomic::Ordering;
381 
382     const CLOSED_BIT: usize = 1;
383 
384     // Using 2 as the step size preserves the `CLOSED_BIT`.
385     const STEP_SIZE: usize = 2;
386 
387     /// The version part of the state. The lowest bit is always zero.
388     #[derive(Copy, Clone, Debug, Eq, PartialEq)]
389     pub(super) struct Version(usize);
390 
391     /// Snapshot of the state. The first bit is used as the CLOSED bit.
392     /// The remaining bits are used as the version.
393     ///
394     /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
395     /// receivers does not set it.
396     #[derive(Copy, Clone, Debug)]
397     pub(super) struct StateSnapshot(usize);
398 
399     /// The state stored in an atomic integer.
400     ///
401     /// The `Sender` uses `Release` ordering for storing a new state
402     /// and the `Receiver`s use `Acquire` ordering for loading the
403     /// current state. This ensures that written values are seen by
404     /// the `Receiver`s for a proper handover.
405     #[derive(Debug)]
406     pub(super) struct AtomicState(AtomicUsize);
407 
408     impl Version {
409         /// Decrements the version.
decrement(&mut self)410         pub(super) fn decrement(&mut self) {
411             // Using a wrapping decrement here is required to ensure that the
412             // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
413             // which wraps on overflow.
414             self.0 = self.0.wrapping_sub(STEP_SIZE);
415         }
416 
417         pub(super) const INITIAL: Self = Version(0);
418     }
419 
420     impl StateSnapshot {
421         /// Extract the version from the state.
version(self) -> Version422         pub(super) fn version(self) -> Version {
423             Version(self.0 & !CLOSED_BIT)
424         }
425 
426         /// Is the closed bit set?
is_closed(self) -> bool427         pub(super) fn is_closed(self) -> bool {
428             (self.0 & CLOSED_BIT) == CLOSED_BIT
429         }
430     }
431 
432     impl AtomicState {
433         /// Create a new `AtomicState` that is not closed and which has the
434         /// version set to `Version::INITIAL`.
new() -> Self435         pub(super) fn new() -> Self {
436             AtomicState(AtomicUsize::new(Version::INITIAL.0))
437         }
438 
439         /// Load the current value of the state.
440         ///
441         /// Only used by the receiver and for debugging purposes.
442         ///
443         /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
444         /// of the shared value with the sender side (single writer). The state is always
445         /// updated after modifying and before releasing the (exclusive) lock on the
446         /// shared value.
load(&self) -> StateSnapshot447         pub(super) fn load(&self) -> StateSnapshot {
448             StateSnapshot(self.0.load(Ordering::Acquire))
449         }
450 
451         /// Increment the version counter.
increment_version_while_locked(&self)452         pub(super) fn increment_version_while_locked(&self) {
453             // Use `Release` ordering to ensure that the shared value
454             // has been written before updating the version. The shared
455             // value is still protected by an exclusive lock during this
456             // method.
457             self.0.fetch_add(STEP_SIZE, Ordering::Release);
458         }
459 
460         /// Set the closed bit in the state.
set_closed(&self)461         pub(super) fn set_closed(&self) {
462             self.0.fetch_or(CLOSED_BIT, Ordering::Release);
463         }
464     }
465 }
466 
467 /// Creates a new watch channel, returning the "send" and "receive" handles.
468 ///
469 /// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
470 /// Only the last value sent is made available to the [`Receiver`] half. All
471 /// intermediate values are dropped.
472 ///
473 /// # Examples
474 ///
475 /// The following example prints `hello! world! `.
476 ///
477 /// ```
478 /// use tokio::sync::watch;
479 /// use tokio::time::{Duration, sleep};
480 ///
481 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
482 /// let (tx, mut rx) = watch::channel("hello");
483 ///
484 /// tokio::spawn(async move {
485 ///     // Use the equivalent of a "do-while" loop so the initial value is
486 ///     // processed before awaiting the `changed()` future.
487 ///     loop {
488 ///         println!("{}! ", *rx.borrow_and_update());
489 ///         if rx.changed().await.is_err() {
490 ///             break;
491 ///         }
492 ///     }
493 /// });
494 ///
495 /// sleep(Duration::from_millis(100)).await;
496 /// tx.send("world")?;
497 /// # Ok(())
498 /// # }
499 /// ```
500 ///
501 /// [`Sender`]: struct@Sender
502 /// [`Receiver`]: struct@Receiver
channel<T>(init: T) -> (Sender<T>, Receiver<T>)503 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
504     let shared = Arc::new(Shared {
505         value: RwLock::new(init),
506         state: AtomicState::new(),
507         ref_count_rx: AtomicUsize::new(1),
508         ref_count_tx: AtomicUsize::new(1),
509         notify_rx: big_notify::BigNotify::new(),
510         notify_tx: Notify::new(),
511     });
512 
513     let tx = Sender {
514         shared: shared.clone(),
515     };
516 
517     let rx = Receiver {
518         shared,
519         version: Version::INITIAL,
520     };
521 
522     (tx, rx)
523 }
524 
525 impl<T> Receiver<T> {
from_shared(version: Version, shared: Arc<Shared<T>>) -> Self526     fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
527         // No synchronization necessary as this is only used as a counter and
528         // not memory access.
529         shared.ref_count_rx.fetch_add(1, Relaxed);
530 
531         Self { shared, version }
532     }
533 
534     /// Returns a reference to the most recently sent value.
535     ///
536     /// This method does not mark the returned value as seen, so future calls to
537     /// [`changed`] may return immediately even if you have already seen the
538     /// value with a call to `borrow`.
539     ///
540     /// Outstanding borrows hold a read lock on the inner value. This means that
541     /// long-lived borrows could cause the producer half to block. It is recommended
542     /// to keep the borrow as short-lived as possible. Additionally, if you are
543     /// running in an environment that allows `!Send` futures, you must ensure that
544     /// the returned `Ref` type is never held alive across an `.await` point,
545     /// otherwise, it can lead to a deadlock.
546     ///
547     /// The priority policy of the lock is dependent on the underlying lock
548     /// implementation, and this type does not guarantee that any particular policy
549     /// will be used. In particular, a producer which is waiting to acquire the lock
550     /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
551     ///
552     /// <details><summary>Potential deadlock example</summary>
553     ///
554     /// ```text
555     /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
556     /// let _ref1 = rx.borrow();   |
557     ///                            |  // will block
558     ///                            |  let _ = tx.send(());
559     /// // may deadlock            |
560     /// let _ref2 = rx.borrow();   |
561     /// ```
562     /// </details>
563     ///
564     /// For more information on when to use this method versus
565     /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
566     ///
567     /// [`changed`]: Receiver::changed
568     /// [`borrow_and_update`]: Receiver::borrow_and_update
569     ///
570     /// # Examples
571     ///
572     /// ```
573     /// use tokio::sync::watch;
574     ///
575     /// let (_, rx) = watch::channel("hello");
576     /// assert_eq!(*rx.borrow(), "hello");
577     /// ```
borrow(&self) -> Ref<'_, T>578     pub fn borrow(&self) -> Ref<'_, T> {
579         let inner = self.shared.value.read();
580 
581         // After obtaining a read-lock no concurrent writes could occur
582         // and the loaded version matches that of the borrowed reference.
583         let new_version = self.shared.state.load().version();
584         let has_changed = self.version != new_version;
585 
586         Ref { inner, has_changed }
587     }
588 
589     /// Returns a reference to the most recently sent value and marks that value
590     /// as seen.
591     ///
592     /// This method marks the current value as seen. Subsequent calls to [`changed`]
593     /// will not return immediately until the [`Sender`] has modified the shared
594     /// value again.
595     ///
596     /// Outstanding borrows hold a read lock on the inner value. This means that
597     /// long-lived borrows could cause the producer half to block. It is recommended
598     /// to keep the borrow as short-lived as possible. Additionally, if you are
599     /// running in an environment that allows `!Send` futures, you must ensure that
600     /// the returned `Ref` type is never held alive across an `.await` point,
601     /// otherwise, it can lead to a deadlock.
602     ///
603     /// The priority policy of the lock is dependent on the underlying lock
604     /// implementation, and this type does not guarantee that any particular policy
605     /// will be used. In particular, a producer which is waiting to acquire the lock
606     /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
607     ///
608     /// <details><summary>Potential deadlock example</summary>
609     ///
610     /// ```text
611     /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
612     /// let _ref1 = rx1.borrow_and_update();   |
613     ///                                        |  // will block
614     ///                                        |  let _ = tx.send(());
615     /// // may deadlock                        |
616     /// let _ref2 = rx2.borrow_and_update();   |
617     /// ```
618     /// </details>
619     ///
620     /// For more information on when to use this method versus [`borrow`], see
621     /// [here](self#borrow_and_update-versus-borrow).
622     ///
623     /// [`changed`]: Receiver::changed
624     /// [`borrow`]: Receiver::borrow
borrow_and_update(&mut self) -> Ref<'_, T>625     pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
626         let inner = self.shared.value.read();
627 
628         // After obtaining a read-lock no concurrent writes could occur
629         // and the loaded version matches that of the borrowed reference.
630         let new_version = self.shared.state.load().version();
631         let has_changed = self.version != new_version;
632 
633         // Mark the shared value as seen by updating the version
634         self.version = new_version;
635 
636         Ref { inner, has_changed }
637     }
638 
639     /// Checks if this channel contains a message that this receiver has not yet
640     /// seen. The new value is not marked as seen.
641     ///
642     /// Although this method is called `has_changed`, it does not check new
643     /// messages for equality, so this call will return true even if the new
644     /// message is equal to the old message.
645     ///
646     /// Returns an error if the channel has been closed.
647     /// # Examples
648     ///
649     /// ```
650     /// use tokio::sync::watch;
651     ///
652     /// #[tokio::main]
653     /// async fn main() {
654     ///     let (tx, mut rx) = watch::channel("hello");
655     ///
656     ///     tx.send("goodbye").unwrap();
657     ///
658     ///     assert!(rx.has_changed().unwrap());
659     ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
660     ///
661     ///     // The value has been marked as seen
662     ///     assert!(!rx.has_changed().unwrap());
663     ///
664     ///     drop(tx);
665     ///     // The `tx` handle has been dropped
666     ///     assert!(rx.has_changed().is_err());
667     /// }
668     /// ```
has_changed(&self) -> Result<bool, error::RecvError>669     pub fn has_changed(&self) -> Result<bool, error::RecvError> {
670         // Load the version from the state
671         let state = self.shared.state.load();
672         if state.is_closed() {
673             // The sender has dropped.
674             return Err(error::RecvError(()));
675         }
676         let new_version = state.version();
677 
678         Ok(self.version != new_version)
679     }
680 
681     /// Marks the state as changed.
682     ///
683     /// After invoking this method [`has_changed()`](Self::has_changed)
684     /// returns `true` and [`changed()`](Self::changed) returns
685     /// immediately, regardless of whether a new value has been sent.
686     ///
687     /// This is useful for triggering an initial change notification after
688     /// subscribing to synchronize new receivers.
mark_changed(&mut self)689     pub fn mark_changed(&mut self) {
690         self.version.decrement();
691     }
692 
693     /// Marks the state as unchanged.
694     ///
695     /// The current value will be considered seen by the receiver.
696     ///
697     /// This is useful if you are not interested in the current value
698     /// visible in the receiver.
mark_unchanged(&mut self)699     pub fn mark_unchanged(&mut self) {
700         let current_version = self.shared.state.load().version();
701         self.version = current_version;
702     }
703 
704     /// Waits for a change notification, then marks the newest value as seen.
705     ///
706     /// If the newest value in the channel has not yet been marked seen when
707     /// this method is called, the method marks that value seen and returns
708     /// immediately. If the newest value has already been marked seen, then the
709     /// method sleeps until a new message is sent by the [`Sender`] connected to
710     /// this `Receiver`, or until the [`Sender`] is dropped.
711     ///
712     /// This method returns an error if and only if the [`Sender`] is dropped.
713     ///
714     /// For more information, see
715     /// [*Change notifications*](self#change-notifications) in the module-level documentation.
716     ///
717     /// # Cancel safety
718     ///
719     /// This method is cancel safe. If you use it as the event in a
720     /// [`tokio::select!`](crate::select) statement and some other branch
721     /// completes first, then it is guaranteed that no values have been marked
722     /// seen by this call to `changed`.
723     ///
724     /// [`Sender`]: struct@Sender
725     ///
726     /// # Examples
727     ///
728     /// ```
729     /// use tokio::sync::watch;
730     ///
731     /// #[tokio::main]
732     /// async fn main() {
733     ///     let (tx, mut rx) = watch::channel("hello");
734     ///
735     ///     tokio::spawn(async move {
736     ///         tx.send("goodbye").unwrap();
737     ///     });
738     ///
739     ///     assert!(rx.changed().await.is_ok());
740     ///     assert_eq!(*rx.borrow_and_update(), "goodbye");
741     ///
742     ///     // The `tx` handle has been dropped
743     ///     assert!(rx.changed().await.is_err());
744     /// }
745     /// ```
changed(&mut self) -> Result<(), error::RecvError>746     pub async fn changed(&mut self) -> Result<(), error::RecvError> {
747         cooperative(changed_impl(&self.shared, &mut self.version)).await
748     }
749 
750     /// Waits for a value that satisfies the provided condition.
751     ///
752     /// This method will call the provided closure whenever something is sent on
753     /// the channel. Once the closure returns `true`, this method will return a
754     /// reference to the value that was passed to the closure.
755     ///
756     /// Before `wait_for` starts waiting for changes, it will call the closure
757     /// on the current value. If the closure returns `true` when given the
758     /// current value, then `wait_for` will immediately return a reference to
759     /// the current value. This is the case even if the current value is already
760     /// considered seen.
761     ///
762     /// The watch channel only keeps track of the most recent value, so if
763     /// several messages are sent faster than `wait_for` is able to call the
764     /// closure, then it may skip some updates. Whenever the closure is called,
765     /// it will be called with the most recent value.
766     ///
767     /// When this function returns, the value that was passed to the closure
768     /// when it returned `true` will be considered seen.
769     ///
770     /// If the channel is closed, then `wait_for` will return a `RecvError`.
771     /// Once this happens, no more messages can ever be sent on the channel.
772     /// When an error is returned, it is guaranteed that the closure has been
773     /// called on the last value, and that it returned `false` for that value.
774     /// (If the closure returned `true`, then the last value would have been
775     /// returned instead of the error.)
776     ///
777     /// Like the `borrow` method, the returned borrow holds a read lock on the
778     /// inner value. This means that long-lived borrows could cause the producer
779     /// half to block. It is recommended to keep the borrow as short-lived as
780     /// possible. See the documentation of `borrow` for more information on
781     /// this.
782     ///
783     /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
784     ///
785     /// # Examples
786     ///
787     /// ```
788     /// use tokio::sync::watch;
789     ///
790     /// #[tokio::main]
791     ///
792     /// async fn main() {
793     ///     let (tx, _rx) = watch::channel("hello");
794     ///
795     ///     tx.send("goodbye").unwrap();
796     ///
797     ///     // here we subscribe to a second receiver
798     ///     // now in case of using `changed` we would have
799     ///     // to first check the current value and then wait
800     ///     // for changes or else `changed` would hang.
801     ///     let mut rx2 = tx.subscribe();
802     ///
803     ///     // in place of changed we have use `wait_for`
804     ///     // which would automatically check the current value
805     ///     // and wait for changes until the closure returns true.
806     ///     assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
807     ///     assert_eq!(*rx2.borrow(), "goodbye");
808     /// }
809     /// ```
wait_for( &mut self, f: impl FnMut(&T) -> bool, ) -> Result<Ref<'_, T>, error::RecvError>810     pub async fn wait_for(
811         &mut self,
812         f: impl FnMut(&T) -> bool,
813     ) -> Result<Ref<'_, T>, error::RecvError> {
814         cooperative(self.wait_for_inner(f)).await
815     }
816 
wait_for_inner( &mut self, mut f: impl FnMut(&T) -> bool, ) -> Result<Ref<'_, T>, error::RecvError>817     async fn wait_for_inner(
818         &mut self,
819         mut f: impl FnMut(&T) -> bool,
820     ) -> Result<Ref<'_, T>, error::RecvError> {
821         let mut closed = false;
822         loop {
823             {
824                 let inner = self.shared.value.read();
825 
826                 let new_version = self.shared.state.load().version();
827                 let has_changed = self.version != new_version;
828                 self.version = new_version;
829 
830                 if !closed || has_changed {
831                     let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
832                     match result {
833                         Ok(true) => {
834                             return Ok(Ref { inner, has_changed });
835                         }
836                         Ok(false) => {
837                             // Skip the value.
838                         }
839                         Err(panicked) => {
840                             // Drop the read-lock to avoid poisoning it.
841                             drop(inner);
842                             // Forward the panic to the caller.
843                             panic::resume_unwind(panicked);
844                             // Unreachable
845                         }
846                     };
847                 }
848             }
849 
850             if closed {
851                 return Err(error::RecvError(()));
852             }
853 
854             // Wait for the value to change.
855             closed = changed_impl(&self.shared, &mut self.version).await.is_err();
856         }
857     }
858 
859     /// Returns `true` if receivers belong to the same channel.
860     ///
861     /// # Examples
862     ///
863     /// ```
864     /// let (tx, rx) = tokio::sync::watch::channel(true);
865     /// let rx2 = rx.clone();
866     /// assert!(rx.same_channel(&rx2));
867     ///
868     /// let (tx3, rx3) = tokio::sync::watch::channel(true);
869     /// assert!(!rx3.same_channel(&rx2));
870     /// ```
same_channel(&self, other: &Self) -> bool871     pub fn same_channel(&self, other: &Self) -> bool {
872         Arc::ptr_eq(&self.shared, &other.shared)
873     }
874 
875     cfg_process_driver! {
876         pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
877             maybe_changed(&self.shared, &mut self.version)
878         }
879     }
880 }
881 
maybe_changed<T>( shared: &Shared<T>, version: &mut Version, ) -> Option<Result<(), error::RecvError>>882 fn maybe_changed<T>(
883     shared: &Shared<T>,
884     version: &mut Version,
885 ) -> Option<Result<(), error::RecvError>> {
886     // Load the version from the state
887     let state = shared.state.load();
888     let new_version = state.version();
889 
890     if *version != new_version {
891         // Observe the new version and return
892         *version = new_version;
893         return Some(Ok(()));
894     }
895 
896     if state.is_closed() {
897         // The sender has been dropped.
898         return Some(Err(error::RecvError(())));
899     }
900 
901     None
902 }
903 
changed_impl<T>( shared: &Shared<T>, version: &mut Version, ) -> Result<(), error::RecvError>904 async fn changed_impl<T>(
905     shared: &Shared<T>,
906     version: &mut Version,
907 ) -> Result<(), error::RecvError> {
908     crate::trace::async_trace_leaf().await;
909 
910     loop {
911         // In order to avoid a race condition, we first request a notification,
912         // **then** check the current value's version. If a new version exists,
913         // the notification request is dropped.
914         let notified = shared.notify_rx.notified();
915 
916         if let Some(ret) = maybe_changed(shared, version) {
917             return ret;
918         }
919 
920         notified.await;
921         // loop around again in case the wake-up was spurious
922     }
923 }
924 
925 impl<T> Clone for Receiver<T> {
clone(&self) -> Self926     fn clone(&self) -> Self {
927         let version = self.version;
928         let shared = self.shared.clone();
929 
930         Self::from_shared(version, shared)
931     }
932 }
933 
934 impl<T> Drop for Receiver<T> {
drop(&mut self)935     fn drop(&mut self) {
936         // No synchronization necessary as this is only used as a counter and
937         // not memory access.
938         if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
939             // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
940             self.shared.notify_tx.notify_waiters();
941         }
942     }
943 }
944 
945 impl<T> Sender<T> {
946     /// Creates the sending-half of the [`watch`] channel.
947     ///
948     /// See documentation of [`watch::channel`] for errors when calling this function.
949     /// Beware that attempting to send a value when there are no receivers will
950     /// return an error.
951     ///
952     /// [`watch`]: crate::sync::watch
953     /// [`watch::channel`]: crate::sync::watch
954     ///
955     /// # Examples
956     /// ```
957     /// let sender = tokio::sync::watch::Sender::new(0u8);
958     /// assert!(sender.send(3).is_err());
959     /// let _rec = sender.subscribe();
960     /// assert!(sender.send(4).is_ok());
961     /// ```
new(init: T) -> Self962     pub fn new(init: T) -> Self {
963         let (tx, _) = channel(init);
964         tx
965     }
966 
967     /// Sends a new value via the channel, notifying all receivers.
968     ///
969     /// This method fails if the channel is closed, which is the case when
970     /// every receiver has been dropped. It is possible to reopen the channel
971     /// using the [`subscribe`] method. However, when `send` fails, the value
972     /// isn't made available for future receivers (but returned with the
973     /// [`SendError`]).
974     ///
975     /// To always make a new value available for future receivers, even if no
976     /// receiver currently exists, one of the other send methods
977     /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
978     /// used instead.
979     ///
980     /// [`subscribe`]: Sender::subscribe
981     /// [`SendError`]: error::SendError
982     /// [`send_if_modified`]: Sender::send_if_modified
983     /// [`send_modify`]: Sender::send_modify
984     /// [`send_replace`]: Sender::send_replace
send(&self, value: T) -> Result<(), error::SendError<T>>985     pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
986         // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
987         if 0 == self.receiver_count() {
988             return Err(error::SendError(value));
989         }
990 
991         self.send_replace(value);
992         Ok(())
993     }
994 
995     /// Modifies the watched value **unconditionally** in-place,
996     /// notifying all receivers.
997     ///
998     /// This can be useful for modifying the watched value, without
999     /// having to allocate a new instance. Additionally, this
1000     /// method permits sending values even when there are no receivers.
1001     ///
1002     /// Prefer to use the more versatile function [`Self::send_if_modified()`]
1003     /// if the value is only modified conditionally during the mutable borrow
1004     /// to prevent unneeded change notifications for unmodified values.
1005     ///
1006     /// # Panics
1007     ///
1008     /// This function panics when the invocation of the `modify` closure panics.
1009     /// No receivers are notified when panicking. All changes of the watched
1010     /// value applied by the closure before panicking will be visible in
1011     /// subsequent calls to `borrow`.
1012     ///
1013     /// # Examples
1014     ///
1015     /// ```
1016     /// use tokio::sync::watch;
1017     ///
1018     /// struct State {
1019     ///     counter: usize,
1020     /// }
1021     /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1022     /// state_tx.send_modify(|state| state.counter += 1);
1023     /// assert_eq!(state_rx.borrow().counter, 1);
1024     /// ```
send_modify<F>(&self, modify: F) where F: FnOnce(&mut T),1025     pub fn send_modify<F>(&self, modify: F)
1026     where
1027         F: FnOnce(&mut T),
1028     {
1029         self.send_if_modified(|value| {
1030             modify(value);
1031             true
1032         });
1033     }
1034 
1035     /// Modifies the watched value **conditionally** in-place,
1036     /// notifying all receivers only if modified.
1037     ///
1038     /// This can be useful for modifying the watched value, without
1039     /// having to allocate a new instance. Additionally, this
1040     /// method permits sending values even when there are no receivers.
1041     ///
1042     /// The `modify` closure must return `true` if the value has actually
1043     /// been modified during the mutable borrow. It should only return `false`
1044     /// if the value is guaranteed to be unmodified despite the mutable
1045     /// borrow.
1046     ///
1047     /// Receivers are only notified if the closure returned `true`. If the
1048     /// closure has modified the value but returned `false` this results
1049     /// in a *silent modification*, i.e. the modified value will be visible
1050     /// in subsequent calls to `borrow`, but receivers will not receive
1051     /// a change notification.
1052     ///
1053     /// Returns the result of the closure, i.e. `true` if the value has
1054     /// been modified and `false` otherwise.
1055     ///
1056     /// # Panics
1057     ///
1058     /// This function panics when the invocation of the `modify` closure panics.
1059     /// No receivers are notified when panicking. All changes of the watched
1060     /// value applied by the closure before panicking will be visible in
1061     /// subsequent calls to `borrow`.
1062     ///
1063     /// # Examples
1064     ///
1065     /// ```
1066     /// use tokio::sync::watch;
1067     ///
1068     /// struct State {
1069     ///     counter: usize,
1070     /// }
1071     /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1072     /// let inc_counter_if_odd = |state: &mut State| {
1073     ///     if state.counter % 2 == 1 {
1074     ///         state.counter += 1;
1075     ///         return true;
1076     ///     }
1077     ///     false
1078     /// };
1079     ///
1080     /// assert_eq!(state_rx.borrow().counter, 1);
1081     ///
1082     /// assert!(!state_rx.has_changed().unwrap());
1083     /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1084     /// assert!(state_rx.has_changed().unwrap());
1085     /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1086     ///
1087     /// assert!(!state_rx.has_changed().unwrap());
1088     /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1089     /// assert!(!state_rx.has_changed().unwrap());
1090     /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1091     /// ```
send_if_modified<F>(&self, modify: F) -> bool where F: FnOnce(&mut T) -> bool,1092     pub fn send_if_modified<F>(&self, modify: F) -> bool
1093     where
1094         F: FnOnce(&mut T) -> bool,
1095     {
1096         {
1097             // Acquire the write lock and update the value.
1098             let mut lock = self.shared.value.write();
1099 
1100             // Update the value and catch possible panic inside func.
1101             let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1102             match result {
1103                 Ok(modified) => {
1104                     if !modified {
1105                         // Abort, i.e. don't notify receivers if unmodified
1106                         return false;
1107                     }
1108                     // Continue if modified
1109                 }
1110                 Err(panicked) => {
1111                     // Drop the lock to avoid poisoning it.
1112                     drop(lock);
1113                     // Forward the panic to the caller.
1114                     panic::resume_unwind(panicked);
1115                     // Unreachable
1116                 }
1117             };
1118 
1119             self.shared.state.increment_version_while_locked();
1120 
1121             // Release the write lock.
1122             //
1123             // Incrementing the version counter while holding the lock ensures
1124             // that receivers are able to figure out the version number of the
1125             // value they are currently looking at.
1126             drop(lock);
1127         }
1128 
1129         self.shared.notify_rx.notify_waiters();
1130 
1131         true
1132     }
1133 
1134     /// Sends a new value via the channel, notifying all receivers and returning
1135     /// the previous value in the channel.
1136     ///
1137     /// This can be useful for reusing the buffers inside a watched value.
1138     /// Additionally, this method permits sending values even when there are no
1139     /// receivers.
1140     ///
1141     /// # Examples
1142     ///
1143     /// ```
1144     /// use tokio::sync::watch;
1145     ///
1146     /// let (tx, _rx) = watch::channel(1);
1147     /// assert_eq!(tx.send_replace(2), 1);
1148     /// assert_eq!(tx.send_replace(3), 2);
1149     /// ```
send_replace(&self, mut value: T) -> T1150     pub fn send_replace(&self, mut value: T) -> T {
1151         // swap old watched value with the new one
1152         self.send_modify(|old| mem::swap(old, &mut value));
1153 
1154         value
1155     }
1156 
1157     /// Returns a reference to the most recently sent value
1158     ///
1159     /// Outstanding borrows hold a read lock on the inner value. This means that
1160     /// long-lived borrows could cause the producer half to block. It is recommended
1161     /// to keep the borrow as short-lived as possible. Additionally, if you are
1162     /// running in an environment that allows `!Send` futures, you must ensure that
1163     /// the returned `Ref` type is never held alive across an `.await` point,
1164     /// otherwise, it can lead to a deadlock.
1165     ///
1166     /// # Examples
1167     ///
1168     /// ```
1169     /// use tokio::sync::watch;
1170     ///
1171     /// let (tx, _) = watch::channel("hello");
1172     /// assert_eq!(*tx.borrow(), "hello");
1173     /// ```
borrow(&self) -> Ref<'_, T>1174     pub fn borrow(&self) -> Ref<'_, T> {
1175         let inner = self.shared.value.read();
1176 
1177         // The sender/producer always sees the current version
1178         let has_changed = false;
1179 
1180         Ref { inner, has_changed }
1181     }
1182 
1183     /// Checks if the channel has been closed. This happens when all receivers
1184     /// have dropped.
1185     ///
1186     /// # Examples
1187     ///
1188     /// ```
1189     /// let (tx, rx) = tokio::sync::watch::channel(());
1190     /// assert!(!tx.is_closed());
1191     ///
1192     /// drop(rx);
1193     /// assert!(tx.is_closed());
1194     /// ```
is_closed(&self) -> bool1195     pub fn is_closed(&self) -> bool {
1196         self.receiver_count() == 0
1197     }
1198 
1199     /// Completes when all receivers have dropped.
1200     ///
1201     /// This allows the producer to get notified when interest in the produced
1202     /// values is canceled and immediately stop doing work. Once a channel is
1203     /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1204     /// get a new receiver.
1205     ///
1206     /// If the channel becomes closed for a brief amount of time (e.g., the last
1207     /// receiver is dropped and then `subscribe` is called), then this call to
1208     /// `closed` might return, but it is also possible that it does not "notice"
1209     /// that the channel was closed for a brief amount of time.
1210     ///
1211     /// # Cancel safety
1212     ///
1213     /// This method is cancel safe.
1214     ///
1215     /// # Examples
1216     ///
1217     /// ```
1218     /// use tokio::sync::watch;
1219     ///
1220     /// #[tokio::main]
1221     /// async fn main() {
1222     ///     let (tx, rx) = watch::channel("hello");
1223     ///
1224     ///     tokio::spawn(async move {
1225     ///         // use `rx`
1226     ///         drop(rx);
1227     ///     });
1228     ///
1229     ///     // Waits for `rx` to drop
1230     ///     tx.closed().await;
1231     ///     println!("the `rx` handles dropped")
1232     /// }
1233     /// ```
closed(&self)1234     pub async fn closed(&self) {
1235         cooperative(async {
1236             crate::trace::async_trace_leaf().await;
1237 
1238             while self.receiver_count() > 0 {
1239                 let notified = self.shared.notify_tx.notified();
1240 
1241                 if self.receiver_count() == 0 {
1242                     return;
1243                 }
1244 
1245                 notified.await;
1246                 // The channel could have been reopened in the meantime by calling
1247                 // `subscribe`, so we loop again.
1248             }
1249         })
1250         .await;
1251     }
1252 
1253     /// Creates a new [`Receiver`] connected to this `Sender`.
1254     ///
1255     /// All messages sent before this call to `subscribe` are initially marked
1256     /// as seen by the new `Receiver`.
1257     ///
1258     /// This method can be called even if there are no other receivers. In this
1259     /// case, the channel is reopened.
1260     ///
1261     /// # Examples
1262     ///
1263     /// The new channel will receive messages sent on this `Sender`.
1264     ///
1265     /// ```
1266     /// use tokio::sync::watch;
1267     ///
1268     /// #[tokio::main]
1269     /// async fn main() {
1270     ///     let (tx, _rx) = watch::channel(0u64);
1271     ///
1272     ///     tx.send(5).unwrap();
1273     ///
1274     ///     let rx = tx.subscribe();
1275     ///     assert_eq!(5, *rx.borrow());
1276     ///
1277     ///     tx.send(10).unwrap();
1278     ///     assert_eq!(10, *rx.borrow());
1279     /// }
1280     /// ```
1281     ///
1282     /// The most recent message is considered seen by the channel, so this test
1283     /// is guaranteed to pass.
1284     ///
1285     /// ```
1286     /// use tokio::sync::watch;
1287     /// use tokio::time::Duration;
1288     ///
1289     /// #[tokio::main]
1290     /// async fn main() {
1291     ///     let (tx, _rx) = watch::channel(0u64);
1292     ///     tx.send(5).unwrap();
1293     ///     let mut rx = tx.subscribe();
1294     ///
1295     ///     tokio::spawn(async move {
1296     ///         // by spawning and sleeping, the message is sent after `main`
1297     ///         // hits the call to `changed`.
1298     ///         # if false {
1299     ///         tokio::time::sleep(Duration::from_millis(10)).await;
1300     ///         # }
1301     ///         tx.send(100).unwrap();
1302     ///     });
1303     ///
1304     ///     rx.changed().await.unwrap();
1305     ///     assert_eq!(100, *rx.borrow());
1306     /// }
1307     /// ```
subscribe(&self) -> Receiver<T>1308     pub fn subscribe(&self) -> Receiver<T> {
1309         let shared = self.shared.clone();
1310         let version = shared.state.load().version();
1311 
1312         // The CLOSED bit in the state tracks only whether the sender is
1313         // dropped, so we do not need to unset it if this reopens the channel.
1314         Receiver::from_shared(version, shared)
1315     }
1316 
1317     /// Returns the number of receivers that currently exist.
1318     ///
1319     /// # Examples
1320     ///
1321     /// ```
1322     /// use tokio::sync::watch;
1323     ///
1324     /// #[tokio::main]
1325     /// async fn main() {
1326     ///     let (tx, rx1) = watch::channel("hello");
1327     ///
1328     ///     assert_eq!(1, tx.receiver_count());
1329     ///
1330     ///     let mut _rx2 = rx1.clone();
1331     ///
1332     ///     assert_eq!(2, tx.receiver_count());
1333     /// }
1334     /// ```
receiver_count(&self) -> usize1335     pub fn receiver_count(&self) -> usize {
1336         self.shared.ref_count_rx.load(Relaxed)
1337     }
1338 
1339     /// Returns the number of senders that currently exist.
1340     ///
1341     /// # Examples
1342     ///
1343     /// ```
1344     /// use tokio::sync::watch;
1345     ///
1346     /// #[tokio::main]
1347     /// async fn main() {
1348     ///     let (tx1, rx) = watch::channel("hello");
1349     ///
1350     ///     assert_eq!(1, tx1.sender_count());
1351     ///
1352     ///     let tx2 = tx1.clone();
1353     ///
1354     ///     assert_eq!(2, tx1.sender_count());
1355     ///     assert_eq!(2, tx2.sender_count());
1356     /// }
1357     /// ```
sender_count(&self) -> usize1358     pub fn sender_count(&self) -> usize {
1359         self.shared.ref_count_tx.load(Relaxed)
1360     }
1361 
1362     /// Returns `true` if senders belong to the same channel.
1363     ///
1364     /// # Examples
1365     ///
1366     /// ```
1367     /// let (tx, rx) = tokio::sync::watch::channel(true);
1368     /// let tx2 = tx.clone();
1369     /// assert!(tx.same_channel(&tx2));
1370     ///
1371     /// let (tx3, rx3) = tokio::sync::watch::channel(true);
1372     /// assert!(!tx3.same_channel(&tx2));
1373     /// ```
same_channel(&self, other: &Self) -> bool1374     pub fn same_channel(&self, other: &Self) -> bool {
1375         Arc::ptr_eq(&self.shared, &other.shared)
1376     }
1377 }
1378 
1379 impl<T> Drop for Sender<T> {
drop(&mut self)1380     fn drop(&mut self) {
1381         if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1382             self.shared.state.set_closed();
1383             self.shared.notify_rx.notify_waiters();
1384         }
1385     }
1386 }
1387 
1388 // ===== impl Ref =====
1389 
1390 impl<T> ops::Deref for Ref<'_, T> {
1391     type Target = T;
1392 
deref(&self) -> &T1393     fn deref(&self) -> &T {
1394         self.inner.deref()
1395     }
1396 }
1397 
1398 #[cfg(all(test, loom))]
1399 mod tests {
1400     use futures::future::FutureExt;
1401     use loom::thread;
1402 
1403     // test for https://github.com/tokio-rs/tokio/issues/3168
1404     #[test]
watch_spurious_wakeup()1405     fn watch_spurious_wakeup() {
1406         loom::model(|| {
1407             let (send, mut recv) = crate::sync::watch::channel(0i32);
1408 
1409             send.send(1).unwrap();
1410 
1411             let send_thread = thread::spawn(move || {
1412                 send.send(2).unwrap();
1413                 send
1414             });
1415 
1416             recv.changed().now_or_never();
1417 
1418             let send = send_thread.join().unwrap();
1419             let recv_thread = thread::spawn(move || {
1420                 recv.changed().now_or_never();
1421                 recv.changed().now_or_never();
1422                 recv
1423             });
1424 
1425             send.send(3).unwrap();
1426 
1427             let mut recv = recv_thread.join().unwrap();
1428             let send_thread = thread::spawn(move || {
1429                 send.send(2).unwrap();
1430             });
1431 
1432             recv.changed().now_or_never();
1433 
1434             send_thread.join().unwrap();
1435         });
1436     }
1437 
1438     #[test]
watch_borrow()1439     fn watch_borrow() {
1440         loom::model(|| {
1441             let (send, mut recv) = crate::sync::watch::channel(0i32);
1442 
1443             assert!(send.borrow().eq(&0));
1444             assert!(recv.borrow().eq(&0));
1445 
1446             send.send(1).unwrap();
1447             assert!(send.borrow().eq(&1));
1448 
1449             let send_thread = thread::spawn(move || {
1450                 send.send(2).unwrap();
1451                 send
1452             });
1453 
1454             recv.changed().now_or_never();
1455 
1456             let send = send_thread.join().unwrap();
1457             let recv_thread = thread::spawn(move || {
1458                 recv.changed().now_or_never();
1459                 recv.changed().now_or_never();
1460                 recv
1461             });
1462 
1463             send.send(3).unwrap();
1464 
1465             let recv = recv_thread.join().unwrap();
1466             assert!(recv.borrow().eq(&3));
1467             assert!(send.borrow().eq(&3));
1468 
1469             send.send(2).unwrap();
1470 
1471             thread::spawn(move || {
1472                 assert!(recv.borrow().eq(&2));
1473             });
1474             assert!(send.borrow().eq(&2));
1475         });
1476     }
1477 }
1478