1 //! Zero-capacity channel.
2 //!
3 //! This kind of channel is also known as *rendezvous* channel.
4 
5 use std::boxed::Box;
6 use std::cell::UnsafeCell;
7 use std::marker::PhantomData;
8 use std::sync::atomic::{AtomicBool, Ordering};
9 use std::sync::Mutex;
10 use std::time::Instant;
11 use std::{fmt, ptr};
12 
13 use crossbeam_utils::Backoff;
14 
15 use crate::context::Context;
16 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
17 use crate::select::{Operation, SelectHandle, Selected, Token};
18 use crate::waker::Waker;
19 
20 /// A pointer to a packet.
21 pub(crate) struct ZeroToken(*mut ());
22 
23 impl Default for ZeroToken {
default() -> Self24     fn default() -> Self {
25         Self(ptr::null_mut())
26     }
27 }
28 
29 impl fmt::Debug for ZeroToken {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result30     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31         fmt::Debug::fmt(&(self.0 as usize), f)
32     }
33 }
34 
35 /// A slot for passing one message from a sender to a receiver.
36 struct Packet<T> {
37     /// Equals `true` if the packet is allocated on the stack.
38     on_stack: bool,
39 
40     /// Equals `true` once the packet is ready for reading or writing.
41     ready: AtomicBool,
42 
43     /// The message.
44     msg: UnsafeCell<Option<T>>,
45 }
46 
47 impl<T> Packet<T> {
48     /// Creates an empty packet on the stack.
empty_on_stack() -> Packet<T>49     fn empty_on_stack() -> Packet<T> {
50         Packet {
51             on_stack: true,
52             ready: AtomicBool::new(false),
53             msg: UnsafeCell::new(None),
54         }
55     }
56 
57     /// Creates an empty packet on the heap.
empty_on_heap() -> Box<Packet<T>>58     fn empty_on_heap() -> Box<Packet<T>> {
59         Box::new(Packet {
60             on_stack: false,
61             ready: AtomicBool::new(false),
62             msg: UnsafeCell::new(None),
63         })
64     }
65 
66     /// Creates a packet on the stack, containing a message.
message_on_stack(msg: T) -> Packet<T>67     fn message_on_stack(msg: T) -> Packet<T> {
68         Packet {
69             on_stack: true,
70             ready: AtomicBool::new(false),
71             msg: UnsafeCell::new(Some(msg)),
72         }
73     }
74 
75     /// Waits until the packet becomes ready for reading or writing.
wait_ready(&self)76     fn wait_ready(&self) {
77         let backoff = Backoff::new();
78         while !self.ready.load(Ordering::Acquire) {
79             backoff.snooze();
80         }
81     }
82 }
83 
84 /// Inner representation of a zero-capacity channel.
85 struct Inner {
86     /// Senders waiting to pair up with a receive operation.
87     senders: Waker,
88 
89     /// Receivers waiting to pair up with a send operation.
90     receivers: Waker,
91 
92     /// Equals `true` when the channel is disconnected.
93     is_disconnected: bool,
94 }
95 
96 /// Zero-capacity channel.
97 pub(crate) struct Channel<T> {
98     /// Inner representation of the channel.
99     inner: Mutex<Inner>,
100 
101     /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
102     _marker: PhantomData<T>,
103 }
104 
105 impl<T> Channel<T> {
106     /// Constructs a new zero-capacity channel.
new() -> Self107     pub(crate) fn new() -> Self {
108         Channel {
109             inner: Mutex::new(Inner {
110                 senders: Waker::new(),
111                 receivers: Waker::new(),
112                 is_disconnected: false,
113             }),
114             _marker: PhantomData,
115         }
116     }
117 
118     /// Returns a receiver handle to the channel.
receiver(&self) -> Receiver<'_, T>119     pub(crate) fn receiver(&self) -> Receiver<'_, T> {
120         Receiver(self)
121     }
122 
123     /// Returns a sender handle to the channel.
sender(&self) -> Sender<'_, T>124     pub(crate) fn sender(&self) -> Sender<'_, T> {
125         Sender(self)
126     }
127 
128     /// Attempts to reserve a slot for sending a message.
start_send(&self, token: &mut Token) -> bool129     fn start_send(&self, token: &mut Token) -> bool {
130         let mut inner = self.inner.lock().unwrap();
131 
132         // If there's a waiting receiver, pair up with it.
133         if let Some(operation) = inner.receivers.try_select() {
134             token.zero.0 = operation.packet;
135             true
136         } else if inner.is_disconnected {
137             token.zero.0 = ptr::null_mut();
138             true
139         } else {
140             false
141         }
142     }
143 
144     /// Writes a message into the packet.
write(&self, token: &mut Token, msg: T) -> Result<(), T>145     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
146         // If there is no packet, the channel is disconnected.
147         if token.zero.0.is_null() {
148             return Err(msg);
149         }
150 
151         let packet = &*(token.zero.0 as *const Packet<T>);
152         packet.msg.get().write(Some(msg));
153         packet.ready.store(true, Ordering::Release);
154         Ok(())
155     }
156 
157     /// Attempts to pair up with a sender.
start_recv(&self, token: &mut Token) -> bool158     fn start_recv(&self, token: &mut Token) -> bool {
159         let mut inner = self.inner.lock().unwrap();
160 
161         // If there's a waiting sender, pair up with it.
162         if let Some(operation) = inner.senders.try_select() {
163             token.zero.0 = operation.packet;
164             true
165         } else if inner.is_disconnected {
166             token.zero.0 = ptr::null_mut();
167             true
168         } else {
169             false
170         }
171     }
172 
173     /// Reads a message from the packet.
read(&self, token: &mut Token) -> Result<T, ()>174     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
175         // If there is no packet, the channel is disconnected.
176         if token.zero.0.is_null() {
177             return Err(());
178         }
179 
180         let packet = &*(token.zero.0 as *const Packet<T>);
181 
182         if packet.on_stack {
183             // The message has been in the packet from the beginning, so there is no need to wait
184             // for it. However, after reading the message, we need to set `ready` to `true` in
185             // order to signal that the packet can be destroyed.
186             let msg = packet.msg.get().replace(None).unwrap();
187             packet.ready.store(true, Ordering::Release);
188             Ok(msg)
189         } else {
190             // Wait until the message becomes available, then read it and destroy the
191             // heap-allocated packet.
192             packet.wait_ready();
193             let msg = packet.msg.get().replace(None).unwrap();
194             drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
195             Ok(msg)
196         }
197     }
198 
199     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>200     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
201         let token = &mut Token::default();
202         let mut inner = self.inner.lock().unwrap();
203 
204         // If there's a waiting receiver, pair up with it.
205         if let Some(operation) = inner.receivers.try_select() {
206             token.zero.0 = operation.packet;
207             drop(inner);
208             unsafe {
209                 self.write(token, msg).ok().unwrap();
210             }
211             Ok(())
212         } else if inner.is_disconnected {
213             Err(TrySendError::Disconnected(msg))
214         } else {
215             Err(TrySendError::Full(msg))
216         }
217     }
218 
219     /// Sends a message into the channel.
send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>220     pub(crate) fn send(
221         &self,
222         msg: T,
223         deadline: Option<Instant>,
224     ) -> Result<(), SendTimeoutError<T>> {
225         let token = &mut Token::default();
226         let mut inner = self.inner.lock().unwrap();
227 
228         // If there's a waiting receiver, pair up with it.
229         if let Some(operation) = inner.receivers.try_select() {
230             token.zero.0 = operation.packet;
231             drop(inner);
232             unsafe {
233                 self.write(token, msg).ok().unwrap();
234             }
235             return Ok(());
236         }
237 
238         if inner.is_disconnected {
239             return Err(SendTimeoutError::Disconnected(msg));
240         }
241 
242         Context::with(|cx| {
243             // Prepare for blocking until a receiver wakes us up.
244             let oper = Operation::hook(token);
245             let mut packet = Packet::<T>::message_on_stack(msg);
246             inner
247                 .senders
248                 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
249             inner.receivers.notify();
250             drop(inner);
251 
252             // Block the current thread.
253             let sel = cx.wait_until(deadline);
254 
255             match sel {
256                 Selected::Waiting => unreachable!(),
257                 Selected::Aborted => {
258                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
259                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
260                     Err(SendTimeoutError::Timeout(msg))
261                 }
262                 Selected::Disconnected => {
263                     self.inner.lock().unwrap().senders.unregister(oper).unwrap();
264                     let msg = unsafe { packet.msg.get().replace(None).unwrap() };
265                     Err(SendTimeoutError::Disconnected(msg))
266                 }
267                 Selected::Operation(_) => {
268                     // Wait until the message is read, then drop the packet.
269                     packet.wait_ready();
270                     Ok(())
271                 }
272             }
273         })
274     }
275 
276     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>277     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
278         let token = &mut Token::default();
279         let mut inner = self.inner.lock().unwrap();
280 
281         // If there's a waiting sender, pair up with it.
282         if let Some(operation) = inner.senders.try_select() {
283             token.zero.0 = operation.packet;
284             drop(inner);
285             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
286         } else if inner.is_disconnected {
287             Err(TryRecvError::Disconnected)
288         } else {
289             Err(TryRecvError::Empty)
290         }
291     }
292 
293     /// Receives a message from the channel.
recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>294     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
295         let token = &mut Token::default();
296         let mut inner = self.inner.lock().unwrap();
297 
298         // If there's a waiting sender, pair up with it.
299         if let Some(operation) = inner.senders.try_select() {
300             token.zero.0 = operation.packet;
301             drop(inner);
302             unsafe {
303                 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
304             }
305         }
306 
307         if inner.is_disconnected {
308             return Err(RecvTimeoutError::Disconnected);
309         }
310 
311         Context::with(|cx| {
312             // Prepare for blocking until a sender wakes us up.
313             let oper = Operation::hook(token);
314             let mut packet = Packet::<T>::empty_on_stack();
315             inner.receivers.register_with_packet(
316                 oper,
317                 &mut packet as *mut Packet<T> as *mut (),
318                 cx,
319             );
320             inner.senders.notify();
321             drop(inner);
322 
323             // Block the current thread.
324             let sel = cx.wait_until(deadline);
325 
326             match sel {
327                 Selected::Waiting => unreachable!(),
328                 Selected::Aborted => {
329                     self.inner
330                         .lock()
331                         .unwrap()
332                         .receivers
333                         .unregister(oper)
334                         .unwrap();
335                     Err(RecvTimeoutError::Timeout)
336                 }
337                 Selected::Disconnected => {
338                     self.inner
339                         .lock()
340                         .unwrap()
341                         .receivers
342                         .unregister(oper)
343                         .unwrap();
344                     Err(RecvTimeoutError::Disconnected)
345                 }
346                 Selected::Operation(_) => {
347                     // Wait until the message is provided, then read it.
348                     packet.wait_ready();
349                     unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
350                 }
351             }
352         })
353     }
354 
355     /// Disconnects the channel and wakes up all blocked senders and receivers.
356     ///
357     /// Returns `true` if this call disconnected the channel.
disconnect(&self) -> bool358     pub(crate) fn disconnect(&self) -> bool {
359         let mut inner = self.inner.lock().unwrap();
360 
361         if !inner.is_disconnected {
362             inner.is_disconnected = true;
363             inner.senders.disconnect();
364             inner.receivers.disconnect();
365             true
366         } else {
367             false
368         }
369     }
370 
371     /// Returns the current number of messages inside the channel.
len(&self) -> usize372     pub(crate) fn len(&self) -> usize {
373         0
374     }
375 
376     /// Returns the capacity of the channel.
capacity(&self) -> Option<usize>377     pub(crate) fn capacity(&self) -> Option<usize> {
378         Some(0)
379     }
380 
381     /// Returns `true` if the channel is empty.
is_empty(&self) -> bool382     pub(crate) fn is_empty(&self) -> bool {
383         true
384     }
385 
386     /// Returns `true` if the channel is full.
is_full(&self) -> bool387     pub(crate) fn is_full(&self) -> bool {
388         true
389     }
390 }
391 
392 /// Receiver handle to a channel.
393 pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
394 
395 /// Sender handle to a channel.
396 pub(crate) struct Sender<'a, T>(&'a Channel<T>);
397 
398 impl<T> SelectHandle for Receiver<'_, T> {
try_select(&self, token: &mut Token) -> bool399     fn try_select(&self, token: &mut Token) -> bool {
400         self.0.start_recv(token)
401     }
402 
deadline(&self) -> Option<Instant>403     fn deadline(&self) -> Option<Instant> {
404         None
405     }
406 
register(&self, oper: Operation, cx: &Context) -> bool407     fn register(&self, oper: Operation, cx: &Context) -> bool {
408         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
409 
410         let mut inner = self.0.inner.lock().unwrap();
411         inner
412             .receivers
413             .register_with_packet(oper, packet.cast::<()>(), cx);
414         inner.senders.notify();
415         inner.senders.can_select() || inner.is_disconnected
416     }
417 
unregister(&self, oper: Operation)418     fn unregister(&self, oper: Operation) {
419         if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
420             unsafe {
421                 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
422             }
423         }
424     }
425 
accept(&self, token: &mut Token, cx: &Context) -> bool426     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
427         token.zero.0 = cx.wait_packet();
428         true
429     }
430 
is_ready(&self) -> bool431     fn is_ready(&self) -> bool {
432         let inner = self.0.inner.lock().unwrap();
433         inner.senders.can_select() || inner.is_disconnected
434     }
435 
watch(&self, oper: Operation, cx: &Context) -> bool436     fn watch(&self, oper: Operation, cx: &Context) -> bool {
437         let mut inner = self.0.inner.lock().unwrap();
438         inner.receivers.watch(oper, cx);
439         inner.senders.can_select() || inner.is_disconnected
440     }
441 
unwatch(&self, oper: Operation)442     fn unwatch(&self, oper: Operation) {
443         let mut inner = self.0.inner.lock().unwrap();
444         inner.receivers.unwatch(oper);
445     }
446 }
447 
448 impl<T> SelectHandle for Sender<'_, T> {
try_select(&self, token: &mut Token) -> bool449     fn try_select(&self, token: &mut Token) -> bool {
450         self.0.start_send(token)
451     }
452 
deadline(&self) -> Option<Instant>453     fn deadline(&self) -> Option<Instant> {
454         None
455     }
456 
register(&self, oper: Operation, cx: &Context) -> bool457     fn register(&self, oper: Operation, cx: &Context) -> bool {
458         let packet = Box::into_raw(Packet::<T>::empty_on_heap());
459 
460         let mut inner = self.0.inner.lock().unwrap();
461         inner
462             .senders
463             .register_with_packet(oper, packet.cast::<()>(), cx);
464         inner.receivers.notify();
465         inner.receivers.can_select() || inner.is_disconnected
466     }
467 
unregister(&self, oper: Operation)468     fn unregister(&self, oper: Operation) {
469         if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
470             unsafe {
471                 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
472             }
473         }
474     }
475 
accept(&self, token: &mut Token, cx: &Context) -> bool476     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
477         token.zero.0 = cx.wait_packet();
478         true
479     }
480 
is_ready(&self) -> bool481     fn is_ready(&self) -> bool {
482         let inner = self.0.inner.lock().unwrap();
483         inner.receivers.can_select() || inner.is_disconnected
484     }
485 
watch(&self, oper: Operation, cx: &Context) -> bool486     fn watch(&self, oper: Operation, cx: &Context) -> bool {
487         let mut inner = self.0.inner.lock().unwrap();
488         inner.senders.watch(oper, cx);
489         inner.receivers.can_select() || inner.is_disconnected
490     }
491 
unwatch(&self, oper: Operation)492     fn unwatch(&self, oper: Operation) {
493         let mut inner = self.0.inner.lock().unwrap();
494         inner.senders.unwatch(oper);
495     }
496 }
497