1 //! Tests for the list channel flavor.
2
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8
9 use crossbeam_channel::{select, unbounded, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16 Duration::from_millis(ms)
17 }
18
19 #[test]
smoke()20 fn smoke() {
21 let (s, r) = unbounded();
22 s.try_send(7).unwrap();
23 assert_eq!(r.try_recv(), Ok(7));
24
25 s.send(8).unwrap();
26 assert_eq!(r.recv(), Ok(8));
27
28 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31
32 #[test]
capacity()33 fn capacity() {
34 let (s, r) = unbounded::<()>();
35 assert_eq!(s.capacity(), None);
36 assert_eq!(r.capacity(), None);
37 }
38
39 #[test]
len_empty_full()40 fn len_empty_full() {
41 let (s, r) = unbounded();
42
43 assert_eq!(s.len(), 0);
44 assert!(s.is_empty());
45 assert!(!s.is_full());
46 assert_eq!(r.len(), 0);
47 assert!(r.is_empty());
48 assert!(!r.is_full());
49
50 s.send(()).unwrap();
51
52 assert_eq!(s.len(), 1);
53 assert!(!s.is_empty());
54 assert!(!s.is_full());
55 assert_eq!(r.len(), 1);
56 assert!(!r.is_empty());
57 assert!(!r.is_full());
58
59 r.recv().unwrap();
60
61 assert_eq!(s.len(), 0);
62 assert!(s.is_empty());
63 assert!(!s.is_full());
64 assert_eq!(r.len(), 0);
65 assert!(r.is_empty());
66 assert!(!r.is_full());
67 }
68
69 #[test]
70 #[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them
try_recv()71 fn try_recv() {
72 let (s, r) = unbounded();
73
74 scope(|scope| {
75 scope.spawn(move |_| {
76 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
77 thread::sleep(ms(1500));
78 assert_eq!(r.try_recv(), Ok(7));
79 thread::sleep(ms(500));
80 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
81 });
82 scope.spawn(move |_| {
83 thread::sleep(ms(1000));
84 s.send(7).unwrap();
85 });
86 })
87 .unwrap();
88 }
89
90 #[test]
recv()91 fn recv() {
92 let (s, r) = unbounded();
93
94 scope(|scope| {
95 scope.spawn(move |_| {
96 assert_eq!(r.recv(), Ok(7));
97 thread::sleep(ms(1000));
98 assert_eq!(r.recv(), Ok(8));
99 thread::sleep(ms(1000));
100 assert_eq!(r.recv(), Ok(9));
101 assert_eq!(r.recv(), Err(RecvError));
102 });
103 scope.spawn(move |_| {
104 thread::sleep(ms(1500));
105 s.send(7).unwrap();
106 s.send(8).unwrap();
107 s.send(9).unwrap();
108 });
109 })
110 .unwrap();
111 }
112
113 #[test]
recv_timeout()114 fn recv_timeout() {
115 let (s, r) = unbounded::<i32>();
116
117 scope(|scope| {
118 scope.spawn(move |_| {
119 assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
120 assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
121 assert_eq!(
122 r.recv_timeout(ms(1000)),
123 Err(RecvTimeoutError::Disconnected)
124 );
125 });
126 scope.spawn(move |_| {
127 thread::sleep(ms(1500));
128 s.send(7).unwrap();
129 });
130 })
131 .unwrap();
132 }
133
134 #[test]
try_send()135 fn try_send() {
136 #[cfg(miri)]
137 const COUNT: usize = 50;
138 #[cfg(not(miri))]
139 const COUNT: usize = 1000;
140
141 let (s, r) = unbounded();
142 for i in 0..COUNT {
143 assert_eq!(s.try_send(i), Ok(()));
144 }
145
146 drop(r);
147 assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777)));
148 }
149
150 #[test]
send()151 fn send() {
152 #[cfg(miri)]
153 const COUNT: usize = 50;
154 #[cfg(not(miri))]
155 const COUNT: usize = 1000;
156
157 let (s, r) = unbounded();
158 for i in 0..COUNT {
159 assert_eq!(s.send(i), Ok(()));
160 }
161
162 drop(r);
163 assert_eq!(s.send(777), Err(SendError(777)));
164 }
165
166 #[test]
send_timeout()167 fn send_timeout() {
168 #[cfg(miri)]
169 const COUNT: usize = 50;
170 #[cfg(not(miri))]
171 const COUNT: usize = 1000;
172
173 let (s, r) = unbounded();
174 for i in 0..COUNT {
175 assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
176 }
177
178 drop(r);
179 assert_eq!(
180 s.send_timeout(777, ms(0)),
181 Err(SendTimeoutError::Disconnected(777))
182 );
183 }
184
185 #[test]
send_after_disconnect()186 fn send_after_disconnect() {
187 let (s, r) = unbounded();
188
189 s.send(1).unwrap();
190 s.send(2).unwrap();
191 s.send(3).unwrap();
192
193 drop(r);
194
195 assert_eq!(s.send(4), Err(SendError(4)));
196 assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
197 assert_eq!(
198 s.send_timeout(6, ms(0)),
199 Err(SendTimeoutError::Disconnected(6))
200 );
201 }
202
203 #[test]
recv_after_disconnect()204 fn recv_after_disconnect() {
205 let (s, r) = unbounded();
206
207 s.send(1).unwrap();
208 s.send(2).unwrap();
209 s.send(3).unwrap();
210
211 drop(s);
212
213 assert_eq!(r.recv(), Ok(1));
214 assert_eq!(r.recv(), Ok(2));
215 assert_eq!(r.recv(), Ok(3));
216 assert_eq!(r.recv(), Err(RecvError));
217 }
218
219 #[test]
len()220 fn len() {
221 let (s, r) = unbounded();
222
223 assert_eq!(s.len(), 0);
224 assert_eq!(r.len(), 0);
225
226 for i in 0..50 {
227 s.send(i).unwrap();
228 assert_eq!(s.len(), i + 1);
229 }
230
231 for i in 0..50 {
232 r.recv().unwrap();
233 assert_eq!(r.len(), 50 - i - 1);
234 }
235
236 assert_eq!(s.len(), 0);
237 assert_eq!(r.len(), 0);
238 }
239
240 #[test]
disconnect_wakes_receiver()241 fn disconnect_wakes_receiver() {
242 let (s, r) = unbounded::<()>();
243
244 scope(|scope| {
245 scope.spawn(move |_| {
246 assert_eq!(r.recv(), Err(RecvError));
247 });
248 scope.spawn(move |_| {
249 thread::sleep(ms(1000));
250 drop(s);
251 });
252 })
253 .unwrap();
254 }
255
256 #[test]
spsc()257 fn spsc() {
258 #[cfg(miri)]
259 const COUNT: usize = 100;
260 #[cfg(not(miri))]
261 const COUNT: usize = 100_000;
262
263 let (s, r) = unbounded();
264
265 scope(|scope| {
266 scope.spawn(move |_| {
267 for i in 0..COUNT {
268 assert_eq!(r.recv(), Ok(i));
269 }
270 assert_eq!(r.recv(), Err(RecvError));
271 });
272 scope.spawn(move |_| {
273 for i in 0..COUNT {
274 s.send(i).unwrap();
275 }
276 });
277 })
278 .unwrap();
279 }
280
281 #[test]
mpmc()282 fn mpmc() {
283 #[cfg(miri)]
284 const COUNT: usize = 100;
285 #[cfg(not(miri))]
286 const COUNT: usize = 25_000;
287 const THREADS: usize = 4;
288
289 let (s, r) = unbounded::<usize>();
290 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
291
292 scope(|scope| {
293 for _ in 0..THREADS {
294 scope.spawn(|_| {
295 for _ in 0..COUNT {
296 let n = r.recv().unwrap();
297 v[n].fetch_add(1, Ordering::SeqCst);
298 }
299 });
300 }
301 for _ in 0..THREADS {
302 scope.spawn(|_| {
303 for i in 0..COUNT {
304 s.send(i).unwrap();
305 }
306 });
307 }
308 })
309 .unwrap();
310
311 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
312
313 for c in v {
314 assert_eq!(c.load(Ordering::SeqCst), THREADS);
315 }
316 }
317
318 #[test]
stress_oneshot()319 fn stress_oneshot() {
320 #[cfg(miri)]
321 const COUNT: usize = 100;
322 #[cfg(not(miri))]
323 const COUNT: usize = 10_000;
324
325 for _ in 0..COUNT {
326 let (s, r) = unbounded();
327
328 scope(|scope| {
329 scope.spawn(|_| r.recv().unwrap());
330 scope.spawn(|_| s.send(0).unwrap());
331 })
332 .unwrap();
333 }
334 }
335
336 #[test]
stress_iter()337 fn stress_iter() {
338 #[cfg(miri)]
339 const COUNT: usize = 100;
340 #[cfg(not(miri))]
341 const COUNT: usize = 100_000;
342
343 let (request_s, request_r) = unbounded();
344 let (response_s, response_r) = unbounded();
345
346 scope(|scope| {
347 scope.spawn(move |_| {
348 let mut count = 0;
349 loop {
350 for x in response_r.try_iter() {
351 count += x;
352 if count == COUNT {
353 return;
354 }
355 }
356 request_s.send(()).unwrap();
357 }
358 });
359
360 for _ in request_r.iter() {
361 if response_s.send(1).is_err() {
362 break;
363 }
364 }
365 })
366 .unwrap();
367 }
368
369 #[test]
stress_timeout_two_threads()370 fn stress_timeout_two_threads() {
371 const COUNT: usize = 100;
372
373 let (s, r) = unbounded();
374
375 scope(|scope| {
376 scope.spawn(|_| {
377 for i in 0..COUNT {
378 if i % 2 == 0 {
379 thread::sleep(ms(50));
380 }
381 s.send(i).unwrap();
382 }
383 });
384
385 scope.spawn(|_| {
386 for i in 0..COUNT {
387 if i % 2 == 0 {
388 thread::sleep(ms(50));
389 }
390 loop {
391 if let Ok(x) = r.recv_timeout(ms(10)) {
392 assert_eq!(x, i);
393 break;
394 }
395 }
396 }
397 });
398 })
399 .unwrap();
400 }
401
402 #[test]
drops()403 fn drops() {
404 #[cfg(miri)]
405 const RUNS: usize = 20;
406 #[cfg(not(miri))]
407 const RUNS: usize = 100;
408 #[cfg(miri)]
409 const STEPS: usize = 100;
410 #[cfg(not(miri))]
411 const STEPS: usize = 10_000;
412
413 static DROPS: AtomicUsize = AtomicUsize::new(0);
414
415 #[derive(Debug, PartialEq)]
416 struct DropCounter;
417
418 impl Drop for DropCounter {
419 fn drop(&mut self) {
420 DROPS.fetch_add(1, Ordering::SeqCst);
421 }
422 }
423
424 let mut rng = thread_rng();
425
426 for _ in 0..RUNS {
427 let steps = rng.gen_range(0..STEPS);
428 let additional = rng.gen_range(0..STEPS / 10);
429
430 DROPS.store(0, Ordering::SeqCst);
431 let (s, r) = unbounded::<DropCounter>();
432
433 scope(|scope| {
434 scope.spawn(|_| {
435 for _ in 0..steps {
436 r.recv().unwrap();
437 }
438 });
439
440 scope.spawn(|_| {
441 for _ in 0..steps {
442 s.send(DropCounter).unwrap();
443 }
444 });
445 })
446 .unwrap();
447
448 for _ in 0..additional {
449 s.try_send(DropCounter).unwrap();
450 }
451
452 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
453 drop(s);
454 drop(r);
455 assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
456 }
457 }
458
459 #[test]
linearizable()460 fn linearizable() {
461 #[cfg(miri)]
462 const COUNT: usize = 100;
463 #[cfg(not(miri))]
464 const COUNT: usize = 25_000;
465 const THREADS: usize = 4;
466
467 let (s, r) = unbounded();
468
469 scope(|scope| {
470 for _ in 0..THREADS {
471 scope.spawn(|_| {
472 for _ in 0..COUNT {
473 s.send(0).unwrap();
474 r.try_recv().unwrap();
475 }
476 });
477 }
478 })
479 .unwrap();
480 }
481
482 #[test]
fairness()483 fn fairness() {
484 #[cfg(miri)]
485 const COUNT: usize = 100;
486 #[cfg(not(miri))]
487 const COUNT: usize = 10_000;
488
489 let (s1, r1) = unbounded::<()>();
490 let (s2, r2) = unbounded::<()>();
491
492 for _ in 0..COUNT {
493 s1.send(()).unwrap();
494 s2.send(()).unwrap();
495 }
496
497 let mut hits = [0usize; 2];
498 for _ in 0..COUNT {
499 select! {
500 recv(r1) -> _ => hits[0] += 1,
501 recv(r2) -> _ => hits[1] += 1,
502 }
503 }
504 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
505 }
506
507 #[test]
fairness_duplicates()508 fn fairness_duplicates() {
509 #[cfg(miri)]
510 const COUNT: usize = 100;
511 #[cfg(not(miri))]
512 const COUNT: usize = 10_000;
513
514 let (s, r) = unbounded();
515
516 for _ in 0..COUNT {
517 s.send(()).unwrap();
518 }
519
520 let mut hits = [0usize; 5];
521 for _ in 0..COUNT {
522 select! {
523 recv(r) -> _ => hits[0] += 1,
524 recv(r) -> _ => hits[1] += 1,
525 recv(r) -> _ => hits[2] += 1,
526 recv(r) -> _ => hits[3] += 1,
527 recv(r) -> _ => hits[4] += 1,
528 }
529 }
530 assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
531 }
532
533 #[test]
recv_in_send()534 fn recv_in_send() {
535 let (s, r) = unbounded();
536 s.send(()).unwrap();
537
538 select! {
539 send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
540 }
541 }
542
543 #[test]
channel_through_channel()544 fn channel_through_channel() {
545 #[cfg(miri)]
546 const COUNT: usize = 100;
547 #[cfg(not(miri))]
548 const COUNT: usize = 1000;
549
550 type T = Box<dyn Any + Send>;
551
552 let (s, r) = unbounded::<T>();
553
554 scope(|scope| {
555 scope.spawn(move |_| {
556 let mut s = s;
557
558 for _ in 0..COUNT {
559 let (new_s, new_r) = unbounded();
560 let new_r: T = Box::new(Some(new_r));
561
562 s.send(new_r).unwrap();
563 s = new_s;
564 }
565 });
566
567 scope.spawn(move |_| {
568 let mut r = r;
569
570 for _ in 0..COUNT {
571 r = r
572 .recv()
573 .unwrap()
574 .downcast_mut::<Option<Receiver<T>>>()
575 .unwrap()
576 .take()
577 .unwrap()
578 }
579 });
580 })
581 .unwrap();
582 }
583