1 //! Zero-capacity channel. 2 //! 3 //! This kind of channel is also known as *rendezvous* channel. 4 5 use std::boxed::Box; 6 use std::cell::UnsafeCell; 7 use std::marker::PhantomData; 8 use std::sync::atomic::{AtomicBool, Ordering}; 9 use std::sync::Mutex; 10 use std::time::Instant; 11 use std::{fmt, ptr}; 12 13 use crossbeam_utils::Backoff; 14 15 use crate::context::Context; 16 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; 17 use crate::select::{Operation, SelectHandle, Selected, Token}; 18 use crate::waker::Waker; 19 20 /// A pointer to a packet. 21 pub(crate) struct ZeroToken(*mut ()); 22 23 impl Default for ZeroToken { default() -> Self24 fn default() -> Self { 25 Self(ptr::null_mut()) 26 } 27 } 28 29 impl fmt::Debug for ZeroToken { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 31 fmt::Debug::fmt(&(self.0 as usize), f) 32 } 33 } 34 35 /// A slot for passing one message from a sender to a receiver. 36 struct Packet<T> { 37 /// Equals `true` if the packet is allocated on the stack. 38 on_stack: bool, 39 40 /// Equals `true` once the packet is ready for reading or writing. 41 ready: AtomicBool, 42 43 /// The message. 44 msg: UnsafeCell<Option<T>>, 45 } 46 47 impl<T> Packet<T> { 48 /// Creates an empty packet on the stack. empty_on_stack() -> Packet<T>49 fn empty_on_stack() -> Packet<T> { 50 Packet { 51 on_stack: true, 52 ready: AtomicBool::new(false), 53 msg: UnsafeCell::new(None), 54 } 55 } 56 57 /// Creates an empty packet on the heap. empty_on_heap() -> Box<Packet<T>>58 fn empty_on_heap() -> Box<Packet<T>> { 59 Box::new(Packet { 60 on_stack: false, 61 ready: AtomicBool::new(false), 62 msg: UnsafeCell::new(None), 63 }) 64 } 65 66 /// Creates a packet on the stack, containing a message. message_on_stack(msg: T) -> Packet<T>67 fn message_on_stack(msg: T) -> Packet<T> { 68 Packet { 69 on_stack: true, 70 ready: AtomicBool::new(false), 71 msg: UnsafeCell::new(Some(msg)), 72 } 73 } 74 75 /// Waits until the packet becomes ready for reading or writing. wait_ready(&self)76 fn wait_ready(&self) { 77 let backoff = Backoff::new(); 78 while !self.ready.load(Ordering::Acquire) { 79 backoff.snooze(); 80 } 81 } 82 } 83 84 /// Inner representation of a zero-capacity channel. 85 struct Inner { 86 /// Senders waiting to pair up with a receive operation. 87 senders: Waker, 88 89 /// Receivers waiting to pair up with a send operation. 90 receivers: Waker, 91 92 /// Equals `true` when the channel is disconnected. 93 is_disconnected: bool, 94 } 95 96 /// Zero-capacity channel. 97 pub(crate) struct Channel<T> { 98 /// Inner representation of the channel. 99 inner: Mutex<Inner>, 100 101 /// Indicates that dropping a `Channel<T>` may drop values of type `T`. 102 _marker: PhantomData<T>, 103 } 104 105 impl<T> Channel<T> { 106 /// Constructs a new zero-capacity channel. new() -> Self107 pub(crate) fn new() -> Self { 108 Channel { 109 inner: Mutex::new(Inner { 110 senders: Waker::new(), 111 receivers: Waker::new(), 112 is_disconnected: false, 113 }), 114 _marker: PhantomData, 115 } 116 } 117 118 /// Returns a receiver handle to the channel. receiver(&self) -> Receiver<'_, T>119 pub(crate) fn receiver(&self) -> Receiver<'_, T> { 120 Receiver(self) 121 } 122 123 /// Returns a sender handle to the channel. sender(&self) -> Sender<'_, T>124 pub(crate) fn sender(&self) -> Sender<'_, T> { 125 Sender(self) 126 } 127 128 /// Attempts to reserve a slot for sending a message. start_send(&self, token: &mut Token) -> bool129 fn start_send(&self, token: &mut Token) -> bool { 130 let mut inner = self.inner.lock().unwrap(); 131 132 // If there's a waiting receiver, pair up with it. 133 if let Some(operation) = inner.receivers.try_select() { 134 token.zero.0 = operation.packet; 135 true 136 } else if inner.is_disconnected { 137 token.zero.0 = ptr::null_mut(); 138 true 139 } else { 140 false 141 } 142 } 143 144 /// Writes a message into the packet. write(&self, token: &mut Token, msg: T) -> Result<(), T>145 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 146 // If there is no packet, the channel is disconnected. 147 if token.zero.0.is_null() { 148 return Err(msg); 149 } 150 151 let packet = &*(token.zero.0 as *const Packet<T>); 152 packet.msg.get().write(Some(msg)); 153 packet.ready.store(true, Ordering::Release); 154 Ok(()) 155 } 156 157 /// Attempts to pair up with a sender. start_recv(&self, token: &mut Token) -> bool158 fn start_recv(&self, token: &mut Token) -> bool { 159 let mut inner = self.inner.lock().unwrap(); 160 161 // If there's a waiting sender, pair up with it. 162 if let Some(operation) = inner.senders.try_select() { 163 token.zero.0 = operation.packet; 164 true 165 } else if inner.is_disconnected { 166 token.zero.0 = ptr::null_mut(); 167 true 168 } else { 169 false 170 } 171 } 172 173 /// Reads a message from the packet. read(&self, token: &mut Token) -> Result<T, ()>174 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 175 // If there is no packet, the channel is disconnected. 176 if token.zero.0.is_null() { 177 return Err(()); 178 } 179 180 let packet = &*(token.zero.0 as *const Packet<T>); 181 182 if packet.on_stack { 183 // The message has been in the packet from the beginning, so there is no need to wait 184 // for it. However, after reading the message, we need to set `ready` to `true` in 185 // order to signal that the packet can be destroyed. 186 let msg = packet.msg.get().replace(None).unwrap(); 187 packet.ready.store(true, Ordering::Release); 188 Ok(msg) 189 } else { 190 // Wait until the message becomes available, then read it and destroy the 191 // heap-allocated packet. 192 packet.wait_ready(); 193 let msg = packet.msg.get().replace(None).unwrap(); 194 drop(Box::from_raw(token.zero.0.cast::<Packet<T>>())); 195 Ok(msg) 196 } 197 } 198 199 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>200 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 201 let token = &mut Token::default(); 202 let mut inner = self.inner.lock().unwrap(); 203 204 // If there's a waiting receiver, pair up with it. 205 if let Some(operation) = inner.receivers.try_select() { 206 token.zero.0 = operation.packet; 207 drop(inner); 208 unsafe { 209 self.write(token, msg).ok().unwrap(); 210 } 211 Ok(()) 212 } else if inner.is_disconnected { 213 Err(TrySendError::Disconnected(msg)) 214 } else { 215 Err(TrySendError::Full(msg)) 216 } 217 } 218 219 /// Sends a message into the channel. send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>220 pub(crate) fn send( 221 &self, 222 msg: T, 223 deadline: Option<Instant>, 224 ) -> Result<(), SendTimeoutError<T>> { 225 let token = &mut Token::default(); 226 let mut inner = self.inner.lock().unwrap(); 227 228 // If there's a waiting receiver, pair up with it. 229 if let Some(operation) = inner.receivers.try_select() { 230 token.zero.0 = operation.packet; 231 drop(inner); 232 unsafe { 233 self.write(token, msg).ok().unwrap(); 234 } 235 return Ok(()); 236 } 237 238 if inner.is_disconnected { 239 return Err(SendTimeoutError::Disconnected(msg)); 240 } 241 242 Context::with(|cx| { 243 // Prepare for blocking until a receiver wakes us up. 244 let oper = Operation::hook(token); 245 let mut packet = Packet::<T>::message_on_stack(msg); 246 inner 247 .senders 248 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx); 249 inner.receivers.notify(); 250 drop(inner); 251 252 // Block the current thread. 253 let sel = cx.wait_until(deadline); 254 255 match sel { 256 Selected::Waiting => unreachable!(), 257 Selected::Aborted => { 258 self.inner.lock().unwrap().senders.unregister(oper).unwrap(); 259 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 260 Err(SendTimeoutError::Timeout(msg)) 261 } 262 Selected::Disconnected => { 263 self.inner.lock().unwrap().senders.unregister(oper).unwrap(); 264 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 265 Err(SendTimeoutError::Disconnected(msg)) 266 } 267 Selected::Operation(_) => { 268 // Wait until the message is read, then drop the packet. 269 packet.wait_ready(); 270 Ok(()) 271 } 272 } 273 }) 274 } 275 276 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>277 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 278 let token = &mut Token::default(); 279 let mut inner = self.inner.lock().unwrap(); 280 281 // If there's a waiting sender, pair up with it. 282 if let Some(operation) = inner.senders.try_select() { 283 token.zero.0 = operation.packet; 284 drop(inner); 285 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 286 } else if inner.is_disconnected { 287 Err(TryRecvError::Disconnected) 288 } else { 289 Err(TryRecvError::Empty) 290 } 291 } 292 293 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>294 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 295 let token = &mut Token::default(); 296 let mut inner = self.inner.lock().unwrap(); 297 298 // If there's a waiting sender, pair up with it. 299 if let Some(operation) = inner.senders.try_select() { 300 token.zero.0 = operation.packet; 301 drop(inner); 302 unsafe { 303 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); 304 } 305 } 306 307 if inner.is_disconnected { 308 return Err(RecvTimeoutError::Disconnected); 309 } 310 311 Context::with(|cx| { 312 // Prepare for blocking until a sender wakes us up. 313 let oper = Operation::hook(token); 314 let mut packet = Packet::<T>::empty_on_stack(); 315 inner.receivers.register_with_packet( 316 oper, 317 &mut packet as *mut Packet<T> as *mut (), 318 cx, 319 ); 320 inner.senders.notify(); 321 drop(inner); 322 323 // Block the current thread. 324 let sel = cx.wait_until(deadline); 325 326 match sel { 327 Selected::Waiting => unreachable!(), 328 Selected::Aborted => { 329 self.inner 330 .lock() 331 .unwrap() 332 .receivers 333 .unregister(oper) 334 .unwrap(); 335 Err(RecvTimeoutError::Timeout) 336 } 337 Selected::Disconnected => { 338 self.inner 339 .lock() 340 .unwrap() 341 .receivers 342 .unregister(oper) 343 .unwrap(); 344 Err(RecvTimeoutError::Disconnected) 345 } 346 Selected::Operation(_) => { 347 // Wait until the message is provided, then read it. 348 packet.wait_ready(); 349 unsafe { Ok(packet.msg.get().replace(None).unwrap()) } 350 } 351 } 352 }) 353 } 354 355 /// Disconnects the channel and wakes up all blocked senders and receivers. 356 /// 357 /// Returns `true` if this call disconnected the channel. disconnect(&self) -> bool358 pub(crate) fn disconnect(&self) -> bool { 359 let mut inner = self.inner.lock().unwrap(); 360 361 if !inner.is_disconnected { 362 inner.is_disconnected = true; 363 inner.senders.disconnect(); 364 inner.receivers.disconnect(); 365 true 366 } else { 367 false 368 } 369 } 370 371 /// Returns the current number of messages inside the channel. len(&self) -> usize372 pub(crate) fn len(&self) -> usize { 373 0 374 } 375 376 /// Returns the capacity of the channel. capacity(&self) -> Option<usize>377 pub(crate) fn capacity(&self) -> Option<usize> { 378 Some(0) 379 } 380 381 /// Returns `true` if the channel is empty. is_empty(&self) -> bool382 pub(crate) fn is_empty(&self) -> bool { 383 true 384 } 385 386 /// Returns `true` if the channel is full. is_full(&self) -> bool387 pub(crate) fn is_full(&self) -> bool { 388 true 389 } 390 } 391 392 /// Receiver handle to a channel. 393 pub(crate) struct Receiver<'a, T>(&'a Channel<T>); 394 395 /// Sender handle to a channel. 396 pub(crate) struct Sender<'a, T>(&'a Channel<T>); 397 398 impl<T> SelectHandle for Receiver<'_, T> { try_select(&self, token: &mut Token) -> bool399 fn try_select(&self, token: &mut Token) -> bool { 400 self.0.start_recv(token) 401 } 402 deadline(&self) -> Option<Instant>403 fn deadline(&self) -> Option<Instant> { 404 None 405 } 406 register(&self, oper: Operation, cx: &Context) -> bool407 fn register(&self, oper: Operation, cx: &Context) -> bool { 408 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 409 410 let mut inner = self.0.inner.lock().unwrap(); 411 inner 412 .receivers 413 .register_with_packet(oper, packet.cast::<()>(), cx); 414 inner.senders.notify(); 415 inner.senders.can_select() || inner.is_disconnected 416 } 417 unregister(&self, oper: Operation)418 fn unregister(&self, oper: Operation) { 419 if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) { 420 unsafe { 421 drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); 422 } 423 } 424 } 425 accept(&self, token: &mut Token, cx: &Context) -> bool426 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 427 token.zero.0 = cx.wait_packet(); 428 true 429 } 430 is_ready(&self) -> bool431 fn is_ready(&self) -> bool { 432 let inner = self.0.inner.lock().unwrap(); 433 inner.senders.can_select() || inner.is_disconnected 434 } 435 watch(&self, oper: Operation, cx: &Context) -> bool436 fn watch(&self, oper: Operation, cx: &Context) -> bool { 437 let mut inner = self.0.inner.lock().unwrap(); 438 inner.receivers.watch(oper, cx); 439 inner.senders.can_select() || inner.is_disconnected 440 } 441 unwatch(&self, oper: Operation)442 fn unwatch(&self, oper: Operation) { 443 let mut inner = self.0.inner.lock().unwrap(); 444 inner.receivers.unwatch(oper); 445 } 446 } 447 448 impl<T> SelectHandle for Sender<'_, T> { try_select(&self, token: &mut Token) -> bool449 fn try_select(&self, token: &mut Token) -> bool { 450 self.0.start_send(token) 451 } 452 deadline(&self) -> Option<Instant>453 fn deadline(&self) -> Option<Instant> { 454 None 455 } 456 register(&self, oper: Operation, cx: &Context) -> bool457 fn register(&self, oper: Operation, cx: &Context) -> bool { 458 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 459 460 let mut inner = self.0.inner.lock().unwrap(); 461 inner 462 .senders 463 .register_with_packet(oper, packet.cast::<()>(), cx); 464 inner.receivers.notify(); 465 inner.receivers.can_select() || inner.is_disconnected 466 } 467 unregister(&self, oper: Operation)468 fn unregister(&self, oper: Operation) { 469 if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) { 470 unsafe { 471 drop(Box::from_raw(operation.packet.cast::<Packet<T>>())); 472 } 473 } 474 } 475 accept(&self, token: &mut Token, cx: &Context) -> bool476 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 477 token.zero.0 = cx.wait_packet(); 478 true 479 } 480 is_ready(&self) -> bool481 fn is_ready(&self) -> bool { 482 let inner = self.0.inner.lock().unwrap(); 483 inner.receivers.can_select() || inner.is_disconnected 484 } 485 watch(&self, oper: Operation, cx: &Context) -> bool486 fn watch(&self, oper: Operation, cx: &Context) -> bool { 487 let mut inner = self.0.inner.lock().unwrap(); 488 inner.senders.watch(oper, cx); 489 inner.receivers.can_select() || inner.is_disconnected 490 } 491 unwatch(&self, oper: Operation)492 fn unwatch(&self, oper: Operation) { 493 let mut inner = self.0.inner.lock().unwrap(); 494 inner.senders.unwatch(oper); 495 } 496 } 497