1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
8 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
9
10 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
11 use tokio::test as maybe_tokio_test;
12
13 use std::fmt;
14 use std::panic;
15 use std::sync::Arc;
16 use tokio::sync::mpsc;
17 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
18 use tokio_test::*;
19
// Shared test helpers. `mpsc_stream` supplies the `channel_stream` and
// `unbounded_channel_stream` constructors used by the stream tests in this
// file. Compiled out on wasm targets.
#[cfg(not(target_family = "wasm"))]
mod support {
    pub(crate) mod mpsc_stream;
}
24
25 #[allow(unused)]
26 trait AssertRefUnwindSafe: panic::RefUnwindSafe {}
27 impl<T> AssertRefUnwindSafe for mpsc::OwnedPermit<T> {}
28 impl<'a, T> AssertRefUnwindSafe for mpsc::Permit<'a, T> {}
29 impl<'a, T> AssertRefUnwindSafe for mpsc::PermitIterator<'a, T> {}
30 impl<T> AssertRefUnwindSafe for mpsc::Receiver<T> {}
31 impl<T> AssertRefUnwindSafe for mpsc::Sender<T> {}
32 impl<T> AssertRefUnwindSafe for mpsc::UnboundedReceiver<T> {}
33 impl<T> AssertRefUnwindSafe for mpsc::UnboundedSender<T> {}
34 impl<T> AssertRefUnwindSafe for mpsc::WeakSender<T> {}
35 impl<T> AssertRefUnwindSafe for mpsc::WeakUnboundedSender<T> {}
36
37 #[allow(unused)]
38 trait AssertUnwindSafe: panic::UnwindSafe {}
39 impl<T> AssertUnwindSafe for mpsc::OwnedPermit<T> {}
40 impl<'a, T> AssertUnwindSafe for mpsc::Permit<'a, T> {}
41 impl<'a, T> AssertUnwindSafe for mpsc::PermitIterator<'a, T> {}
42 impl<T> AssertUnwindSafe for mpsc::Receiver<T> {}
43 impl<T> AssertUnwindSafe for mpsc::Sender<T> {}
44 impl<T> AssertUnwindSafe for mpsc::UnboundedReceiver<T> {}
45 impl<T> AssertUnwindSafe for mpsc::UnboundedSender<T> {}
46 impl<T> AssertUnwindSafe for mpsc::WeakSender<T> {}
47 impl<T> AssertUnwindSafe for mpsc::WeakUnboundedSender<T> {}
48
49 #[maybe_tokio_test]
send_recv_with_buffer()50 async fn send_recv_with_buffer() {
51 let (tx, mut rx) = mpsc::channel::<i32>(16);
52
53 // Using poll_ready / try_send
54 // let permit assert_ready_ok!(tx.reserve());
55 let permit = tx.reserve().await.unwrap();
56 permit.send(1);
57
58 // Without poll_ready
59 tx.try_send(2).unwrap();
60
61 drop(tx);
62
63 let val = rx.recv().await;
64 assert_eq!(val, Some(1));
65
66 let val = rx.recv().await;
67 assert_eq!(val, Some(2));
68
69 let val = rx.recv().await;
70 assert!(val.is_none());
71 }
72
/// With capacity 2, two reservations succeed and further ones wait; a
/// waiter is woken once a slot is freed, either by receiving a sent value
/// or by dropping an unused permit.
#[tokio::test]
#[cfg(feature = "full")]
async fn reserve_disarm() {
    let (tx, mut rx) = mpsc::channel::<i32>(2);
    let sender_a = tx.clone();
    let sender_b = tx.clone();
    let sender_c = tx.clone();
    let sender_d = tx;

    // Both slots can be reserved immediately.
    let permit_a = assert_ok!(sender_a.reserve().await);
    let permit_b = assert_ok!(sender_b.reserve().await);

    // Any further reservation has to wait.
    let mut waiter_c = tokio_test::task::spawn(sender_c.reserve());
    assert_pending!(waiter_c.poll());

    let mut waiter_d = tokio_test::task::spawn(sender_d.reserve());
    assert_pending!(waiter_d.poll());

    // Sending through a permit alone does not free the slot...
    permit_a.send(1);
    assert!(!waiter_c.is_woken());

    // ...the value must also be received.
    rx.recv().await.unwrap();
    assert!(waiter_c.is_woken());
    assert!(!waiter_d.is_woken());

    // Dropping an unused permit frees its slot as well.
    drop(permit_b);
    assert!(waiter_d.is_woken());

    let mut waiter_a = tokio_test::task::spawn(sender_a.reserve());
    assert_pending!(waiter_a.poll());
}
110
/// Values sent from a spawned task are observed in order through the
/// stream wrapper, followed by end-of-stream.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
    use tokio_stream::StreamExt;

    let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
    let mut stream = Box::pin(rx);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value).await);
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, stream.next().await);
    }
}
128
/// Values sent from a spawned task arrive in order; `recv` yields `None`
/// after the task finishes and its sender is dropped.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
    let (tx, mut rx) = mpsc::channel(16);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value).await);
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, rx.recv().await);
    }
}
143
/// `recv_many` on a capacity-2 channel accumulates four values sent by a
/// spawned task, then returns 0 once the sender is gone.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_many_with_buffer() {
    let (tx, mut rx) = mpsc::channel(2);
    let mut buffer = Vec::<i32>::with_capacity(3);

    // A zero limit returns immediately instead of waiting.
    assert_eq!(0, rx.recv_many(&mut buffer, 0).await);

    let producer = tokio::spawn(async move {
        for value in [1, 2, 7, 0] {
            assert_ok!(tx.send(value).await);
        }
    });

    let limit = 3;
    let mut received = 0usize;
    while received < 4 {
        let batch = rx.recv_many(&mut buffer, limit).await;
        received += batch;
        // Every received value must have been appended to the buffer.
        assert_eq!(buffer.len(), received);
    }

    assert_eq!(vec![1, 2, 7, 0], buffer);
    assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
    producer.await.unwrap();
}
171
/// A pending reservation that is dropped must not be woken later; the
/// freed slot goes to the reservation that is still waiting.
#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
    use std::future::Future;

    // Mock task whose waker is handed to the first reservation.
    let mut first_task = tokio_test::task::spawn(());

    let (tx1, mut rx) = mpsc::channel(1);
    let tx2 = tx1.clone();

    // Fill the only slot.
    assert_ok!(tx1.try_send(()));

    let mut first_reserve = Box::pin(tx1.reserve());
    first_task.enter(|cx, _| assert_pending!(first_reserve.as_mut().poll(cx)));

    {
        let mut second_reserve = tokio_test::task::spawn(tx2.reserve());
        assert_pending!(second_reserve.poll());

        // Cancel the first reservation before any slot frees up.
        drop(first_reserve);

        assert!(rx.recv().await.is_some());

        assert!(second_reserve.is_woken());
        assert!(!first_task.is_woken());
    }

    drop(tx1);
    drop(tx2);

    assert!(rx.recv().await.is_none());
}
204
/// Creating a bounded channel with a zero-sized buffer is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
    let zero_capacity = 0;
    mpsc::channel::<i32>(zero_capacity);
}
211
212 #[maybe_tokio_test]
send_recv_unbounded()213 async fn send_recv_unbounded() {
214 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
215
216 // Using `try_send`
217 assert_ok!(tx.send(1));
218 assert_ok!(tx.send(2));
219
220 assert_eq!(rx.recv().await, Some(1));
221 assert_eq!(rx.recv().await, Some(2));
222
223 drop(tx);
224
225 assert!(rx.recv().await.is_none());
226 }
227
228 #[maybe_tokio_test]
send_recv_many_unbounded()229 async fn send_recv_many_unbounded() {
230 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
231
232 let mut buffer: Vec<i32> = Vec::new();
233
234 // With `limit=0` does not sleep, returns immediately
235 rx.recv_many(&mut buffer, 0).await;
236 assert_eq!(0, buffer.len());
237
238 assert_ok!(tx.send(7));
239 assert_ok!(tx.send(13));
240 assert_ok!(tx.send(100));
241 assert_ok!(tx.send(1002));
242
243 rx.recv_many(&mut buffer, 0).await;
244 assert_eq!(0, buffer.len());
245
246 let mut count = 0;
247 while count < 4 {
248 count += rx.recv_many(&mut buffer, 1).await;
249 }
250 assert_eq!(count, 4);
251 assert_eq!(vec![7, 13, 100, 1002], buffer);
252 let final_capacity = buffer.capacity();
253 assert!(final_capacity > 0);
254
255 buffer.clear();
256
257 assert_ok!(tx.send(5));
258 assert_ok!(tx.send(6));
259 assert_ok!(tx.send(7));
260 assert_ok!(tx.send(2));
261
262 // Re-use existing capacity
263 count = rx.recv_many(&mut buffer, 32).await;
264
265 assert_eq!(final_capacity, buffer.capacity());
266 assert_eq!(count, 4);
267 assert_eq!(vec![5, 6, 7, 2], buffer);
268
269 drop(tx);
270
271 // recv_many will immediately return zero if the channel
272 // is closed and no more messages are waiting
273 assert_eq!(0, rx.recv_many(&mut buffer, 4).await);
274 assert!(rx.recv().await.is_none());
275 }
276
/// Checks how `recv_many` on a bounded channel interacts with the target
/// buffer's capacity, and that it returns 0 only after the channel closes.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_bounded_capacity() {
    let mut buffer: Vec<String> = Vec::with_capacity(9);
    let limit = buffer.capacity();
    let (tx, mut rx) = mpsc::channel(100);

    let mut expected: Vec<String> = (0..limit).map(|n| n.to_string()).collect();
    for item in expected.iter().cloned() {
        tx.send(item).await.unwrap();
    }
    tx.send("one more".to_string()).await.unwrap();

    // The first call takes everything except the last value. The buffer
    // already has room for `limit` items, so it must not grow.
    let capacity_before = buffer.capacity();
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(capacity_before, taken);
    assert_eq!(expected, buffer);
    assert_eq!(limit, buffer.capacity());

    // The remaining value no longer fits, so the buffer has to grow.
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(1, taken);
    assert!(buffer.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, buffer);

    tokio::spawn(async move {
        tx.send("final".to_string()).await.unwrap();
    });

    // `tx` now lives in the spawned task, so the channel is still open and
    // `recv_many` waits for the value rather than returning 0.
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(1, taken);
    expected.push("final".to_string());
    assert_eq!(expected, buffer);

    // The channel is now closed, so `recv_many` returns 0 and leaves the
    // buffer untouched.
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(0, taken);
    assert_eq!(expected, buffer);
}
318
/// Checks how `recv_many` on an unbounded channel interacts with the target
/// buffer's capacity, and that it returns 0 only after the channel closes.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_unbounded_capacity() {
    let mut buffer: Vec<String> = Vec::with_capacity(9); // capacity >= 9
    let limit = buffer.capacity();
    let (tx, mut rx) = mpsc::unbounded_channel();

    let mut expected: Vec<String> = (0..limit).map(|n| n.to_string()).collect();
    for item in expected.iter().cloned() {
        tx.send(item).unwrap();
    }
    tx.send("one more".to_string()).unwrap();

    // The first call takes everything except the last value. The buffer
    // already has room for `limit` items, so it must not grow.
    let capacity_before = buffer.capacity();
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(capacity_before, taken);
    assert_eq!(expected, buffer);
    assert_eq!(limit, buffer.capacity());

    // The remaining value no longer fits, so the buffer has to grow.
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(1, taken);
    assert!(buffer.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, buffer);

    tokio::spawn(async move {
        tx.send("final".to_string()).unwrap();
    });

    // `tx` now lives in the spawned task, so the channel is still open and
    // `recv_many` waits for the value rather than returning 0.
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(1, taken);
    expected.push("final".to_string());
    assert_eq!(expected, buffer);

    // The channel is now closed, so `recv_many` returns 0 and leaves the
    // buffer untouched.
    let taken = rx.recv_many(&mut buffer, limit).await;
    assert_eq!(0, taken);
    assert_eq!(expected, buffer);
}
360
/// Values sent on an unbounded channel from a spawned task arrive in
/// order, followed by `None` once the task's sender is dropped.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value));
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, rx.recv().await);
    }
}
375
/// Values sent from a spawned task are observed in order through the
/// unbounded stream wrapper, followed by end-of-stream.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
    use tokio_stream::StreamExt;

    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();
    let mut stream = Box::pin(rx);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value));
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, stream.next().await);
    }
}
394
395 #[maybe_tokio_test]
no_t_bounds_buffer()396 async fn no_t_bounds_buffer() {
397 struct NoImpls;
398
399 let (tx, mut rx) = mpsc::channel(100);
400
401 // sender should be Debug even though T isn't Debug
402 is_debug(&tx);
403 // same with Receiver
404 is_debug(&rx);
405 // and sender should be Clone even though T isn't Clone
406 assert!(tx.clone().try_send(NoImpls).is_ok());
407
408 assert!(rx.recv().await.is_some());
409 }
410
411 #[maybe_tokio_test]
no_t_bounds_unbounded()412 async fn no_t_bounds_unbounded() {
413 struct NoImpls;
414
415 let (tx, mut rx) = mpsc::unbounded_channel();
416
417 // sender should be Debug even though T isn't Debug
418 is_debug(&tx);
419 // same with Receiver
420 is_debug(&rx);
421 // and sender should be Clone even though T isn't Clone
422 assert!(tx.clone().send(NoImpls).is_ok());
423
424 assert!(rx.recv().await.is_some());
425 }
426
/// On a capacity-1 channel a second reservation waits until the first
/// value is received, and the freed slot belongs to that waiter rather
/// than to a concurrent `try_send`.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
    let (tx, mut rx) = mpsc::channel::<i32>(1);

    // Take the only slot and fill it.
    let first_permit = assert_ok!(tx.reserve().await);
    first_permit.send(1);

    // No capacity left: the next reservation has to wait.
    let mut second_reserve = tokio_test::task::spawn(tx.reserve());
    assert_pending!(second_reserve.poll());

    // Receiving the value frees the slot...
    assert!(rx.recv().await.is_some());

    // ...which wakes the waiting reservation.
    assert!(second_reserve.is_woken());

    // A direct send still fails.
    assert_err!(tx.try_send(1337));

    // The waiter completes and delivers the second value.
    let second_permit = assert_ready_ok!(second_reserve.poll());
    second_permit.send(2);

    assert!(rx.recv().await.is_some());
}
457
458 #[maybe_tokio_test]
recv_close_gets_none_idle()459 async fn recv_close_gets_none_idle() {
460 let (tx, mut rx) = mpsc::channel::<i32>(10);
461
462 rx.close();
463
464 assert!(rx.recv().await.is_none());
465
466 assert_err!(tx.send(1).await);
467 }
468
/// After `close`, a waiting reservation fails, but a permit that was
/// already granted can still deliver its value before `recv` yields `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
    let (tx1, mut rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let granted = assert_ok!(tx1.reserve().await);
    let mut waiting = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(waiting.poll());

    rx.close();

    // The waiting reservation is woken and resolves to an error.
    assert!(waiting.is_woken());
    assert_ready_err!(waiting.poll());

    {
        // The outstanding permit keeps `recv` pending.
        let mut recv = tokio_test::task::spawn(rx.recv());
        assert_pending!(recv.poll());

        granted.send(123);
        assert!(recv.is_woken());

        let value = assert_ready!(recv.poll());
        assert_eq!(value, Some(123));
    }

    assert!(rx.recv().await.is_none());
}
497
498 #[maybe_tokio_test]
tx_close_gets_none()499 async fn tx_close_gets_none() {
500 let (_, mut rx) = mpsc::channel::<i32>(10);
501 assert!(rx.recv().await.is_none());
502 }
503
504 #[maybe_tokio_test]
try_send_fail()505 async fn try_send_fail() {
506 let (tx, mut rx) = mpsc::channel(1);
507
508 tx.try_send("hello").unwrap();
509
510 // This should fail
511 match assert_err!(tx.try_send("fail")) {
512 TrySendError::Full(..) => {}
513 _ => panic!(),
514 }
515
516 assert_eq!(rx.recv().await, Some("hello"));
517
518 assert_ok!(tx.try_send("goodbye"));
519 drop(tx);
520
521 assert_eq!(rx.recv().await, Some("goodbye"));
522 assert!(rx.recv().await.is_none());
523 }
524
525 #[maybe_tokio_test]
try_send_fail_with_try_recv()526 async fn try_send_fail_with_try_recv() {
527 let (tx, mut rx) = mpsc::channel(1);
528
529 tx.try_send("hello").unwrap();
530
531 // This should fail
532 match assert_err!(tx.try_send("fail")) {
533 TrySendError::Full(..) => {}
534 _ => panic!(),
535 }
536
537 assert_eq!(rx.try_recv(), Ok("hello"));
538
539 assert_ok!(tx.try_send("goodbye"));
540 drop(tx);
541
542 assert_eq!(rx.try_recv(), Ok("goodbye"));
543 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
544 }
545
546 #[maybe_tokio_test]
reserve_many_above_cap()547 async fn reserve_many_above_cap() {
548 const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS;
549 let (tx, _rx) = mpsc::channel::<()>(1);
550
551 assert_err!(tx.reserve_many(2).await);
552 assert_err!(tx.reserve_many(MAX_PERMITS + 1).await);
553 assert_err!(tx.reserve_many(usize::MAX).await);
554 }
555
/// `try_reserve_many(0)` yields an empty iterator while the channel is
/// open, even when it is full, and `Closed` once the receiver is gone.
#[test]
fn try_reserve_many_zero() {
    let (tx, rx) = mpsc::channel::<()>(1);

    // Open channel with free capacity.
    let mut none_reserved = assert_ok!(tx.try_reserve_many(0));
    assert!(none_reserved.next().is_none());
    drop(none_reserved);

    // Open channel with no free capacity.
    tx.try_send(()).unwrap();
    let mut none_reserved = assert_ok!(tx.try_reserve_many(0));
    assert!(none_reserved.next().is_none());
    drop(none_reserved);

    drop(rx);

    // Closed channel.
    let error = assert_err!(tx.try_reserve_many(0));
    assert_eq!(error, TrySendError::Closed(()));
}
575
576 #[maybe_tokio_test]
reserve_many_zero()577 async fn reserve_many_zero() {
578 let (tx, rx) = mpsc::channel::<()>(1);
579
580 // Succeeds when not closed.
581 assert!(assert_ok!(tx.reserve_many(0).await).next().is_none());
582
583 // Even when channel is full.
584 tx.send(()).await.unwrap();
585 assert!(assert_ok!(tx.reserve_many(0).await).next().is_none());
586
587 drop(rx);
588
589 // Closed error when closed.
590 assert_err!(tx.reserve_many(0).await);
591 }
592
593 #[maybe_tokio_test]
try_reserve_many_edge_cases()594 async fn try_reserve_many_edge_cases() {
595 const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS;
596
597 let (tx, rx) = mpsc::channel::<()>(1);
598
599 let mut permit = assert_ok!(tx.try_reserve_many(0));
600 assert!(permit.next().is_none());
601
602 let permit = tx.try_reserve_many(MAX_PERMITS + 1);
603 match assert_err!(permit) {
604 TrySendError::Full(..) => {}
605 _ => panic!(),
606 }
607
608 let permit = tx.try_reserve_many(usize::MAX);
609 match assert_err!(permit) {
610 TrySendError::Full(..) => {}
611 _ => panic!(),
612 }
613
614 // Dropping the receiver should close the channel
615 drop(rx);
616 assert_err!(tx.reserve_many(0).await);
617 }
618
619 #[maybe_tokio_test]
try_reserve_fails()620 async fn try_reserve_fails() {
621 let (tx, mut rx) = mpsc::channel(1);
622
623 let permit = tx.try_reserve().unwrap();
624
625 // This should fail
626 match assert_err!(tx.try_reserve()) {
627 TrySendError::Full(()) => {}
628 _ => panic!(),
629 }
630
631 permit.send("foo");
632
633 assert_eq!(rx.recv().await, Some("foo"));
634
635 // Dropping permit releases the slot.
636 let permit = tx.try_reserve().unwrap();
637 drop(permit);
638
639 let _permit = tx.try_reserve().unwrap();
640 }
641
642 #[maybe_tokio_test]
reserve_many_and_send()643 async fn reserve_many_and_send() {
644 let (tx, mut rx) = mpsc::channel(100);
645 for i in 0..100 {
646 for permit in assert_ok!(tx.reserve_many(i).await) {
647 permit.send("foo");
648 assert_eq!(rx.recv().await, Some("foo"));
649 }
650 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
651 }
652 }
653 #[maybe_tokio_test]
try_reserve_many_and_send()654 async fn try_reserve_many_and_send() {
655 let (tx, mut rx) = mpsc::channel(100);
656 for i in 0..100 {
657 for permit in assert_ok!(tx.try_reserve_many(i)) {
658 permit.send("foo");
659 assert_eq!(rx.recv().await, Some("foo"));
660 }
661 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
662 }
663 }
664
665 #[maybe_tokio_test]
reserve_many_on_closed_channel()666 async fn reserve_many_on_closed_channel() {
667 let (tx, rx) = mpsc::channel::<()>(100);
668 drop(rx);
669 assert_err!(tx.reserve_many(10).await);
670 }
671
672 #[maybe_tokio_test]
try_reserve_many_on_closed_channel()673 async fn try_reserve_many_on_closed_channel() {
674 let (tx, rx) = mpsc::channel::<usize>(100);
675 drop(rx);
676 match assert_err!(tx.try_reserve_many(10)) {
677 TrySendError::Closed(()) => {}
678 _ => panic!(),
679 };
680 }
681
682 #[maybe_tokio_test]
683 #[cfg_attr(miri, ignore)] // Too slow on miri.
try_reserve_many_full()684 async fn try_reserve_many_full() {
685 // Reserve n capacity and send k messages
686 for n in 1..100 {
687 for k in 0..n {
688 let (tx, mut rx) = mpsc::channel::<usize>(n);
689 let permits = assert_ok!(tx.try_reserve_many(n));
690
691 assert_eq!(permits.len(), n);
692 assert_eq!(tx.capacity(), 0);
693
694 match assert_err!(tx.try_reserve_many(1)) {
695 TrySendError::Full(..) => {}
696 _ => panic!(),
697 };
698
699 for permit in permits.take(k) {
700 permit.send(0);
701 }
702 // We only used k permits on the n reserved
703 assert_eq!(tx.capacity(), n - k);
704
705 // We can reserve more permits
706 assert_ok!(tx.try_reserve_many(1));
707
708 // But not more than the current capacity
709 match assert_err!(tx.try_reserve_many(n - k + 1)) {
710 TrySendError::Full(..) => {}
711 _ => panic!(),
712 };
713
714 for _i in 0..k {
715 assert_eq!(rx.recv().await, Some(0));
716 }
717
718 // Now that we've received everything, capacity should be back to n
719 assert_eq!(tx.capacity(), n);
720 }
721 }
722 }
723
/// A permit dropped without sending must give its slot back, so that a
/// waiting reservation on another sender can complete.
#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
    let (tx1, _rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let unused_permit = assert_ok!(tx1.reserve().await);

    // The only slot is reserved, so the second sender has to wait.
    let mut waiter = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(waiter.poll());

    drop(unused_permit);

    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
}
742
743 #[maybe_tokio_test]
drop_permit_iterator_releases_permits()744 async fn drop_permit_iterator_releases_permits() {
745 // poll_ready reserves capacity, ensure that the capacity is released if tx
746 // is dropped w/o sending a value.
747 for n in 1..100 {
748 let (tx1, _rx) = mpsc::channel::<i32>(n);
749 let tx2 = tx1.clone();
750
751 let permits = assert_ok!(tx1.reserve_many(n).await);
752
753 let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(n));
754 assert_pending!(reserve2.poll());
755
756 drop(permits);
757
758 assert!(reserve2.is_woken());
759
760 let permits = assert_ready_ok!(reserve2.poll());
761 drop(permits);
762
763 assert_eq!(tx1.capacity(), n);
764 }
765 }
766
767 #[maybe_tokio_test]
dropping_rx_closes_channel()768 async fn dropping_rx_closes_channel() {
769 let (tx, rx) = mpsc::channel(100);
770
771 let msg = Arc::new(());
772 assert_ok!(tx.try_send(msg.clone()));
773
774 drop(rx);
775 assert_err!(tx.reserve().await);
776 assert_err!(tx.reserve_many(10).await);
777 assert_eq!(1, Arc::strong_count(&msg));
778 }
779
/// Dropping the receiver makes the non-waiting send/reserve calls report
/// `Closed` and drops any message still queued.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (tx, rx) = mpsc::channel(100);

    let msg = Arc::new(());
    tx.try_send(Arc::clone(&msg)).unwrap();

    drop(rx);

    let send_outcome = tx.try_send(Arc::clone(&msg));
    assert!(matches!(send_outcome, Err(TrySendError::Closed(_))));

    assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));

    let owned_outcome = tx.try_reserve_owned();
    assert!(matches!(owned_outcome, Err(TrySendError::Closed(_))));

    // Only the local handle is left: neither the queued clone nor the
    // rejected ones are still alive.
    assert_eq!(1, Arc::strong_count(&msg));
}
801
/// A message still queued when both halves are dropped is itself dropped.
#[test]
fn unconsumed_messages_are_dropped() {
    let msg = Arc::new(());

    let (tx, rx) = mpsc::channel(100);

    tx.try_send(Arc::clone(&msg)).unwrap();

    // One local handle plus the clone sitting in the channel.
    assert_eq!(2, Arc::strong_count(&msg));

    drop((tx, rx));

    assert_eq!(1, Arc::strong_count(&msg));
}
816
/// `blocking_recv` on a plain thread receives a value sent from inside a
/// runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_recv() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let receiver_thread = std::thread::spawn(move || {
        assert_eq!(Some(10), rx.blocking_recv());
    });

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        let _ = tx.send(10).await;
    });
    // Shut the runtime down before joining the receiver thread.
    drop(runtime);

    receiver_thread.join().unwrap()
}
833
834 #[tokio::test]
835 #[should_panic]
836 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_recv_async()837 async fn blocking_recv_async() {
838 let (_tx, mut rx) = mpsc::channel::<()>(1);
839 let _ = rx.blocking_recv();
840 }
841
/// `blocking_send` on a plain thread delivers a value to a receiver
/// awaiting inside a runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_send() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let sender_thread = std::thread::spawn(move || {
        tx.blocking_send(10).unwrap();
    });

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        assert_eq!(Some(10), rx.recv().await);
    });
    // Shut the runtime down before joining the sender thread.
    drop(runtime);

    sender_thread.join().unwrap()
}
858
859 #[tokio::test]
860 #[should_panic]
861 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_send_async()862 async fn blocking_send_async() {
863 let (tx, _rx) = mpsc::channel::<()>(1);
864 let _ = tx.blocking_send(());
865 }
866
/// After `close`, `recv` stays pending while a permit is outstanding and
/// resolves to `None` once that permit is dropped unused.
#[tokio::test]
#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
    let (tx, mut rx) = mpsc::channel::<()>(100);
    let _tx2 = tx.clone();

    let outstanding = assert_ok!(tx.reserve().await);

    rx.close();

    let mut pending_recv = tokio_test::task::spawn(rx.recv());
    assert_pending!(pending_recv.poll());

    drop(outstanding);

    assert!(pending_recv.is_woken());
    let received = assert_ready!(pending_recv.poll());
    assert!(received.is_none());
}
886
/// A waiting reservation that is woken after `close` but dropped without
/// being polled again must not stop `recv` from returning `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
    let (tx1, mut rx) = mpsc::channel::<()>(1);
    let tx2 = tx1.clone();

    let held = assert_ok!(tx1.reserve().await);

    let mut waiter = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(waiter.poll());

    rx.close();

    drop(held);
    assert!(waiter.is_woken());

    // Drop the reservation without polling it again.
    drop(waiter);
    assert!(rx.recv().await.is_none());
}
906
/// Fills and drains a capacity-5 channel in several patterns and checks
/// the `Empty` and `Disconnected` results of `try_recv`.
#[test]
fn try_recv_bounded() {
    let (tx, mut rx) = mpsc::channel(5);

    // Fill to capacity; a sixth send is rejected.
    for _ in 0..5 {
        tx.try_send("hello").unwrap();
    }
    assert!(tx.try_send("hello").is_err());

    // Drain completely.
    for _ in 0..5 {
        assert_eq!(Ok("hello"), rx.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

    // Interleave: four in, one out, two in. The channel is full again.
    for _ in 0..4 {
        tx.try_send("hello").unwrap();
    }
    assert_eq!(Ok("hello"), rx.try_recv());
    for _ in 0..2 {
        tx.try_send("hello").unwrap();
    }
    assert!(tx.try_send("hello").is_err());
    for _ in 0..5 {
        assert_eq!(Ok("hello"), rx.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

    // Queued values stay readable after the sender is dropped.
    for _ in 0..3 {
        tx.try_send("hello").unwrap();
    }
    drop(tx);
    for _ in 0..3 {
        assert_eq!(Ok("hello"), rx.try_recv());
    }
    assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
}
949
/// For each message count in 0..100, `try_recv` on an unbounded channel
/// returns the values in order, then `Empty`, then `Disconnected` once the
/// sender is dropped.
#[test]
fn try_recv_unbounded() {
    for num in 0..100 {
        let (tx, mut rx) = mpsc::unbounded_channel();

        (0..num).for_each(|value| tx.send(value).unwrap());

        for expected in 0..num {
            assert_eq!(rx.try_recv(), Ok(expected));
        }

        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        drop(tx);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }
}
968
/// An empty bounded channel reports `Empty` while the sender is alive and
/// `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(5);

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
977
/// An empty unbounded channel reports `Empty` while the sender is alive
/// and `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
986
/// `send_timeout` succeeds while there is room, times out (returning the
/// value) when the channel is full, and reports `Closed` once the receiver
/// is dropped. Runs with the clock paused.
#[tokio::test(start_paused = true)]
#[cfg(feature = "full")]
async fn recv_timeout() {
    use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
    use tokio::time::Duration;

    let (tx, rx) = mpsc::channel(5);

    // Five sends fit into the buffer.
    for value in [10, 20, 30, 40, 50] {
        assert_eq!(tx.send_timeout(value, Duration::from_secs(1)).await, Ok(()));
    }

    // The sixth finds the channel full and times out.
    let when_full = tx.send_timeout(60, Duration::from_secs(1)).await;
    assert_eq!(when_full, Err(Timeout(60)));

    drop(rx);

    let when_closed = tx.send_timeout(70, Duration::from_secs(1)).await;
    assert_eq!(when_closed, Err(Closed(70)));
}
1011
/// Polling `send_timeout` outside of a runtime must panic with the
/// "no reactor running" message.
#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn recv_timeout_panic() {
    use futures::future::FutureExt;
    use tokio::time::Duration;

    let (tx, _rx) = mpsc::channel(5);
    let send = tx.send_timeout(10, Duration::from_secs(1));
    // Poll the future once, with no runtime in scope.
    send.now_or_never();
}
1022
1023 // Tests that channel `capacity` changes and `max_capacity` stays the same
1024 #[tokio::test]
test_tx_capacity()1025 async fn test_tx_capacity() {
1026 let (tx, _rx) = mpsc::channel::<()>(10);
1027 // both capacities are same before
1028 assert_eq!(tx.capacity(), 10);
1029 assert_eq!(tx.max_capacity(), 10);
1030
1031 let _permit = tx.reserve().await.unwrap();
1032 // after reserve, only capacity should drop by one
1033 assert_eq!(tx.capacity(), 9);
1034 assert_eq!(tx.max_capacity(), 10);
1035
1036 tx.send(()).await.unwrap();
1037 // after send, capacity should drop by one again
1038 assert_eq!(tx.capacity(), 8);
1039 assert_eq!(tx.max_capacity(), 10);
1040 }
1041
1042 #[tokio::test]
test_rx_is_closed_when_calling_close_with_sender()1043 async fn test_rx_is_closed_when_calling_close_with_sender() {
1044 // is_closed should return true after calling close but still has a sender
1045 let (_tx, mut rx) = mpsc::channel::<()>(10);
1046 rx.close();
1047
1048 assert!(rx.is_closed());
1049 }
1050
1051 #[tokio::test]
test_rx_is_closed_when_dropping_all_senders()1052 async fn test_rx_is_closed_when_dropping_all_senders() {
1053 // is_closed should return true after dropping all senders
1054 let (tx, rx) = mpsc::channel::<()>(10);
1055 let another_tx = tx.clone();
1056 let task = tokio::spawn(async move {
1057 drop(another_tx);
1058 });
1059
1060 drop(tx);
1061 let _ = task.await;
1062
1063 assert!(rx.is_closed());
1064 }
1065
1066 #[tokio::test]
test_rx_is_not_closed_when_there_are_senders()1067 async fn test_rx_is_not_closed_when_there_are_senders() {
1068 // is_closed should return false when there is a sender
1069 let (_tx, rx) = mpsc::channel::<()>(10);
1070 assert!(!rx.is_closed());
1071 }
1072
1073 #[tokio::test]
test_rx_is_not_closed_when_there_are_senders_and_buffer_filled()1074 async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() {
1075 // is_closed should return false when there is a sender, even if enough messages have been sent to fill the channel
1076 let (tx, rx) = mpsc::channel(10);
1077 for i in 0..10 {
1078 assert!(tx.send(i).await.is_ok());
1079 }
1080 assert!(!rx.is_closed());
1081 }
1082
1083 #[tokio::test]
test_rx_is_closed_when_there_are_no_senders_and_there_are_messages()1084 async fn test_rx_is_closed_when_there_are_no_senders_and_there_are_messages() {
1085 // is_closed should return true when there are messages in the buffer, but no senders
1086 let (tx, rx) = mpsc::channel(10);
1087 for i in 0..10 {
1088 assert!(tx.send(i).await.is_ok());
1089 }
1090 drop(tx);
1091 assert!(rx.is_closed());
1092 }
1093
1094 #[tokio::test]
test_rx_is_closed_when_there_are_messages_and_close_is_called()1095 async fn test_rx_is_closed_when_there_are_messages_and_close_is_called() {
1096 // is_closed should return true when there are messages in the buffer, and close is called
1097 let (tx, mut rx) = mpsc::channel(10);
1098 for i in 0..10 {
1099 assert!(tx.send(i).await.is_ok());
1100 }
1101 rx.close();
1102 assert!(rx.is_closed());
1103 }
1104
1105 #[tokio::test]
test_rx_is_not_closed_when_there_are_permits_but_not_senders()1106 async fn test_rx_is_not_closed_when_there_are_permits_but_not_senders() {
1107 // is_closed should return false when there is a permit (but no senders)
1108 let (tx, rx) = mpsc::channel::<()>(10);
1109 let _permit = tx.reserve_owned().await.expect("Failed to reserve permit");
1110 assert!(!rx.is_closed());
1111 }
1112
1113 #[tokio::test]
test_rx_is_empty_when_no_messages_were_sent()1114 async fn test_rx_is_empty_when_no_messages_were_sent() {
1115 let (_tx, rx) = mpsc::channel::<()>(10);
1116 assert!(rx.is_empty())
1117 }
1118
1119 #[tokio::test]
test_rx_is_not_empty_when_there_are_messages_in_the_buffer()1120 async fn test_rx_is_not_empty_when_there_are_messages_in_the_buffer() {
1121 let (tx, rx) = mpsc::channel::<()>(10);
1122 assert!(tx.send(()).await.is_ok());
1123 assert!(!rx.is_empty())
1124 }
1125
1126 #[tokio::test]
test_rx_is_not_empty_when_the_buffer_is_full()1127 async fn test_rx_is_not_empty_when_the_buffer_is_full() {
1128 let (tx, rx) = mpsc::channel(10);
1129 for i in 0..10 {
1130 assert!(tx.send(i).await.is_ok());
1131 }
1132 assert!(!rx.is_empty())
1133 }
1134
1135 #[tokio::test]
test_rx_is_not_empty_when_all_but_one_messages_are_consumed()1136 async fn test_rx_is_not_empty_when_all_but_one_messages_are_consumed() {
1137 let (tx, mut rx) = mpsc::channel(10);
1138 for i in 0..10 {
1139 assert!(tx.send(i).await.is_ok());
1140 }
1141
1142 for _ in 0..9 {
1143 assert!(rx.recv().await.is_some());
1144 }
1145
1146 assert!(!rx.is_empty())
1147 }
1148
1149 #[tokio::test]
test_rx_is_empty_when_all_messages_are_consumed()1150 async fn test_rx_is_empty_when_all_messages_are_consumed() {
1151 let (tx, mut rx) = mpsc::channel(10);
1152 for i in 0..10 {
1153 assert!(tx.send(i).await.is_ok());
1154 }
1155 while rx.try_recv().is_ok() {}
1156 assert!(rx.is_empty())
1157 }
1158
1159 #[tokio::test]
test_rx_is_empty_all_senders_are_dropped_and_messages_consumed()1160 async fn test_rx_is_empty_all_senders_are_dropped_and_messages_consumed() {
1161 let (tx, mut rx) = mpsc::channel(10);
1162 for i in 0..10 {
1163 assert!(tx.send(i).await.is_ok());
1164 }
1165 drop(tx);
1166
1167 for _ in 0..10 {
1168 assert!(rx.recv().await.is_some());
1169 }
1170
1171 assert!(rx.is_empty())
1172 }
1173
1174 #[tokio::test]
test_rx_len_on_empty_channel()1175 async fn test_rx_len_on_empty_channel() {
1176 let (_tx, rx) = mpsc::channel::<()>(100);
1177 assert_eq!(rx.len(), 0);
1178 }
1179
1180 #[tokio::test]
test_rx_len_on_empty_channel_without_senders()1181 async fn test_rx_len_on_empty_channel_without_senders() {
1182 // when all senders are dropped, a "closed" value is added to the end of the linked list.
1183 // here we test that the "closed" value does not change the len of the channel.
1184
1185 let (tx, rx) = mpsc::channel::<()>(100);
1186 drop(tx);
1187 assert_eq!(rx.len(), 0);
1188 }
1189
1190 #[tokio::test]
test_rx_len_on_filled_channel()1191 async fn test_rx_len_on_filled_channel() {
1192 let (tx, rx) = mpsc::channel(100);
1193
1194 for i in 0..100 {
1195 assert!(tx.send(i).await.is_ok());
1196 }
1197 assert_eq!(rx.len(), 100);
1198 }
1199
1200 #[tokio::test]
test_rx_len_on_filled_channel_without_senders()1201 async fn test_rx_len_on_filled_channel_without_senders() {
1202 let (tx, rx) = mpsc::channel(100);
1203
1204 for i in 0..100 {
1205 assert!(tx.send(i).await.is_ok());
1206 }
1207 drop(tx);
1208 assert_eq!(rx.len(), 100);
1209 }
1210
1211 #[tokio::test]
test_rx_len_when_consuming_all_messages()1212 async fn test_rx_len_when_consuming_all_messages() {
1213 let (tx, mut rx) = mpsc::channel(100);
1214
1215 for i in 0..100 {
1216 assert!(tx.send(i).await.is_ok());
1217 assert_eq!(rx.len(), i + 1);
1218 }
1219
1220 drop(tx);
1221
1222 for i in (0..100).rev() {
1223 assert!(rx.recv().await.is_some());
1224 assert_eq!(rx.len(), i);
1225 }
1226 }
1227
1228 #[tokio::test]
test_rx_len_when_close_is_called()1229 async fn test_rx_len_when_close_is_called() {
1230 let (tx, mut rx) = mpsc::channel(100);
1231 tx.send(()).await.unwrap();
1232 rx.close();
1233
1234 assert_eq!(rx.len(), 1);
1235 }
1236
1237 #[tokio::test]
test_rx_len_when_close_is_called_before_dropping_sender()1238 async fn test_rx_len_when_close_is_called_before_dropping_sender() {
1239 let (tx, mut rx) = mpsc::channel(100);
1240 tx.send(()).await.unwrap();
1241 rx.close();
1242 drop(tx);
1243
1244 assert_eq!(rx.len(), 1);
1245 }
1246
1247 #[tokio::test]
test_rx_len_when_close_is_called_after_dropping_sender()1248 async fn test_rx_len_when_close_is_called_after_dropping_sender() {
1249 let (tx, mut rx) = mpsc::channel(100);
1250 tx.send(()).await.unwrap();
1251 drop(tx);
1252 rx.close();
1253
1254 assert_eq!(rx.len(), 1);
1255 }
1256
1257 #[tokio::test]
test_rx_unbounded_is_closed_when_calling_close_with_sender()1258 async fn test_rx_unbounded_is_closed_when_calling_close_with_sender() {
1259 // is_closed should return true after calling close but still has a sender
1260 let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
1261 rx.close();
1262
1263 assert!(rx.is_closed());
1264 }
1265
1266 #[tokio::test]
test_rx_unbounded_is_closed_when_dropping_all_senders()1267 async fn test_rx_unbounded_is_closed_when_dropping_all_senders() {
1268 // is_closed should return true after dropping all senders
1269 let (tx, rx) = mpsc::unbounded_channel::<()>();
1270 let another_tx = tx.clone();
1271 let task = tokio::spawn(async move {
1272 drop(another_tx);
1273 });
1274
1275 drop(tx);
1276 let _ = task.await;
1277
1278 assert!(rx.is_closed());
1279 }
1280
1281 #[tokio::test]
test_rx_unbounded_is_not_closed_when_there_are_senders()1282 async fn test_rx_unbounded_is_not_closed_when_there_are_senders() {
1283 // is_closed should return false when there is a sender
1284 let (_tx, rx) = mpsc::unbounded_channel::<()>();
1285 assert!(!rx.is_closed());
1286 }
1287
1288 #[tokio::test]
test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages()1289 async fn test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages() {
1290 // is_closed should return true when there are messages in the buffer, but no senders
1291 let (tx, rx) = mpsc::unbounded_channel();
1292 for i in 0..10 {
1293 assert!(tx.send(i).is_ok());
1294 }
1295 drop(tx);
1296 assert!(rx.is_closed());
1297 }
1298
1299 #[tokio::test]
test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called()1300 async fn test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called() {
1301 // is_closed should return true when there are messages in the buffer, and close is called
1302 let (tx, mut rx) = mpsc::unbounded_channel();
1303 for i in 0..10 {
1304 assert!(tx.send(i).is_ok());
1305 }
1306 rx.close();
1307 assert!(rx.is_closed());
1308 }
1309
1310 #[tokio::test]
test_rx_unbounded_is_empty_when_no_messages_were_sent()1311 async fn test_rx_unbounded_is_empty_when_no_messages_were_sent() {
1312 let (_tx, rx) = mpsc::unbounded_channel::<()>();
1313 assert!(rx.is_empty())
1314 }
1315
1316 #[tokio::test]
test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer()1317 async fn test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer() {
1318 let (tx, rx) = mpsc::unbounded_channel();
1319 assert!(tx.send(()).is_ok());
1320 assert!(!rx.is_empty())
1321 }
1322
1323 #[tokio::test]
test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed()1324 async fn test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed() {
1325 let (tx, mut rx) = mpsc::unbounded_channel();
1326 for i in 0..10 {
1327 assert!(tx.send(i).is_ok());
1328 }
1329
1330 for _ in 0..9 {
1331 assert!(rx.recv().await.is_some());
1332 }
1333
1334 assert!(!rx.is_empty())
1335 }
1336
1337 #[tokio::test]
test_rx_unbounded_is_empty_when_all_messages_are_consumed()1338 async fn test_rx_unbounded_is_empty_when_all_messages_are_consumed() {
1339 let (tx, mut rx) = mpsc::unbounded_channel();
1340 for i in 0..10 {
1341 assert!(tx.send(i).is_ok());
1342 }
1343 while rx.try_recv().is_ok() {}
1344 assert!(rx.is_empty())
1345 }
1346
1347 #[tokio::test]
test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed()1348 async fn test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed() {
1349 let (tx, mut rx) = mpsc::unbounded_channel();
1350 for i in 0..10 {
1351 assert!(tx.send(i).is_ok());
1352 }
1353 drop(tx);
1354
1355 for _ in 0..10 {
1356 assert!(rx.recv().await.is_some());
1357 }
1358
1359 assert!(rx.is_empty())
1360 }
1361
1362 #[tokio::test]
test_rx_unbounded_len_on_empty_channel()1363 async fn test_rx_unbounded_len_on_empty_channel() {
1364 let (_tx, rx) = mpsc::unbounded_channel::<()>();
1365 assert_eq!(rx.len(), 0);
1366 }
1367
1368 #[tokio::test]
test_rx_unbounded_len_on_empty_channel_without_senders()1369 async fn test_rx_unbounded_len_on_empty_channel_without_senders() {
1370 // when all senders are dropped, a "closed" value is added to the end of the linked list.
1371 // here we test that the "closed" value does not change the len of the channel.
1372
1373 let (tx, rx) = mpsc::unbounded_channel::<()>();
1374 drop(tx);
1375 assert_eq!(rx.len(), 0);
1376 }
1377
1378 #[tokio::test]
test_rx_unbounded_len_with_multiple_messages()1379 async fn test_rx_unbounded_len_with_multiple_messages() {
1380 let (tx, rx) = mpsc::unbounded_channel();
1381
1382 for i in 0..100 {
1383 assert!(tx.send(i).is_ok());
1384 }
1385 assert_eq!(rx.len(), 100);
1386 }
1387
1388 #[tokio::test]
test_rx_unbounded_len_with_multiple_messages_and_dropped_senders()1389 async fn test_rx_unbounded_len_with_multiple_messages_and_dropped_senders() {
1390 let (tx, rx) = mpsc::unbounded_channel();
1391
1392 for i in 0..100 {
1393 assert!(tx.send(i).is_ok());
1394 }
1395 drop(tx);
1396 assert_eq!(rx.len(), 100);
1397 }
1398
1399 #[tokio::test]
test_rx_unbounded_len_when_consuming_all_messages()1400 async fn test_rx_unbounded_len_when_consuming_all_messages() {
1401 let (tx, mut rx) = mpsc::unbounded_channel();
1402
1403 for i in 0..100 {
1404 assert!(tx.send(i).is_ok());
1405 assert_eq!(rx.len(), i + 1);
1406 }
1407
1408 drop(tx);
1409
1410 for i in (0..100).rev() {
1411 assert!(rx.recv().await.is_some());
1412 assert_eq!(rx.len(), i);
1413 }
1414 }
1415
1416 #[tokio::test]
test_rx_unbounded_len_when_close_is_called()1417 async fn test_rx_unbounded_len_when_close_is_called() {
1418 let (tx, mut rx) = mpsc::unbounded_channel();
1419 tx.send(()).unwrap();
1420 rx.close();
1421
1422 assert_eq!(rx.len(), 1);
1423 }
1424
1425 #[tokio::test]
test_rx_unbounded_len_when_close_is_called_before_dropping_sender()1426 async fn test_rx_unbounded_len_when_close_is_called_before_dropping_sender() {
1427 let (tx, mut rx) = mpsc::unbounded_channel();
1428 tx.send(()).unwrap();
1429 rx.close();
1430 drop(tx);
1431
1432 assert_eq!(rx.len(), 1);
1433 }
1434
1435 #[tokio::test]
test_rx_unbounded_len_when_close_is_called_after_dropping_sender()1436 async fn test_rx_unbounded_len_when_close_is_called_after_dropping_sender() {
1437 let (tx, mut rx) = mpsc::unbounded_channel();
1438 tx.send(()).unwrap();
1439 drop(tx);
1440 rx.close();
1441
1442 assert_eq!(rx.len(), 1);
1443 }
1444
1445 // Regression test for https://github.com/tokio-rs/tokio/issues/6602
1446 #[tokio::test]
test_is_empty_32_msgs()1447 async fn test_is_empty_32_msgs() {
1448 let (sender, mut receiver) = mpsc::channel(33);
1449
1450 for value in 1..257 {
1451 sender.send(value).await.unwrap();
1452 receiver.recv().await.unwrap();
1453 assert!(receiver.is_empty(), "{value}. len: {}", receiver.len());
1454 }
1455 }
1456
/// Compile-time check that `T` implements `fmt::Debug`.
///
/// The argument is only used to drive type inference; the body does nothing.
fn is_debug<T>(_: &T)
where
    T: fmt::Debug,
{
}
1458