1 //! Reference counter for channels.
2
3 use std::boxed::Box;
4 use std::isize;
5 use std::ops;
6 use std::process;
7 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8
/// Shared, heap-allocated state that every sender and receiver handle of one channel points at.
struct Counter<C> {
    /// How many sender references are currently outstanding.
    senders: AtomicUsize,

    /// How many receiver references are currently outstanding.
    receivers: AtomicUsize,

    /// Swapped to `true` when a side releases its last reference; the side that finds it
    /// already `true` is the one that deallocates the counter.
    destroy: AtomicBool,

    /// The wrapped channel.
    chan: C,
}
23
24 /// Wraps a channel into the reference counter.
new<C>(chan: C) -> (Sender<C>, Receiver<C>)25 pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
26 let counter = Box::into_raw(Box::new(Counter {
27 senders: AtomicUsize::new(1),
28 receivers: AtomicUsize::new(1),
29 destroy: AtomicBool::new(false),
30 chan,
31 }));
32 let s = Sender { counter };
33 let r = Receiver { counter };
34 (s, r)
35 }
36
37 /// The sending side.
38 pub(crate) struct Sender<C> {
39 counter: *mut Counter<C>,
40 }
41
42 impl<C> Sender<C> {
43 /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>44 fn counter(&self) -> &Counter<C> {
45 unsafe { &*self.counter }
46 }
47
48 /// Acquires another sender reference.
acquire(&self) -> Sender<C>49 pub(crate) fn acquire(&self) -> Sender<C> {
50 let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
51
52 // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
53 // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
54 // just abort when the count becomes very large.
55 if count > isize::MAX as usize {
56 process::abort();
57 }
58
59 Sender {
60 counter: self.counter,
61 }
62 }
63
64 /// Releases the sender reference.
65 ///
66 /// Function `disconnect` will be called if this is the last sender reference.
release<F: FnOnce(&C) -> bool>(&self, disconnect: F)67 pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
68 if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
69 disconnect(&self.counter().chan);
70
71 if self.counter().destroy.swap(true, Ordering::AcqRel) {
72 drop(Box::from_raw(self.counter));
73 }
74 }
75 }
76 }
77
78 impl<C> ops::Deref for Sender<C> {
79 type Target = C;
80
deref(&self) -> &C81 fn deref(&self) -> &C {
82 &self.counter().chan
83 }
84 }
85
86 impl<C> PartialEq for Sender<C> {
eq(&self, other: &Sender<C>) -> bool87 fn eq(&self, other: &Sender<C>) -> bool {
88 self.counter == other.counter
89 }
90 }
91
92 /// The receiving side.
93 pub(crate) struct Receiver<C> {
94 counter: *mut Counter<C>,
95 }
96
97 impl<C> Receiver<C> {
98 /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>99 fn counter(&self) -> &Counter<C> {
100 unsafe { &*self.counter }
101 }
102
103 /// Acquires another receiver reference.
acquire(&self) -> Receiver<C>104 pub(crate) fn acquire(&self) -> Receiver<C> {
105 let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
106
107 // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
108 // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
109 // just abort when the count becomes very large.
110 if count > isize::MAX as usize {
111 process::abort();
112 }
113
114 Receiver {
115 counter: self.counter,
116 }
117 }
118
119 /// Releases the receiver reference.
120 ///
121 /// Function `disconnect` will be called if this is the last receiver reference.
release<F: FnOnce(&C) -> bool>(&self, disconnect: F)122 pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
123 if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
124 disconnect(&self.counter().chan);
125
126 if self.counter().destroy.swap(true, Ordering::AcqRel) {
127 drop(Box::from_raw(self.counter));
128 }
129 }
130 }
131 }
132
133 impl<C> ops::Deref for Receiver<C> {
134 type Target = C;
135
deref(&self) -> &C136 fn deref(&self) -> &C {
137 &self.counter().chan
138 }
139 }
140
141 impl<C> PartialEq for Receiver<C> {
eq(&self, other: &Receiver<C>) -> bool142 fn eq(&self, other: &Receiver<C>) -> bool {
143 self.counter == other.counter
144 }
145 }
146