1 //! Reference counter for channels.
2 
3 use std::boxed::Box;
4 use std::isize;
5 use std::ops;
6 use std::process;
7 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 
9 /// Reference counter internals.
10 struct Counter<C> {
11     /// The number of senders associated with the channel.
12     senders: AtomicUsize,
13 
14     /// The number of receivers associated with the channel.
15     receivers: AtomicUsize,
16 
17     /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
18     destroy: AtomicBool,
19 
20     /// The internal channel.
21     chan: C,
22 }
23 
24 /// Wraps a channel into the reference counter.
new<C>(chan: C) -> (Sender<C>, Receiver<C>)25 pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
26     let counter = Box::into_raw(Box::new(Counter {
27         senders: AtomicUsize::new(1),
28         receivers: AtomicUsize::new(1),
29         destroy: AtomicBool::new(false),
30         chan,
31     }));
32     let s = Sender { counter };
33     let r = Receiver { counter };
34     (s, r)
35 }
36 
37 /// The sending side.
38 pub(crate) struct Sender<C> {
39     counter: *mut Counter<C>,
40 }
41 
42 impl<C> Sender<C> {
43     /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>44     fn counter(&self) -> &Counter<C> {
45         unsafe { &*self.counter }
46     }
47 
48     /// Acquires another sender reference.
acquire(&self) -> Sender<C>49     pub(crate) fn acquire(&self) -> Sender<C> {
50         let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
51 
52         // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
53         // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
54         // just abort when the count becomes very large.
55         if count > isize::MAX as usize {
56             process::abort();
57         }
58 
59         Sender {
60             counter: self.counter,
61         }
62     }
63 
64     /// Releases the sender reference.
65     ///
66     /// Function `disconnect` will be called if this is the last sender reference.
release<F: FnOnce(&C) -> bool>(&self, disconnect: F)67     pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
68         if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
69             disconnect(&self.counter().chan);
70 
71             if self.counter().destroy.swap(true, Ordering::AcqRel) {
72                 drop(Box::from_raw(self.counter));
73             }
74         }
75     }
76 }
77 
78 impl<C> ops::Deref for Sender<C> {
79     type Target = C;
80 
deref(&self) -> &C81     fn deref(&self) -> &C {
82         &self.counter().chan
83     }
84 }
85 
86 impl<C> PartialEq for Sender<C> {
eq(&self, other: &Sender<C>) -> bool87     fn eq(&self, other: &Sender<C>) -> bool {
88         self.counter == other.counter
89     }
90 }
91 
92 /// The receiving side.
93 pub(crate) struct Receiver<C> {
94     counter: *mut Counter<C>,
95 }
96 
97 impl<C> Receiver<C> {
98     /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>99     fn counter(&self) -> &Counter<C> {
100         unsafe { &*self.counter }
101     }
102 
103     /// Acquires another receiver reference.
acquire(&self) -> Receiver<C>104     pub(crate) fn acquire(&self) -> Receiver<C> {
105         let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
106 
107         // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
108         // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
109         // just abort when the count becomes very large.
110         if count > isize::MAX as usize {
111             process::abort();
112         }
113 
114         Receiver {
115             counter: self.counter,
116         }
117     }
118 
119     /// Releases the receiver reference.
120     ///
121     /// Function `disconnect` will be called if this is the last receiver reference.
release<F: FnOnce(&C) -> bool>(&self, disconnect: F)122     pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
123         if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
124             disconnect(&self.counter().chan);
125 
126             if self.counter().destroy.swap(true, Ordering::AcqRel) {
127                 drop(Box::from_raw(self.counter));
128             }
129         }
130     }
131 }
132 
133 impl<C> ops::Deref for Receiver<C> {
134     type Target = C;
135 
deref(&self) -> &C136     fn deref(&self) -> &C {
137         &self.counter().chan
138     }
139 }
140 
141 impl<C> PartialEq for Receiver<C> {
eq(&self, other: &Receiver<C>) -> bool142     fn eq(&self, other: &Receiver<C>) -> bool {
143         self.counter == other.counter
144     }
145 }
146