1 //! Bounded channel based on a preallocated array. 2 //! 3 //! This flavor has a fixed, positive capacity. 4 //! 5 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. 6 //! 7 //! Source: 8 //! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> 9 //! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub> 10 11 use std::boxed::Box; 12 use std::cell::UnsafeCell; 13 use std::mem::{self, MaybeUninit}; 14 use std::ptr; 15 use std::sync::atomic::{self, AtomicUsize, Ordering}; 16 use std::time::Instant; 17 18 use crossbeam_utils::{Backoff, CachePadded}; 19 20 use crate::context::Context; 21 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; 22 use crate::select::{Operation, SelectHandle, Selected, Token}; 23 use crate::waker::SyncWaker; 24 25 /// A slot in a channel. 26 struct Slot<T> { 27 /// The current stamp. 28 stamp: AtomicUsize, 29 30 /// The message in this slot. 31 msg: UnsafeCell<MaybeUninit<T>>, 32 } 33 34 /// The token type for the array flavor. 35 #[derive(Debug)] 36 pub(crate) struct ArrayToken { 37 /// Slot to read from or write to. 38 slot: *const u8, 39 40 /// Stamp to store into the slot after reading or writing. 41 stamp: usize, 42 } 43 44 impl Default for ArrayToken { 45 #[inline] default() -> Self46 fn default() -> Self { 47 ArrayToken { 48 slot: ptr::null(), 49 stamp: 0, 50 } 51 } 52 } 53 54 /// Bounded channel based on a preallocated array. 55 pub(crate) struct Channel<T> { 56 /// The head of the channel. 57 /// 58 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but 59 /// packed into a single `usize`. The lower bits represent the index, while the upper bits 60 /// represent the lap. The mark bit in the head is always zero. 61 /// 62 /// Messages are popped from the head of the channel. 63 head: CachePadded<AtomicUsize>, 64 65 /// The tail of the channel. 66 /// 67 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but 68 /// packed into a single `usize`. The lower bits represent the index, while the upper bits 69 /// represent the lap. The mark bit indicates that the channel is disconnected. 70 /// 71 /// Messages are pushed into the tail of the channel. 72 tail: CachePadded<AtomicUsize>, 73 74 /// The buffer holding slots. 75 buffer: Box<[Slot<T>]>, 76 77 /// The channel capacity. 78 cap: usize, 79 80 /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. 81 one_lap: usize, 82 83 /// If this bit is set in the tail, that means the channel is disconnected. 84 mark_bit: usize, 85 86 /// Senders waiting while the channel is full. 87 senders: SyncWaker, 88 89 /// Receivers waiting while the channel is empty and not disconnected. 90 receivers: SyncWaker, 91 } 92 93 impl<T> Channel<T> { 94 /// Creates a bounded channel of capacity `cap`. with_capacity(cap: usize) -> Self95 pub(crate) fn with_capacity(cap: usize) -> Self { 96 assert!(cap > 0, "capacity must be positive"); 97 98 // Compute constants `mark_bit` and `one_lap`. 99 let mark_bit = (cap + 1).next_power_of_two(); 100 let one_lap = mark_bit * 2; 101 102 // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. 103 let head = 0; 104 // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. 105 let tail = 0; 106 107 // Allocate a buffer of `cap` slots initialized 108 // with stamps. 109 let buffer: Box<[Slot<T>]> = (0..cap) 110 .map(|i| { 111 // Set the stamp to `{ lap: 0, mark: 0, index: i }`. 112 Slot { 113 stamp: AtomicUsize::new(i), 114 msg: UnsafeCell::new(MaybeUninit::uninit()), 115 } 116 }) 117 .collect(); 118 119 Channel { 120 buffer, 121 cap, 122 one_lap, 123 mark_bit, 124 head: CachePadded::new(AtomicUsize::new(head)), 125 tail: CachePadded::new(AtomicUsize::new(tail)), 126 senders: SyncWaker::new(), 127 receivers: SyncWaker::new(), 128 } 129 } 130 131 /// Returns a receiver handle to the channel. receiver(&self) -> Receiver<'_, T>132 pub(crate) fn receiver(&self) -> Receiver<'_, T> { 133 Receiver(self) 134 } 135 136 /// Returns a sender handle to the channel. sender(&self) -> Sender<'_, T>137 pub(crate) fn sender(&self) -> Sender<'_, T> { 138 Sender(self) 139 } 140 141 /// Attempts to reserve a slot for sending a message. start_send(&self, token: &mut Token) -> bool142 fn start_send(&self, token: &mut Token) -> bool { 143 let backoff = Backoff::new(); 144 let mut tail = self.tail.load(Ordering::Relaxed); 145 146 loop { 147 // Check if the channel is disconnected. 148 if tail & self.mark_bit != 0 { 149 token.array.slot = ptr::null(); 150 token.array.stamp = 0; 151 return true; 152 } 153 154 // Deconstruct the tail. 155 let index = tail & (self.mark_bit - 1); 156 let lap = tail & !(self.one_lap - 1); 157 158 // Inspect the corresponding slot. 159 debug_assert!(index < self.buffer.len()); 160 let slot = unsafe { self.buffer.get_unchecked(index) }; 161 let stamp = slot.stamp.load(Ordering::Acquire); 162 163 // If the tail and the stamp match, we may attempt to push. 164 if tail == stamp { 165 let new_tail = if index + 1 < self.cap { 166 // Same lap, incremented index. 167 // Set to `{ lap: lap, mark: 0, index: index + 1 }`. 168 tail + 1 169 } else { 170 // One lap forward, index wraps around to zero. 171 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. 172 lap.wrapping_add(self.one_lap) 173 }; 174 175 // Try moving the tail. 176 match self.tail.compare_exchange_weak( 177 tail, 178 new_tail, 179 Ordering::SeqCst, 180 Ordering::Relaxed, 181 ) { 182 Ok(_) => { 183 // Prepare the token for the follow-up call to `write`. 184 token.array.slot = slot as *const Slot<T> as *const u8; 185 token.array.stamp = tail + 1; 186 return true; 187 } 188 Err(t) => { 189 tail = t; 190 backoff.spin(); 191 } 192 } 193 } else if stamp.wrapping_add(self.one_lap) == tail + 1 { 194 atomic::fence(Ordering::SeqCst); 195 let head = self.head.load(Ordering::Relaxed); 196 197 // If the head lags one lap behind the tail as well... 198 if head.wrapping_add(self.one_lap) == tail { 199 // ...then the channel is full. 200 return false; 201 } 202 203 backoff.spin(); 204 tail = self.tail.load(Ordering::Relaxed); 205 } else { 206 // Snooze because we need to wait for the stamp to get updated. 207 backoff.snooze(); 208 tail = self.tail.load(Ordering::Relaxed); 209 } 210 } 211 } 212 213 /// Writes a message into the channel. write(&self, token: &mut Token, msg: T) -> Result<(), T>214 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 215 // If there is no slot, the channel is disconnected. 216 if token.array.slot.is_null() { 217 return Err(msg); 218 } 219 220 let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>(); 221 222 // Write the message into the slot and update the stamp. 223 slot.msg.get().write(MaybeUninit::new(msg)); 224 slot.stamp.store(token.array.stamp, Ordering::Release); 225 226 // Wake a sleeping receiver. 227 self.receivers.notify(); 228 Ok(()) 229 } 230 231 /// Attempts to reserve a slot for receiving a message. start_recv(&self, token: &mut Token) -> bool232 fn start_recv(&self, token: &mut Token) -> bool { 233 let backoff = Backoff::new(); 234 let mut head = self.head.load(Ordering::Relaxed); 235 236 loop { 237 // Deconstruct the head. 238 let index = head & (self.mark_bit - 1); 239 let lap = head & !(self.one_lap - 1); 240 241 // Inspect the corresponding slot. 242 debug_assert!(index < self.buffer.len()); 243 let slot = unsafe { self.buffer.get_unchecked(index) }; 244 let stamp = slot.stamp.load(Ordering::Acquire); 245 246 // If the stamp is ahead of the head by 1, we may attempt to pop. 247 if head + 1 == stamp { 248 let new = if index + 1 < self.cap { 249 // Same lap, incremented index. 250 // Set to `{ lap: lap, mark: 0, index: index + 1 }`. 251 head + 1 252 } else { 253 // One lap forward, index wraps around to zero. 254 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. 255 lap.wrapping_add(self.one_lap) 256 }; 257 258 // Try moving the head. 259 match self.head.compare_exchange_weak( 260 head, 261 new, 262 Ordering::SeqCst, 263 Ordering::Relaxed, 264 ) { 265 Ok(_) => { 266 // Prepare the token for the follow-up call to `read`. 267 token.array.slot = slot as *const Slot<T> as *const u8; 268 token.array.stamp = head.wrapping_add(self.one_lap); 269 return true; 270 } 271 Err(h) => { 272 head = h; 273 backoff.spin(); 274 } 275 } 276 } else if stamp == head { 277 atomic::fence(Ordering::SeqCst); 278 let tail = self.tail.load(Ordering::Relaxed); 279 280 // If the tail equals the head, that means the channel is empty. 281 if (tail & !self.mark_bit) == head { 282 // If the channel is disconnected... 283 if tail & self.mark_bit != 0 { 284 // ...then receive an error. 285 token.array.slot = ptr::null(); 286 token.array.stamp = 0; 287 return true; 288 } else { 289 // Otherwise, the receive operation is not ready. 290 return false; 291 } 292 } 293 294 backoff.spin(); 295 head = self.head.load(Ordering::Relaxed); 296 } else { 297 // Snooze because we need to wait for the stamp to get updated. 298 backoff.snooze(); 299 head = self.head.load(Ordering::Relaxed); 300 } 301 } 302 } 303 304 /// Reads a message from the channel. read(&self, token: &mut Token) -> Result<T, ()>305 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 306 if token.array.slot.is_null() { 307 // The channel is disconnected. 308 return Err(()); 309 } 310 311 let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>(); 312 313 // Read the message from the slot and update the stamp. 314 let msg = slot.msg.get().read().assume_init(); 315 slot.stamp.store(token.array.stamp, Ordering::Release); 316 317 // Wake a sleeping sender. 318 self.senders.notify(); 319 Ok(msg) 320 } 321 322 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>323 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 324 let token = &mut Token::default(); 325 if self.start_send(token) { 326 unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } 327 } else { 328 Err(TrySendError::Full(msg)) 329 } 330 } 331 332 /// Sends a message into the channel. send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>333 pub(crate) fn send( 334 &self, 335 msg: T, 336 deadline: Option<Instant>, 337 ) -> Result<(), SendTimeoutError<T>> { 338 let token = &mut Token::default(); 339 loop { 340 // Try sending a message several times. 341 let backoff = Backoff::new(); 342 loop { 343 if self.start_send(token) { 344 let res = unsafe { self.write(token, msg) }; 345 return res.map_err(SendTimeoutError::Disconnected); 346 } 347 348 if backoff.is_completed() { 349 break; 350 } else { 351 backoff.snooze(); 352 } 353 } 354 355 if let Some(d) = deadline { 356 if Instant::now() >= d { 357 return Err(SendTimeoutError::Timeout(msg)); 358 } 359 } 360 361 Context::with(|cx| { 362 // Prepare for blocking until a receiver wakes us up. 363 let oper = Operation::hook(token); 364 self.senders.register(oper, cx); 365 366 // Has the channel become ready just now? 367 if !self.is_full() || self.is_disconnected() { 368 let _ = cx.try_select(Selected::Aborted); 369 } 370 371 // Block the current thread. 372 let sel = cx.wait_until(deadline); 373 374 match sel { 375 Selected::Waiting => unreachable!(), 376 Selected::Aborted | Selected::Disconnected => { 377 self.senders.unregister(oper).unwrap(); 378 } 379 Selected::Operation(_) => {} 380 } 381 }); 382 } 383 } 384 385 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>386 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 387 let token = &mut Token::default(); 388 389 if self.start_recv(token) { 390 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 391 } else { 392 Err(TryRecvError::Empty) 393 } 394 } 395 396 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>397 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 398 let token = &mut Token::default(); 399 loop { 400 // Try receiving a message several times. 401 let backoff = Backoff::new(); 402 loop { 403 if self.start_recv(token) { 404 let res = unsafe { self.read(token) }; 405 return res.map_err(|_| RecvTimeoutError::Disconnected); 406 } 407 408 if backoff.is_completed() { 409 break; 410 } else { 411 backoff.snooze(); 412 } 413 } 414 415 if let Some(d) = deadline { 416 if Instant::now() >= d { 417 return Err(RecvTimeoutError::Timeout); 418 } 419 } 420 421 Context::with(|cx| { 422 // Prepare for blocking until a sender wakes us up. 423 let oper = Operation::hook(token); 424 self.receivers.register(oper, cx); 425 426 // Has the channel become ready just now? 427 if !self.is_empty() || self.is_disconnected() { 428 let _ = cx.try_select(Selected::Aborted); 429 } 430 431 // Block the current thread. 432 let sel = cx.wait_until(deadline); 433 434 match sel { 435 Selected::Waiting => unreachable!(), 436 Selected::Aborted | Selected::Disconnected => { 437 self.receivers.unregister(oper).unwrap(); 438 // If the channel was disconnected, we still have to check for remaining 439 // messages. 440 } 441 Selected::Operation(_) => {} 442 } 443 }); 444 } 445 } 446 447 /// Returns the current number of messages inside the channel. len(&self) -> usize448 pub(crate) fn len(&self) -> usize { 449 loop { 450 // Load the tail, then load the head. 451 let tail = self.tail.load(Ordering::SeqCst); 452 let head = self.head.load(Ordering::SeqCst); 453 454 // If the tail didn't change, we've got consistent values to work with. 455 if self.tail.load(Ordering::SeqCst) == tail { 456 let hix = head & (self.mark_bit - 1); 457 let tix = tail & (self.mark_bit - 1); 458 459 return if hix < tix { 460 tix - hix 461 } else if hix > tix { 462 self.cap - hix + tix 463 } else if (tail & !self.mark_bit) == head { 464 0 465 } else { 466 self.cap 467 }; 468 } 469 } 470 } 471 472 /// Returns the capacity of the channel. capacity(&self) -> Option<usize>473 pub(crate) fn capacity(&self) -> Option<usize> { 474 Some(self.cap) 475 } 476 477 /// Disconnects the channel and wakes up all blocked senders and receivers. 478 /// 479 /// Returns `true` if this call disconnected the channel. disconnect(&self) -> bool480 pub(crate) fn disconnect(&self) -> bool { 481 let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); 482 483 if tail & self.mark_bit == 0 { 484 self.senders.disconnect(); 485 self.receivers.disconnect(); 486 true 487 } else { 488 false 489 } 490 } 491 492 /// Returns `true` if the channel is disconnected. is_disconnected(&self) -> bool493 pub(crate) fn is_disconnected(&self) -> bool { 494 self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 495 } 496 497 /// Returns `true` if the channel is empty. is_empty(&self) -> bool498 pub(crate) fn is_empty(&self) -> bool { 499 let head = self.head.load(Ordering::SeqCst); 500 let tail = self.tail.load(Ordering::SeqCst); 501 502 // Is the tail equal to the head? 503 // 504 // Note: If the head changes just before we load the tail, that means there was a moment 505 // when the channel was not empty, so it is safe to just return `false`. 506 (tail & !self.mark_bit) == head 507 } 508 509 /// Returns `true` if the channel is full. is_full(&self) -> bool510 pub(crate) fn is_full(&self) -> bool { 511 let tail = self.tail.load(Ordering::SeqCst); 512 let head = self.head.load(Ordering::SeqCst); 513 514 // Is the head lagging one lap behind tail? 515 // 516 // Note: If the tail changes just before we load the head, that means there was a moment 517 // when the channel was not full, so it is safe to just return `false`. 518 head.wrapping_add(self.one_lap) == tail & !self.mark_bit 519 } 520 } 521 522 impl<T> Drop for Channel<T> { drop(&mut self)523 fn drop(&mut self) { 524 if mem::needs_drop::<T>() { 525 // Get the index of the head. 526 let head = *self.head.get_mut(); 527 let tail = *self.tail.get_mut(); 528 529 let hix = head & (self.mark_bit - 1); 530 let tix = tail & (self.mark_bit - 1); 531 532 let len = if hix < tix { 533 tix - hix 534 } else if hix > tix { 535 self.cap - hix + tix 536 } else if (tail & !self.mark_bit) == head { 537 0 538 } else { 539 self.cap 540 }; 541 542 // Loop over all slots that hold a message and drop them. 543 for i in 0..len { 544 // Compute the index of the next slot holding a message. 545 let index = if hix + i < self.cap { 546 hix + i 547 } else { 548 hix + i - self.cap 549 }; 550 551 unsafe { 552 debug_assert!(index < self.buffer.len()); 553 let slot = self.buffer.get_unchecked_mut(index); 554 (*slot.msg.get()).assume_init_drop(); 555 } 556 } 557 } 558 } 559 } 560 561 /// Receiver handle to a channel. 562 pub(crate) struct Receiver<'a, T>(&'a Channel<T>); 563 564 /// Sender handle to a channel. 565 pub(crate) struct Sender<'a, T>(&'a Channel<T>); 566 567 impl<T> SelectHandle for Receiver<'_, T> { try_select(&self, token: &mut Token) -> bool568 fn try_select(&self, token: &mut Token) -> bool { 569 self.0.start_recv(token) 570 } 571 deadline(&self) -> Option<Instant>572 fn deadline(&self) -> Option<Instant> { 573 None 574 } 575 register(&self, oper: Operation, cx: &Context) -> bool576 fn register(&self, oper: Operation, cx: &Context) -> bool { 577 self.0.receivers.register(oper, cx); 578 self.is_ready() 579 } 580 unregister(&self, oper: Operation)581 fn unregister(&self, oper: Operation) { 582 self.0.receivers.unregister(oper); 583 } 584 accept(&self, token: &mut Token, _cx: &Context) -> bool585 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 586 self.try_select(token) 587 } 588 is_ready(&self) -> bool589 fn is_ready(&self) -> bool { 590 !self.0.is_empty() || self.0.is_disconnected() 591 } 592 watch(&self, oper: Operation, cx: &Context) -> bool593 fn watch(&self, oper: Operation, cx: &Context) -> bool { 594 self.0.receivers.watch(oper, cx); 595 self.is_ready() 596 } 597 unwatch(&self, oper: Operation)598 fn unwatch(&self, oper: Operation) { 599 self.0.receivers.unwatch(oper); 600 } 601 } 602 603 impl<T> SelectHandle for Sender<'_, T> { try_select(&self, token: &mut Token) -> bool604 fn try_select(&self, token: &mut Token) -> bool { 605 self.0.start_send(token) 606 } 607 deadline(&self) -> Option<Instant>608 fn deadline(&self) -> Option<Instant> { 609 None 610 } 611 register(&self, oper: Operation, cx: &Context) -> bool612 fn register(&self, oper: Operation, cx: &Context) -> bool { 613 self.0.senders.register(oper, cx); 614 self.is_ready() 615 } 616 unregister(&self, oper: Operation)617 fn unregister(&self, oper: Operation) { 618 self.0.senders.unregister(oper); 619 } 620 accept(&self, token: &mut Token, _cx: &Context) -> bool621 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 622 self.try_select(token) 623 } 624 is_ready(&self) -> bool625 fn is_ready(&self) -> bool { 626 !self.0.is_full() || self.0.is_disconnected() 627 } 628 watch(&self, oper: Operation, cx: &Context) -> bool629 fn watch(&self, oper: Operation, cx: &Context) -> bool { 630 self.0.senders.watch(oper, cx); 631 self.is_ready() 632 } 633 unwatch(&self, oper: Operation)634 fn unwatch(&self, oper: Operation) { 635 self.0.senders.unwatch(oper); 636 } 637 } 638