1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
8 use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
9 
10 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
11 use tokio::test as maybe_tokio_test;
12 
13 use std::fmt;
14 use std::panic;
15 use std::sync::Arc;
16 use tokio::sync::mpsc;
17 use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
18 use tokio_test::*;
19 
20 #[cfg(not(target_family = "wasm"))]
21 mod support {
22     pub(crate) mod mpsc_stream;
23 }
24 
25 #[allow(unused)]
26 trait AssertRefUnwindSafe: panic::RefUnwindSafe {}
27 impl<T> AssertRefUnwindSafe for mpsc::OwnedPermit<T> {}
28 impl<'a, T> AssertRefUnwindSafe for mpsc::Permit<'a, T> {}
29 impl<'a, T> AssertRefUnwindSafe for mpsc::PermitIterator<'a, T> {}
30 impl<T> AssertRefUnwindSafe for mpsc::Receiver<T> {}
31 impl<T> AssertRefUnwindSafe for mpsc::Sender<T> {}
32 impl<T> AssertRefUnwindSafe for mpsc::UnboundedReceiver<T> {}
33 impl<T> AssertRefUnwindSafe for mpsc::UnboundedSender<T> {}
34 impl<T> AssertRefUnwindSafe for mpsc::WeakSender<T> {}
35 impl<T> AssertRefUnwindSafe for mpsc::WeakUnboundedSender<T> {}
36 
37 #[allow(unused)]
38 trait AssertUnwindSafe: panic::UnwindSafe {}
39 impl<T> AssertUnwindSafe for mpsc::OwnedPermit<T> {}
40 impl<'a, T> AssertUnwindSafe for mpsc::Permit<'a, T> {}
41 impl<'a, T> AssertUnwindSafe for mpsc::PermitIterator<'a, T> {}
42 impl<T> AssertUnwindSafe for mpsc::Receiver<T> {}
43 impl<T> AssertUnwindSafe for mpsc::Sender<T> {}
44 impl<T> AssertUnwindSafe for mpsc::UnboundedReceiver<T> {}
45 impl<T> AssertUnwindSafe for mpsc::UnboundedSender<T> {}
46 impl<T> AssertUnwindSafe for mpsc::WeakSender<T> {}
47 impl<T> AssertUnwindSafe for mpsc::WeakUnboundedSender<T> {}
48 
49 #[maybe_tokio_test]
send_recv_with_buffer()50 async fn send_recv_with_buffer() {
51     let (tx, mut rx) = mpsc::channel::<i32>(16);
52 
53     // Using poll_ready / try_send
54     // let permit assert_ready_ok!(tx.reserve());
55     let permit = tx.reserve().await.unwrap();
56     permit.send(1);
57 
58     // Without poll_ready
59     tx.try_send(2).unwrap();
60 
61     drop(tx);
62 
63     let val = rx.recv().await;
64     assert_eq!(val, Some(1));
65 
66     let val = rx.recv().await;
67     assert_eq!(val, Some(2));
68 
69     let val = rx.recv().await;
70     assert!(val.is_none());
71 }
72 
// On a capacity-2 channel with four senders, checks which pending `reserve`
// futures are woken when a slot is freed by receiving a message or by
// dropping an unused permit.
#[tokio::test]
#[cfg(feature = "full")]
async fn reserve_disarm() {
    let (origin, mut receiver) = mpsc::channel::<i32>(2);
    let sender_a = origin.clone();
    let sender_b = origin.clone();
    let sender_c = origin.clone();
    let sender_d = origin;

    // Two reservations fit within the capacity of 2.
    let permit_a = assert_ok!(sender_a.reserve().await);
    let permit_b = assert_ok!(sender_b.reserve().await);

    // Any further reservation has to wait.
    let mut pending_c = tokio_test::task::spawn(sender_c.reserve());
    assert_pending!(pending_c.poll());

    let mut pending_d = tokio_test::task::spawn(sender_d.reserve());
    assert_pending!(pending_d.poll());

    // Sending through a permit moves the value into the channel ...
    permit_a.send(1);

    // ... but the slot is only freed once that value has been received.
    assert!(!pending_c.is_woken());
    receiver.recv().await.unwrap();
    // One slot is free now: only the first waiter is woken.
    assert!(pending_c.is_woken());
    assert!(!pending_d.is_woken());

    // Dropping an unused permit frees a slot as well.
    drop(permit_b);
    assert!(pending_d.is_woken());

    let mut pending_a = tokio_test::task::spawn(sender_a.reserve());
    assert_pending!(pending_a.poll());
}
110 
// Sends two values from a spawned task into a bounded channel wrapped as a
// stream, and checks the stream yields them in order and then ends.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::channel_stream::<i32>(16);
    let mut stream = Box::pin(stream);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(sender.send(value).await);
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, stream.next().await);
    }
}
128 
// Sends two values from a spawned task over a bounded channel and checks they
// are received in order, followed by `None` once the task's sender is gone.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
    let (sender, mut receiver) = mpsc::channel(16);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(sender.send(value).await);
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, receiver.recv().await);
    }
}
143 
// Drains four values sent by a spawned task through a capacity-2 channel
// using `recv_many`, checking the running count matches the buffer length.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_many_with_buffer() {
    let (sender, mut receiver) = mpsc::channel(2);
    let mut received = Vec::<i32>::with_capacity(3);

    // A limit of zero returns immediately without waiting for a message.
    assert_eq!(0, receiver.recv_many(&mut received, 0).await);

    let producer = tokio::spawn(async move {
        for value in [1, 2, 7, 0] {
            assert_ok!(sender.send(value).await);
        }
    });

    let limit = 3;
    let mut total = 0usize;
    while total < 4 {
        let batch = receiver.recv_many(&mut received, limit).await;
        total += batch;
        assert_eq!(received.len(), total);
    }

    assert_eq!(vec![1, 2, 7, 0], received);
    // The producer has sent everything; the next call reports zero.
    assert_eq!(0, receiver.recv_many(&mut received, limit).await);
    producer.await.unwrap();
}
171 
// With a full capacity-1 channel, two senders wait on `reserve`. The first
// waiter is dropped before a slot frees up, so only the second waiter's task
// is expected to be woken.
#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
    use std::future::Future;

    // Mock task that supplies the context used to poll the first reservation.
    let mut first_task = tokio_test::task::spawn(());

    let (sender_a, mut receiver) = mpsc::channel(1);
    let sender_b = sender_a.clone();

    // Fill the only slot.
    assert_ok!(sender_a.try_send(()));

    let mut first_wait = Box::pin(sender_a.reserve());
    first_task.enter(|cx, _| assert_pending!(first_wait.as_mut().poll(cx)));

    {
        let mut second_wait = tokio_test::task::spawn(sender_b.reserve());
        assert_pending!(second_wait.poll());

        // Abandon the first reservation before any capacity becomes available.
        drop(first_wait);

        assert!(receiver.recv().await.is_some());

        assert!(second_wait.is_woken());
        assert!(!first_task.is_woken());
    }

    drop(sender_a);
    drop(sender_b);

    assert!(receiver.recv().await.is_none());
}
204 
// Creating a bounded channel with a buffer of zero is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
211 
212 #[maybe_tokio_test]
send_recv_unbounded()213 async fn send_recv_unbounded() {
214     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
215 
216     // Using `try_send`
217     assert_ok!(tx.send(1));
218     assert_ok!(tx.send(2));
219 
220     assert_eq!(rx.recv().await, Some(1));
221     assert_eq!(rx.recv().await, Some(2));
222 
223     drop(tx);
224 
225     assert!(rx.recv().await.is_none());
226 }
227 
228 #[maybe_tokio_test]
send_recv_many_unbounded()229 async fn send_recv_many_unbounded() {
230     let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
231 
232     let mut buffer: Vec<i32> = Vec::new();
233 
234     // With `limit=0` does not sleep, returns immediately
235     rx.recv_many(&mut buffer, 0).await;
236     assert_eq!(0, buffer.len());
237 
238     assert_ok!(tx.send(7));
239     assert_ok!(tx.send(13));
240     assert_ok!(tx.send(100));
241     assert_ok!(tx.send(1002));
242 
243     rx.recv_many(&mut buffer, 0).await;
244     assert_eq!(0, buffer.len());
245 
246     let mut count = 0;
247     while count < 4 {
248         count += rx.recv_many(&mut buffer, 1).await;
249     }
250     assert_eq!(count, 4);
251     assert_eq!(vec![7, 13, 100, 1002], buffer);
252     let final_capacity = buffer.capacity();
253     assert!(final_capacity > 0);
254 
255     buffer.clear();
256 
257     assert_ok!(tx.send(5));
258     assert_ok!(tx.send(6));
259     assert_ok!(tx.send(7));
260     assert_ok!(tx.send(2));
261 
262     // Re-use existing capacity
263     count = rx.recv_many(&mut buffer, 32).await;
264 
265     assert_eq!(final_capacity, buffer.capacity());
266     assert_eq!(count, 4);
267     assert_eq!(vec![5, 6, 7, 2], buffer);
268 
269     drop(tx);
270 
271     // recv_many will immediately return zero if the channel
272     // is closed and no more messages are waiting
273     assert_eq!(0, rx.recv_many(&mut buffer, 4).await);
274     assert!(rx.recv().await.is_none());
275 }
276 
// Checks how `recv_many` interacts with the buffer's capacity on a bounded
// channel: a batch that fits does not grow the buffer, one more value does,
// and zero is only returned once the last sender is gone.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_bounded_capacity() {
    let mut received: Vec<String> = Vec::with_capacity(9);
    let limit = received.capacity();
    let (sender, mut receiver) = mpsc::channel(100);

    let mut expected: Vec<String> = (0..limit).map(|n: usize| n.to_string()).collect();
    for item in &expected {
        sender.send(item.clone()).await.unwrap();
    }
    sender.send(String::from("one more")).await.unwrap();

    // The first call takes everything except the last value. The initial
    // capacity is sufficient, so the buffer does not grow in size.
    let capacity_before = received.capacity();
    let first_batch = receiver.recv_many(&mut received, limit).await;
    assert_eq!(capacity_before, first_batch);
    assert_eq!(expected, received);
    assert_eq!(limit, received.capacity());

    // Receiving the remaining value forces the buffer to grow.
    assert_eq!(1, receiver.recv_many(&mut received, limit).await);
    assert!(received.capacity() > limit);
    expected.push(String::from("one more"));
    assert_eq!(expected, received);

    tokio::spawn(async move {
        sender.send(String::from("final")).await.unwrap();
    });

    // The sender now lives in the spawned task, so `recv_many` waits for its
    // message instead of returning 0.
    assert_eq!(1, receiver.recv_many(&mut received, limit).await);
    expected.push(String::from("final"));
    assert_eq!(expected, received);
    // The channel is closed now, so `recv_many` returns 0.
    assert_eq!(0, receiver.recv_many(&mut received, limit).await);
    assert_eq!(expected, received);
}
318 
// Checks how `recv_many` interacts with the buffer's capacity on an unbounded
// channel: a batch that fits does not grow the buffer, one more value does,
// and zero is only returned once the last sender is gone.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_unbounded_capacity() {
    let mut received: Vec<String> = Vec::with_capacity(9); // capacity >= 9
    let limit = received.capacity();
    let (sender, mut receiver) = mpsc::unbounded_channel();

    let mut expected: Vec<String> = (0..limit).map(|n: usize| n.to_string()).collect();
    for item in &expected {
        sender.send(item.clone()).unwrap();
    }
    sender.send(String::from("one more")).unwrap();

    // The first call takes everything except the last value. The initial
    // capacity is sufficient, so the buffer does not grow in size.
    let capacity_before = received.capacity();
    let first_batch = receiver.recv_many(&mut received, limit).await;
    assert_eq!(capacity_before, first_batch);
    assert_eq!(expected, received);
    assert_eq!(limit, received.capacity());

    // Receiving the remaining value forces the buffer to grow.
    assert_eq!(1, receiver.recv_many(&mut received, limit).await);
    assert!(received.capacity() > limit);
    expected.push(String::from("one more"));
    assert_eq!(expected, received);

    tokio::spawn(async move {
        sender.send(String::from("final")).unwrap();
    });

    // The sender now lives in the spawned task, so `recv_many` waits for its
    // message instead of returning 0.
    assert_eq!(1, receiver.recv_many(&mut received, limit).await);
    expected.push(String::from("final"));
    assert_eq!(expected, received);
    // The channel is closed now, so `recv_many` returns 0.
    assert_eq!(0, receiver.recv_many(&mut received, limit).await);
    assert_eq!(expected, received);
}
360 
// Sends two values from a spawned task over an unbounded channel and checks
// they are received in order, followed by `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(sender.send(value));
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, receiver.recv().await);
    }
}
375 
// Sends two values from a spawned task into an unbounded channel wrapped as a
// stream, and checks the stream yields them in order and then ends.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::unbounded_channel_stream::<i32>();
    let mut stream = Box::pin(stream);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(sender.send(value));
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, stream.next().await);
    }
}
394 
395 #[maybe_tokio_test]
no_t_bounds_buffer()396 async fn no_t_bounds_buffer() {
397     struct NoImpls;
398 
399     let (tx, mut rx) = mpsc::channel(100);
400 
401     // sender should be Debug even though T isn't Debug
402     is_debug(&tx);
403     // same with Receiver
404     is_debug(&rx);
405     // and sender should be Clone even though T isn't Clone
406     assert!(tx.clone().try_send(NoImpls).is_ok());
407 
408     assert!(rx.recv().await.is_some());
409 }
410 
411 #[maybe_tokio_test]
no_t_bounds_unbounded()412 async fn no_t_bounds_unbounded() {
413     struct NoImpls;
414 
415     let (tx, mut rx) = mpsc::unbounded_channel();
416 
417     // sender should be Debug even though T isn't Debug
418     is_debug(&tx);
419     // same with Receiver
420     is_debug(&rx);
421     // and sender should be Clone even though T isn't Clone
422     assert!(tx.clone().send(NoImpls).is_ok());
423 
424     assert!(rx.recv().await.is_some());
425 }
426 
// On a capacity-1 channel, a second `reserve` stays pending until the first
// message is received; the freed slot then belongs to that waiter, so a
// concurrent `try_send` fails.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
    let (sender, mut receiver) = mpsc::channel::<i32>(1);

    // Take the only slot and use it for the first message.
    let first_permit = assert_ok!(sender.reserve().await);
    first_permit.send(1);

    // No capacity is left, so the next reservation must wait.
    let mut second_reserve = tokio_test::task::spawn(sender.reserve());
    assert_pending!(second_reserve.poll());

    // Receiving the message frees the slot ...
    assert!(receiver.recv().await.is_some());

    // ... and wakes the waiting reservation.
    assert!(second_reserve.is_woken());

    // `try_send` cannot take the slot.
    assert_err!(sender.try_send(1337));

    // The waiter completes and sends the second message.
    let second_permit = assert_ready_ok!(second_reserve.poll());
    second_permit.send(2);

    assert!(receiver.recv().await.is_some());
}
457 
458 #[maybe_tokio_test]
recv_close_gets_none_idle()459 async fn recv_close_gets_none_idle() {
460     let (tx, mut rx) = mpsc::channel::<i32>(10);
461 
462     rx.close();
463 
464     assert!(rx.recv().await.is_none());
465 
466     assert_err!(tx.send(1).await);
467 }
468 
// Closing the receiver while a permit is outstanding: pending reservations
// fail, but the already issued permit can still deliver its message before
// `recv` starts returning `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
    let (sender_a, mut receiver) = mpsc::channel::<i32>(1);
    let sender_b = sender_a.clone();

    let held_permit = assert_ok!(sender_a.reserve().await);
    let mut waiting_reserve = tokio_test::task::spawn(sender_b.reserve());
    assert_pending!(waiting_reserve.poll());

    receiver.close();

    // The waiter is woken and resolves to an error.
    assert!(waiting_reserve.is_woken());
    assert_ready_err!(waiting_reserve.poll());

    {
        // `recv` stays pending while the issued permit is still outstanding.
        let mut recv_task = tokio_test::task::spawn(receiver.recv());
        assert_pending!(recv_task.poll());

        held_permit.send(123);
        assert!(recv_task.is_woken());

        let value = assert_ready!(recv_task.poll());
        assert_eq!(value, Some(123));
    }

    assert!(receiver.recv().await.is_none());
}
497 
498 #[maybe_tokio_test]
tx_close_gets_none()499 async fn tx_close_gets_none() {
500     let (_, mut rx) = mpsc::channel::<i32>(10);
501     assert!(rx.recv().await.is_none());
502 }
503 
504 #[maybe_tokio_test]
try_send_fail()505 async fn try_send_fail() {
506     let (tx, mut rx) = mpsc::channel(1);
507 
508     tx.try_send("hello").unwrap();
509 
510     // This should fail
511     match assert_err!(tx.try_send("fail")) {
512         TrySendError::Full(..) => {}
513         _ => panic!(),
514     }
515 
516     assert_eq!(rx.recv().await, Some("hello"));
517 
518     assert_ok!(tx.try_send("goodbye"));
519     drop(tx);
520 
521     assert_eq!(rx.recv().await, Some("goodbye"));
522     assert!(rx.recv().await.is_none());
523 }
524 
525 #[maybe_tokio_test]
try_send_fail_with_try_recv()526 async fn try_send_fail_with_try_recv() {
527     let (tx, mut rx) = mpsc::channel(1);
528 
529     tx.try_send("hello").unwrap();
530 
531     // This should fail
532     match assert_err!(tx.try_send("fail")) {
533         TrySendError::Full(..) => {}
534         _ => panic!(),
535     }
536 
537     assert_eq!(rx.try_recv(), Ok("hello"));
538 
539     assert_ok!(tx.try_send("goodbye"));
540     drop(tx);
541 
542     assert_eq!(rx.try_recv(), Ok("goodbye"));
543     assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
544 }
545 
546 #[maybe_tokio_test]
reserve_many_above_cap()547 async fn reserve_many_above_cap() {
548     const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS;
549     let (tx, _rx) = mpsc::channel::<()>(1);
550 
551     assert_err!(tx.reserve_many(2).await);
552     assert_err!(tx.reserve_many(MAX_PERMITS + 1).await);
553     assert_err!(tx.reserve_many(usize::MAX).await);
554 }
555 
// `try_reserve_many(0)` yields an empty iterator on an open channel, even a
// full one, and reports `Closed` once the receiver is gone.
#[test]
fn try_reserve_many_zero() {
    let (sender, receiver) = mpsc::channel::<()>(1);

    // Open channel: success with no permits.
    assert_eq!(0, assert_ok!(sender.try_reserve_many(0)).count());

    // Full channel: still a success with no permits.
    sender.try_send(()).unwrap();
    assert_eq!(0, assert_ok!(sender.try_reserve_many(0)).count());

    drop(receiver);

    // Closed channel: the request is rejected.
    let rejected = assert_err!(sender.try_reserve_many(0));
    assert_eq!(rejected, TrySendError::Closed(()));
}
575 
576 #[maybe_tokio_test]
reserve_many_zero()577 async fn reserve_many_zero() {
578     let (tx, rx) = mpsc::channel::<()>(1);
579 
580     // Succeeds when not closed.
581     assert!(assert_ok!(tx.reserve_many(0).await).next().is_none());
582 
583     // Even when channel is full.
584     tx.send(()).await.unwrap();
585     assert!(assert_ok!(tx.reserve_many(0).await).next().is_none());
586 
587     drop(rx);
588 
589     // Closed error when closed.
590     assert_err!(tx.reserve_many(0).await);
591 }
592 
593 #[maybe_tokio_test]
try_reserve_many_edge_cases()594 async fn try_reserve_many_edge_cases() {
595     const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS;
596 
597     let (tx, rx) = mpsc::channel::<()>(1);
598 
599     let mut permit = assert_ok!(tx.try_reserve_many(0));
600     assert!(permit.next().is_none());
601 
602     let permit = tx.try_reserve_many(MAX_PERMITS + 1);
603     match assert_err!(permit) {
604         TrySendError::Full(..) => {}
605         _ => panic!(),
606     }
607 
608     let permit = tx.try_reserve_many(usize::MAX);
609     match assert_err!(permit) {
610         TrySendError::Full(..) => {}
611         _ => panic!(),
612     }
613 
614     // Dropping the receiver should close the channel
615     drop(rx);
616     assert_err!(tx.reserve_many(0).await);
617 }
618 
619 #[maybe_tokio_test]
try_reserve_fails()620 async fn try_reserve_fails() {
621     let (tx, mut rx) = mpsc::channel(1);
622 
623     let permit = tx.try_reserve().unwrap();
624 
625     // This should fail
626     match assert_err!(tx.try_reserve()) {
627         TrySendError::Full(()) => {}
628         _ => panic!(),
629     }
630 
631     permit.send("foo");
632 
633     assert_eq!(rx.recv().await, Some("foo"));
634 
635     // Dropping permit releases the slot.
636     let permit = tx.try_reserve().unwrap();
637     drop(permit);
638 
639     let _permit = tx.try_reserve().unwrap();
640 }
641 
642 #[maybe_tokio_test]
reserve_many_and_send()643 async fn reserve_many_and_send() {
644     let (tx, mut rx) = mpsc::channel(100);
645     for i in 0..100 {
646         for permit in assert_ok!(tx.reserve_many(i).await) {
647             permit.send("foo");
648             assert_eq!(rx.recv().await, Some("foo"));
649         }
650         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
651     }
652 }
653 #[maybe_tokio_test]
try_reserve_many_and_send()654 async fn try_reserve_many_and_send() {
655     let (tx, mut rx) = mpsc::channel(100);
656     for i in 0..100 {
657         for permit in assert_ok!(tx.try_reserve_many(i)) {
658             permit.send("foo");
659             assert_eq!(rx.recv().await, Some("foo"));
660         }
661         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
662     }
663 }
664 
665 #[maybe_tokio_test]
reserve_many_on_closed_channel()666 async fn reserve_many_on_closed_channel() {
667     let (tx, rx) = mpsc::channel::<()>(100);
668     drop(rx);
669     assert_err!(tx.reserve_many(10).await);
670 }
671 
672 #[maybe_tokio_test]
try_reserve_many_on_closed_channel()673 async fn try_reserve_many_on_closed_channel() {
674     let (tx, rx) = mpsc::channel::<usize>(100);
675     drop(rx);
676     match assert_err!(tx.try_reserve_many(10)) {
677         TrySendError::Closed(()) => {}
678         _ => panic!(),
679     };
680 }
681 
682 #[maybe_tokio_test]
683 #[cfg_attr(miri, ignore)] // Too slow on miri.
try_reserve_many_full()684 async fn try_reserve_many_full() {
685     // Reserve n capacity and send k messages
686     for n in 1..100 {
687         for k in 0..n {
688             let (tx, mut rx) = mpsc::channel::<usize>(n);
689             let permits = assert_ok!(tx.try_reserve_many(n));
690 
691             assert_eq!(permits.len(), n);
692             assert_eq!(tx.capacity(), 0);
693 
694             match assert_err!(tx.try_reserve_many(1)) {
695                 TrySendError::Full(..) => {}
696                 _ => panic!(),
697             };
698 
699             for permit in permits.take(k) {
700                 permit.send(0);
701             }
702             // We only used k permits on the n reserved
703             assert_eq!(tx.capacity(), n - k);
704 
705             // We can reserve more permits
706             assert_ok!(tx.try_reserve_many(1));
707 
708             // But not more than the current capacity
709             match assert_err!(tx.try_reserve_many(n - k + 1)) {
710                 TrySendError::Full(..) => {}
711                 _ => panic!(),
712             };
713 
714             for _i in 0..k {
715                 assert_eq!(rx.recv().await, Some(0));
716             }
717 
718             // Now that we've received everything, capacity should be back to n
719             assert_eq!(tx.capacity(), n);
720         }
721     }
722 }
723 
// A permit that is dropped without sending must give its capacity back, so
// that a waiting `reserve` on another sender can complete.
#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
    let (sender_a, _receiver) = mpsc::channel::<i32>(1);
    let sender_b = sender_a.clone();

    let held = assert_ok!(sender_a.reserve().await);

    // The only slot is taken, so the second sender has to wait.
    let mut waiting = tokio_test::task::spawn(sender_b.reserve());
    assert_pending!(waiting.poll());

    drop(held);

    assert!(waiting.is_woken());
    assert_ready_ok!(waiting.poll());
}
742 
743 #[maybe_tokio_test]
drop_permit_iterator_releases_permits()744 async fn drop_permit_iterator_releases_permits() {
745     // poll_ready reserves capacity, ensure that the capacity is released if tx
746     // is dropped w/o sending a value.
747     for n in 1..100 {
748         let (tx1, _rx) = mpsc::channel::<i32>(n);
749         let tx2 = tx1.clone();
750 
751         let permits = assert_ok!(tx1.reserve_many(n).await);
752 
753         let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(n));
754         assert_pending!(reserve2.poll());
755 
756         drop(permits);
757 
758         assert!(reserve2.is_woken());
759 
760         let permits = assert_ready_ok!(reserve2.poll());
761         drop(permits);
762 
763         assert_eq!(tx1.capacity(), n);
764     }
765 }
766 
767 #[maybe_tokio_test]
dropping_rx_closes_channel()768 async fn dropping_rx_closes_channel() {
769     let (tx, rx) = mpsc::channel(100);
770 
771     let msg = Arc::new(());
772     assert_ok!(tx.try_send(msg.clone()));
773 
774     drop(rx);
775     assert_err!(tx.reserve().await);
776     assert_err!(tx.reserve_many(10).await);
777     assert_eq!(1, Arc::strong_count(&msg));
778 }
779 
// Dropping the receiver makes `try_send`, `try_reserve` and
// `try_reserve_owned` report `Closed`, and drops the message that was still
// queued.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (sender, receiver) = mpsc::channel(100);

    let payload = Arc::new(());
    sender.try_send(Arc::clone(&payload)).unwrap();

    drop(receiver);

    // Each result is a temporary, so a payload handed back inside the error
    // is dropped at the end of its statement.
    let send_closed = matches!(
        sender.try_send(Arc::clone(&payload)),
        Err(TrySendError::Closed(_))
    );
    assert!(send_closed);

    let reserve_closed = matches!(sender.try_reserve(), Err(TrySendError::Closed(_)));
    assert!(reserve_closed);

    let reserve_owned_closed = matches!(
        sender.try_reserve_owned(),
        Err(TrySendError::Closed(_))
    );
    assert!(reserve_owned_closed);

    // Only the local handle to the payload is left.
    assert_eq!(1, Arc::strong_count(&payload));
}
801 
// A message still queued when both channel halves are dropped must be
// dropped along with them.
#[test]
fn unconsumed_messages_are_dropped() {
    let payload = Arc::new(());

    let (sender, receiver) = mpsc::channel(100);

    sender.try_send(Arc::clone(&payload)).unwrap();

    // One handle here, one inside the channel.
    assert_eq!(2, Arc::strong_count(&payload));

    drop((sender, receiver));

    assert_eq!(1, Arc::strong_count(&payload));
}
816 
// `blocking_recv` called from a plain thread receives a value sent from
// inside a runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_recv() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let sync_code = std::thread::spawn(move || {
        let received = receiver.blocking_recv();
        assert_eq!(Some(10), received);
    });

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        let _ = sender.send(10).await;
    });
    // Shut the runtime down before waiting for the thread.
    drop(runtime);

    sync_code.join().unwrap();
}
833 
834 #[tokio::test]
835 #[should_panic]
836 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_recv_async()837 async fn blocking_recv_async() {
838     let (_tx, mut rx) = mpsc::channel::<()>(1);
839     let _ = rx.blocking_recv();
840 }
841 
// `blocking_send` called from a plain thread delivers a value that is
// received inside a runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_send() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let sync_code = std::thread::spawn(move || {
        sender.blocking_send(10).unwrap();
    });

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        let received = receiver.recv().await;
        assert_eq!(Some(10), received);
    });
    // Shut the runtime down before waiting for the thread.
    drop(runtime);

    sync_code.join().unwrap();
}
858 
859 #[tokio::test]
860 #[should_panic]
861 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
blocking_send_async()862 async fn blocking_send_async() {
863     let (tx, _rx) = mpsc::channel::<()>(1);
864     let _ = tx.blocking_send(());
865 }
866 
// After `close`, `recv` stays pending while a permit is outstanding and
// resolves to `None` once that permit is dropped unused.
#[tokio::test]
#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(100);
    let _second_sender = sender.clone();

    let held = assert_ok!(sender.reserve().await);

    receiver.close();

    // The outstanding permit could still deliver a message.
    let mut recv_task = tokio_test::task::spawn(receiver.recv());
    assert_pending!(recv_task.poll());

    drop(held);

    assert!(recv_task.is_woken());
    let outcome = assert_ready!(recv_task.poll());
    assert!(outcome.is_none());
}
886 
// After `close`, a waiting `reserve` is woken when the held permit is
// dropped; abandoning that waiter without polling it again still lets `recv`
// finish with `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
    let (sender_a, mut receiver) = mpsc::channel::<()>(1);
    let sender_b = sender_a.clone();

    let held = assert_ok!(sender_a.reserve().await);

    let mut waiting = tokio_test::task::spawn(sender_b.reserve());
    assert_pending!(waiting.poll());

    receiver.close();

    drop(held);
    assert!(waiting.is_woken());

    // Drop the woken waiter without polling it again.
    drop(waiting);
    assert!(receiver.recv().await.is_none());
}
906 
// Fills and drains a capacity-5 channel with `try_send` / `try_recv` in
// several interleavings, checking the `Empty` and `Disconnected` results.
#[test]
fn try_recv_bounded() {
    let (sender, mut receiver) = mpsc::channel(5);

    // Round 1: fill to capacity, then drain completely.
    for _ in 0..5 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());

    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Round 2: interleave one receive with the sends, then drain.
    for _ in 0..4 {
        sender.try_send("hello").unwrap();
    }
    assert_eq!(Ok("hello"), receiver.try_recv());
    for _ in 0..2 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());
    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Round 3: queued messages outlive the sender; afterwards the channel
    // reports disconnection.
    for _ in 0..3 {
        sender.try_send("hello").unwrap();
    }
    drop(sender);
    for _ in 0..3 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv());
}
949 
// For message counts 0..100, sends that many values over an unbounded
// channel and checks `try_recv` returns them in order, then `Empty`, then
// `Disconnected` once the sender is dropped.
#[test]
fn try_recv_unbounded() {
    for count in 0..100 {
        let (sender, mut receiver) = mpsc::unbounded_channel();

        (0..count).for_each(|value| sender.send(value).unwrap());

        for expected in 0..count {
            assert_eq!(receiver.try_recv(), Ok(expected));
        }

        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
968 
// An empty bounded channel reports `Empty` while the sender is alive and
// `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(5);

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
977 
// An empty unbounded channel reports `Empty` while the sender is alive and
// `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_drop = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_drop);
}
986 
// `send_timeout` succeeds while the capacity-5 channel has room, reports
// `Timeout` with the value once it is full, and `Closed` with the value after
// the receiver is dropped.
#[tokio::test(start_paused = true)]
#[cfg(feature = "full")]
async fn recv_timeout() {
    use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
    use tokio::time::Duration;

    let (sender, receiver) = mpsc::channel(5);
    let wait = Duration::from_secs(1);

    // The first five sends fit into the buffer.
    for value in [10, 20, 30, 40, 50] {
        assert_eq!(sender.send_timeout(value, wait).await, Ok(()));
    }

    // The channel is full and nothing is received, so this send times out.
    let timed_out = sender.send_timeout(60, wait).await;
    assert_eq!(timed_out, Err(Timeout(60)));

    drop(receiver);

    let closed = sender.send_timeout(70, wait).await;
    assert_eq!(closed, Err(Closed(70)));
}
1011 
// Evaluating a `send_timeout` future from a plain `#[test]`, where no Tokio
// runtime has been entered, is expected to panic with the message below.
#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn recv_timeout_panic() {
    use futures::future::FutureExt;
    use tokio::time::Duration;

    let (sender, _receiver) = mpsc::channel(5);
    let attempt = sender.send_timeout(10, Duration::from_secs(1));
    attempt.now_or_never();
}
1022 
1023 // Tests that channel `capacity` changes and `max_capacity` stays the same
1024 #[tokio::test]
test_tx_capacity()1025 async fn test_tx_capacity() {
1026     let (tx, _rx) = mpsc::channel::<()>(10);
1027     // both capacities are same before
1028     assert_eq!(tx.capacity(), 10);
1029     assert_eq!(tx.max_capacity(), 10);
1030 
1031     let _permit = tx.reserve().await.unwrap();
1032     // after reserve, only capacity should drop by one
1033     assert_eq!(tx.capacity(), 9);
1034     assert_eq!(tx.max_capacity(), 10);
1035 
1036     tx.send(()).await.unwrap();
1037     // after send, capacity should drop by one again
1038     assert_eq!(tx.capacity(), 8);
1039     assert_eq!(tx.max_capacity(), 10);
1040 }
1041 
1042 #[tokio::test]
test_rx_is_closed_when_calling_close_with_sender()1043 async fn test_rx_is_closed_when_calling_close_with_sender() {
1044     // is_closed should return true after calling close but still has a sender
1045     let (_tx, mut rx) = mpsc::channel::<()>(10);
1046     rx.close();
1047 
1048     assert!(rx.is_closed());
1049 }
1050 
1051 #[tokio::test]
test_rx_is_closed_when_dropping_all_senders()1052 async fn test_rx_is_closed_when_dropping_all_senders() {
1053     // is_closed should return true after dropping all senders
1054     let (tx, rx) = mpsc::channel::<()>(10);
1055     let another_tx = tx.clone();
1056     let task = tokio::spawn(async move {
1057         drop(another_tx);
1058     });
1059 
1060     drop(tx);
1061     let _ = task.await;
1062 
1063     assert!(rx.is_closed());
1064 }
1065 
1066 #[tokio::test]
test_rx_is_not_closed_when_there_are_senders()1067 async fn test_rx_is_not_closed_when_there_are_senders() {
1068     // is_closed should return false when there is a sender
1069     let (_tx, rx) = mpsc::channel::<()>(10);
1070     assert!(!rx.is_closed());
1071 }
1072 
1073 #[tokio::test]
test_rx_is_not_closed_when_there_are_senders_and_buffer_filled()1074 async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() {
1075     // is_closed should return false when there is a sender, even if enough messages have been sent to fill the channel
1076     let (tx, rx) = mpsc::channel(10);
1077     for i in 0..10 {
1078         assert!(tx.send(i).await.is_ok());
1079     }
1080     assert!(!rx.is_closed());
1081 }
1082 
1083 #[tokio::test]
test_rx_is_closed_when_there_are_no_senders_and_there_are_messages()1084 async fn test_rx_is_closed_when_there_are_no_senders_and_there_are_messages() {
1085     // is_closed should return true when there are messages in the buffer, but no senders
1086     let (tx, rx) = mpsc::channel(10);
1087     for i in 0..10 {
1088         assert!(tx.send(i).await.is_ok());
1089     }
1090     drop(tx);
1091     assert!(rx.is_closed());
1092 }
1093 
1094 #[tokio::test]
test_rx_is_closed_when_there_are_messages_and_close_is_called()1095 async fn test_rx_is_closed_when_there_are_messages_and_close_is_called() {
1096     // is_closed should return true when there are messages in the buffer, and close is called
1097     let (tx, mut rx) = mpsc::channel(10);
1098     for i in 0..10 {
1099         assert!(tx.send(i).await.is_ok());
1100     }
1101     rx.close();
1102     assert!(rx.is_closed());
1103 }
1104 
1105 #[tokio::test]
test_rx_is_not_closed_when_there_are_permits_but_not_senders()1106 async fn test_rx_is_not_closed_when_there_are_permits_but_not_senders() {
1107     // is_closed should return false when there is a permit (but no senders)
1108     let (tx, rx) = mpsc::channel::<()>(10);
1109     let _permit = tx.reserve_owned().await.expect("Failed to reserve permit");
1110     assert!(!rx.is_closed());
1111 }
1112 
1113 #[tokio::test]
test_rx_is_empty_when_no_messages_were_sent()1114 async fn test_rx_is_empty_when_no_messages_were_sent() {
1115     let (_tx, rx) = mpsc::channel::<()>(10);
1116     assert!(rx.is_empty())
1117 }
1118 
1119 #[tokio::test]
test_rx_is_not_empty_when_there_are_messages_in_the_buffer()1120 async fn test_rx_is_not_empty_when_there_are_messages_in_the_buffer() {
1121     let (tx, rx) = mpsc::channel::<()>(10);
1122     assert!(tx.send(()).await.is_ok());
1123     assert!(!rx.is_empty())
1124 }
1125 
1126 #[tokio::test]
test_rx_is_not_empty_when_the_buffer_is_full()1127 async fn test_rx_is_not_empty_when_the_buffer_is_full() {
1128     let (tx, rx) = mpsc::channel(10);
1129     for i in 0..10 {
1130         assert!(tx.send(i).await.is_ok());
1131     }
1132     assert!(!rx.is_empty())
1133 }
1134 
1135 #[tokio::test]
test_rx_is_not_empty_when_all_but_one_messages_are_consumed()1136 async fn test_rx_is_not_empty_when_all_but_one_messages_are_consumed() {
1137     let (tx, mut rx) = mpsc::channel(10);
1138     for i in 0..10 {
1139         assert!(tx.send(i).await.is_ok());
1140     }
1141 
1142     for _ in 0..9 {
1143         assert!(rx.recv().await.is_some());
1144     }
1145 
1146     assert!(!rx.is_empty())
1147 }
1148 
1149 #[tokio::test]
test_rx_is_empty_when_all_messages_are_consumed()1150 async fn test_rx_is_empty_when_all_messages_are_consumed() {
1151     let (tx, mut rx) = mpsc::channel(10);
1152     for i in 0..10 {
1153         assert!(tx.send(i).await.is_ok());
1154     }
1155     while rx.try_recv().is_ok() {}
1156     assert!(rx.is_empty())
1157 }
1158 
1159 #[tokio::test]
test_rx_is_empty_all_senders_are_dropped_and_messages_consumed()1160 async fn test_rx_is_empty_all_senders_are_dropped_and_messages_consumed() {
1161     let (tx, mut rx) = mpsc::channel(10);
1162     for i in 0..10 {
1163         assert!(tx.send(i).await.is_ok());
1164     }
1165     drop(tx);
1166 
1167     for _ in 0..10 {
1168         assert!(rx.recv().await.is_some());
1169     }
1170 
1171     assert!(rx.is_empty())
1172 }
1173 
1174 #[tokio::test]
test_rx_len_on_empty_channel()1175 async fn test_rx_len_on_empty_channel() {
1176     let (_tx, rx) = mpsc::channel::<()>(100);
1177     assert_eq!(rx.len(), 0);
1178 }
1179 
1180 #[tokio::test]
test_rx_len_on_empty_channel_without_senders()1181 async fn test_rx_len_on_empty_channel_without_senders() {
1182     // when all senders are dropped, a "closed" value is added to the end of the linked list.
1183     // here we test that the "closed" value does not change the len of the channel.
1184 
1185     let (tx, rx) = mpsc::channel::<()>(100);
1186     drop(tx);
1187     assert_eq!(rx.len(), 0);
1188 }
1189 
1190 #[tokio::test]
test_rx_len_on_filled_channel()1191 async fn test_rx_len_on_filled_channel() {
1192     let (tx, rx) = mpsc::channel(100);
1193 
1194     for i in 0..100 {
1195         assert!(tx.send(i).await.is_ok());
1196     }
1197     assert_eq!(rx.len(), 100);
1198 }
1199 
1200 #[tokio::test]
test_rx_len_on_filled_channel_without_senders()1201 async fn test_rx_len_on_filled_channel_without_senders() {
1202     let (tx, rx) = mpsc::channel(100);
1203 
1204     for i in 0..100 {
1205         assert!(tx.send(i).await.is_ok());
1206     }
1207     drop(tx);
1208     assert_eq!(rx.len(), 100);
1209 }
1210 
1211 #[tokio::test]
test_rx_len_when_consuming_all_messages()1212 async fn test_rx_len_when_consuming_all_messages() {
1213     let (tx, mut rx) = mpsc::channel(100);
1214 
1215     for i in 0..100 {
1216         assert!(tx.send(i).await.is_ok());
1217         assert_eq!(rx.len(), i + 1);
1218     }
1219 
1220     drop(tx);
1221 
1222     for i in (0..100).rev() {
1223         assert!(rx.recv().await.is_some());
1224         assert_eq!(rx.len(), i);
1225     }
1226 }
1227 
1228 #[tokio::test]
test_rx_len_when_close_is_called()1229 async fn test_rx_len_when_close_is_called() {
1230     let (tx, mut rx) = mpsc::channel(100);
1231     tx.send(()).await.unwrap();
1232     rx.close();
1233 
1234     assert_eq!(rx.len(), 1);
1235 }
1236 
1237 #[tokio::test]
test_rx_len_when_close_is_called_before_dropping_sender()1238 async fn test_rx_len_when_close_is_called_before_dropping_sender() {
1239     let (tx, mut rx) = mpsc::channel(100);
1240     tx.send(()).await.unwrap();
1241     rx.close();
1242     drop(tx);
1243 
1244     assert_eq!(rx.len(), 1);
1245 }
1246 
1247 #[tokio::test]
test_rx_len_when_close_is_called_after_dropping_sender()1248 async fn test_rx_len_when_close_is_called_after_dropping_sender() {
1249     let (tx, mut rx) = mpsc::channel(100);
1250     tx.send(()).await.unwrap();
1251     drop(tx);
1252     rx.close();
1253 
1254     assert_eq!(rx.len(), 1);
1255 }
1256 
1257 #[tokio::test]
test_rx_unbounded_is_closed_when_calling_close_with_sender()1258 async fn test_rx_unbounded_is_closed_when_calling_close_with_sender() {
1259     // is_closed should return true after calling close but still has a sender
1260     let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
1261     rx.close();
1262 
1263     assert!(rx.is_closed());
1264 }
1265 
1266 #[tokio::test]
test_rx_unbounded_is_closed_when_dropping_all_senders()1267 async fn test_rx_unbounded_is_closed_when_dropping_all_senders() {
1268     // is_closed should return true after dropping all senders
1269     let (tx, rx) = mpsc::unbounded_channel::<()>();
1270     let another_tx = tx.clone();
1271     let task = tokio::spawn(async move {
1272         drop(another_tx);
1273     });
1274 
1275     drop(tx);
1276     let _ = task.await;
1277 
1278     assert!(rx.is_closed());
1279 }
1280 
1281 #[tokio::test]
test_rx_unbounded_is_not_closed_when_there_are_senders()1282 async fn test_rx_unbounded_is_not_closed_when_there_are_senders() {
1283     // is_closed should return false when there is a sender
1284     let (_tx, rx) = mpsc::unbounded_channel::<()>();
1285     assert!(!rx.is_closed());
1286 }
1287 
1288 #[tokio::test]
test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages()1289 async fn test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages() {
1290     // is_closed should return true when there are messages in the buffer, but no senders
1291     let (tx, rx) = mpsc::unbounded_channel();
1292     for i in 0..10 {
1293         assert!(tx.send(i).is_ok());
1294     }
1295     drop(tx);
1296     assert!(rx.is_closed());
1297 }
1298 
1299 #[tokio::test]
test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called()1300 async fn test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called() {
1301     // is_closed should return true when there are messages in the buffer, and close is called
1302     let (tx, mut rx) = mpsc::unbounded_channel();
1303     for i in 0..10 {
1304         assert!(tx.send(i).is_ok());
1305     }
1306     rx.close();
1307     assert!(rx.is_closed());
1308 }
1309 
1310 #[tokio::test]
test_rx_unbounded_is_empty_when_no_messages_were_sent()1311 async fn test_rx_unbounded_is_empty_when_no_messages_were_sent() {
1312     let (_tx, rx) = mpsc::unbounded_channel::<()>();
1313     assert!(rx.is_empty())
1314 }
1315 
1316 #[tokio::test]
test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer()1317 async fn test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer() {
1318     let (tx, rx) = mpsc::unbounded_channel();
1319     assert!(tx.send(()).is_ok());
1320     assert!(!rx.is_empty())
1321 }
1322 
1323 #[tokio::test]
test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed()1324 async fn test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed() {
1325     let (tx, mut rx) = mpsc::unbounded_channel();
1326     for i in 0..10 {
1327         assert!(tx.send(i).is_ok());
1328     }
1329 
1330     for _ in 0..9 {
1331         assert!(rx.recv().await.is_some());
1332     }
1333 
1334     assert!(!rx.is_empty())
1335 }
1336 
1337 #[tokio::test]
test_rx_unbounded_is_empty_when_all_messages_are_consumed()1338 async fn test_rx_unbounded_is_empty_when_all_messages_are_consumed() {
1339     let (tx, mut rx) = mpsc::unbounded_channel();
1340     for i in 0..10 {
1341         assert!(tx.send(i).is_ok());
1342     }
1343     while rx.try_recv().is_ok() {}
1344     assert!(rx.is_empty())
1345 }
1346 
1347 #[tokio::test]
test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed()1348 async fn test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed() {
1349     let (tx, mut rx) = mpsc::unbounded_channel();
1350     for i in 0..10 {
1351         assert!(tx.send(i).is_ok());
1352     }
1353     drop(tx);
1354 
1355     for _ in 0..10 {
1356         assert!(rx.recv().await.is_some());
1357     }
1358 
1359     assert!(rx.is_empty())
1360 }
1361 
1362 #[tokio::test]
test_rx_unbounded_len_on_empty_channel()1363 async fn test_rx_unbounded_len_on_empty_channel() {
1364     let (_tx, rx) = mpsc::unbounded_channel::<()>();
1365     assert_eq!(rx.len(), 0);
1366 }
1367 
1368 #[tokio::test]
test_rx_unbounded_len_on_empty_channel_without_senders()1369 async fn test_rx_unbounded_len_on_empty_channel_without_senders() {
1370     // when all senders are dropped, a "closed" value is added to the end of the linked list.
1371     // here we test that the "closed" value does not change the len of the channel.
1372 
1373     let (tx, rx) = mpsc::unbounded_channel::<()>();
1374     drop(tx);
1375     assert_eq!(rx.len(), 0);
1376 }
1377 
1378 #[tokio::test]
test_rx_unbounded_len_with_multiple_messages()1379 async fn test_rx_unbounded_len_with_multiple_messages() {
1380     let (tx, rx) = mpsc::unbounded_channel();
1381 
1382     for i in 0..100 {
1383         assert!(tx.send(i).is_ok());
1384     }
1385     assert_eq!(rx.len(), 100);
1386 }
1387 
1388 #[tokio::test]
test_rx_unbounded_len_with_multiple_messages_and_dropped_senders()1389 async fn test_rx_unbounded_len_with_multiple_messages_and_dropped_senders() {
1390     let (tx, rx) = mpsc::unbounded_channel();
1391 
1392     for i in 0..100 {
1393         assert!(tx.send(i).is_ok());
1394     }
1395     drop(tx);
1396     assert_eq!(rx.len(), 100);
1397 }
1398 
1399 #[tokio::test]
test_rx_unbounded_len_when_consuming_all_messages()1400 async fn test_rx_unbounded_len_when_consuming_all_messages() {
1401     let (tx, mut rx) = mpsc::unbounded_channel();
1402 
1403     for i in 0..100 {
1404         assert!(tx.send(i).is_ok());
1405         assert_eq!(rx.len(), i + 1);
1406     }
1407 
1408     drop(tx);
1409 
1410     for i in (0..100).rev() {
1411         assert!(rx.recv().await.is_some());
1412         assert_eq!(rx.len(), i);
1413     }
1414 }
1415 
1416 #[tokio::test]
test_rx_unbounded_len_when_close_is_called()1417 async fn test_rx_unbounded_len_when_close_is_called() {
1418     let (tx, mut rx) = mpsc::unbounded_channel();
1419     tx.send(()).unwrap();
1420     rx.close();
1421 
1422     assert_eq!(rx.len(), 1);
1423 }
1424 
1425 #[tokio::test]
test_rx_unbounded_len_when_close_is_called_before_dropping_sender()1426 async fn test_rx_unbounded_len_when_close_is_called_before_dropping_sender() {
1427     let (tx, mut rx) = mpsc::unbounded_channel();
1428     tx.send(()).unwrap();
1429     rx.close();
1430     drop(tx);
1431 
1432     assert_eq!(rx.len(), 1);
1433 }
1434 
1435 #[tokio::test]
test_rx_unbounded_len_when_close_is_called_after_dropping_sender()1436 async fn test_rx_unbounded_len_when_close_is_called_after_dropping_sender() {
1437     let (tx, mut rx) = mpsc::unbounded_channel();
1438     tx.send(()).unwrap();
1439     drop(tx);
1440     rx.close();
1441 
1442     assert_eq!(rx.len(), 1);
1443 }
1444 
1445 // Regression test for https://github.com/tokio-rs/tokio/issues/6602
1446 #[tokio::test]
test_is_empty_32_msgs()1447 async fn test_is_empty_32_msgs() {
1448     let (sender, mut receiver) = mpsc::channel(33);
1449 
1450     for value in 1..257 {
1451         sender.send(value).await.unwrap();
1452         receiver.recv().await.unwrap();
1453         assert!(receiver.is_empty(), "{value}. len: {}", receiver.len());
1454     }
1455 }
1456 
// Compile-time probe: accepts a reference to any value whose type implements
// `fmt::Debug` and does nothing with it at runtime.
fn is_debug<T>(_: &T)
where
    T: fmt::Debug,
{
}
1458