1 use crate::sync::batch_semaphore::Semaphore;
2 use tokio_test::*;
3 
// Permit limit taken from the public `crate::sync::Semaphore` type; the tests
// below use it to probe the boundary accepted by `Semaphore::new`.
const MAX_PERMITS: usize = crate::sync::Semaphore::MAX_PERMITS;
5 
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as test;
8 
// Acquiring a single permit from a well-stocked semaphore completes on the
// first poll and lowers the available count by one.
#[test]
fn poll_acquire_one_available() {
    let sem = Semaphore::new(100);
    assert_eq!(100, sem.available_permits());

    // Nothing contends for the permit, so the very first poll is `Ready(Ok)`.
    let mut acquire = task::spawn(sem.acquire(1));
    assert_ready_ok!(acquire.poll());
    assert_eq!(99, sem.available_permits());
}
18 
// Acquiring a batch of permits completes on the first poll as long as enough
// permits remain.
#[test]
fn poll_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(100, sem.available_permits());

    // Two consecutive batches of five; each one is granted immediately.
    for remaining in [95, 90] {
        assert_ready_ok!(task::spawn(sem.acquire(5)).poll());
        assert_eq!(remaining, sem.available_permits());
    }
}
31 
// `try_acquire(1)` succeeds repeatedly while permits are available.
#[test]
fn try_acquire_one_available() {
    let sem = Semaphore::new(100);
    assert_eq!(100, sem.available_permits());

    // Each successful call lowers the available count by exactly one.
    for remaining in [99, 98] {
        assert_ok!(sem.try_acquire(1));
        assert_eq!(remaining, sem.available_permits());
    }
}
43 
// `try_acquire` with a batch size succeeds while enough permits are available.
#[test]
fn try_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(100, sem.available_permits());

    // First batch of five.
    let first_batch = sem.try_acquire(5);
    assert_ok!(first_batch);
    assert_eq!(95, sem.available_permits());

    // Second batch of five.
    let second_batch = sem.try_acquire(5);
    assert_ok!(second_batch);
    assert_eq!(90, sem.available_permits());
}
55 
// A request made against a drained semaphore stays pending until a permit is
// released, and that released permit goes to the pending request.
#[test]
fn poll_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Take the only permit.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(0, sem.available_permits());

    // A second request has nothing to take, so it is pending.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());
    assert_eq!(0, sem.available_permits());

    // The release wakes the waiter; the available count is asserted to stay
    // at zero both before and after the waiter completes.
    sem.release(1);
    assert_eq!(0, sem.available_permits());
    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
    assert_eq!(0, sem.available_permits());

    // With no pending request left, a release shows up in the count.
    sem.release(1);
    assert_eq!(1, sem.available_permits());
}
79 
// Batch requests that cannot be satisfied are pending and are completed in
// the order they were made as permits are released.
#[test]
fn poll_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // Take one of the five permits.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(4, sem.available_permits());

    // Request five: only four are free, so the request is pending and the
    // available count is asserted to drop to zero.
    let mut wants_five = task::spawn(sem.acquire(5));
    assert_pending!(wants_five.poll());
    assert_eq!(0, sem.available_permits());

    // Request three more behind it; this is pending as well.
    let mut wants_three = task::spawn(sem.acquire(3));
    assert_pending!(wants_three.poll());
    assert_eq!(0, sem.available_permits());

    // One released permit is enough to complete the first request.
    sem.release(1);
    assert_eq!(0, sem.available_permits());
    assert!(wants_five.is_woken());
    assert_ready_ok!(wants_five.poll());

    // The request for three has not been woken yet.
    assert!(!wants_three.is_woken());
    assert_eq!(0, sem.available_permits());

    // A single further permit is still not enough for it.
    sem.release(1);
    assert!(!wants_three.is_woken());
    assert_eq!(0, sem.available_permits());

    // Two more permits wake it and let it complete.
    sem.release(2);
    assert!(wants_three.is_woken());
    assert_ready_ok!(wants_three.poll());
}
116 
// `try_acquire(1)` fails on a drained semaphore and succeeds again once the
// permit has been released.
#[test]
fn try_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Take the only permit.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(0, sem.available_permits());

    // Nothing is left, so the next attempt is an error.
    assert_err!(sem.try_acquire(1));

    // Hand the permit back and take it again.
    sem.release(1);
    assert_eq!(1, sem.available_permits());
    assert_ok!(sem.try_acquire(1));

    // Releasing once more leaves exactly one permit available.
    sem.release(1);
    assert_eq!(1, sem.available_permits());
}
135 
// A batch `try_acquire` fails when fewer permits are available than requested
// and succeeds once the missing permit has been released.
#[test]
fn try_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // Take one permit out of five.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(4, sem.available_permits());

    // Four are left, so asking for five is an error.
    assert_err!(sem.try_acquire(5));

    // After the permit is returned the full batch can be taken.
    sem.release(1);
    assert_eq!(5, sem.available_permits());
    assert_ok!(sem.try_acquire(5));

    // Permits released one at a time add up in the available count.
    for expected in [1, 2] {
        sem.release(1);
        assert_eq!(expected, sem.available_permits());
    }
}
157 
// A semaphore created with zero permits keeps a request pending until a
// permit is released into it.
#[test]
fn poll_acquire_one_zero_permits() {
    let sem = Semaphore::new(0);
    assert_eq!(0, sem.available_permits());

    // There is nothing to hand out yet.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());

    // Releasing a permit wakes the waiter, which then completes.
    sem.release(1);
    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
}
172 
// Constructing a semaphore with exactly `MAX_PERMITS` must not panic.
#[test]
fn max_permits_doesnt_panic() {
    let _at_limit = Semaphore::new(MAX_PERMITS);
}
177 
// Constructing a semaphore with one permit more than `MAX_PERMITS` is
// expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn validates_max_permits() {
    let over_limit = MAX_PERMITS + 1;
    let _rejected = Semaphore::new(over_limit);
}
184 
// Once closed, the semaphore rejects acquire requests and its available count
// is left untouched.
#[test]
fn close_semaphore_prevents_acquire() {
    let sem = Semaphore::new(5);
    sem.close();

    // Closing does not change the number of available permits.
    assert_eq!(sem.available_permits(), 5);

    // Every attempt resolves to an error right away and takes nothing.
    for _attempt in 0..2 {
        assert_ready_err!(task::spawn(sem.acquire(1)).poll());
        assert_eq!(sem.available_permits(), 5);
    }
}
198 
// Closing the semaphore wakes a pending request, which then resolves to an
// error.
#[test]
fn close_semaphore_notifies_permit1() {
    let sem = Semaphore::new(0);

    // No permits exist, so the request is pending.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());

    sem.close();

    // The close is observed by the waiter as a wake-up followed by `Err`.
    assert!(waiter.is_woken());
    assert_ready_err!(waiter.poll());
}
211 
// Closing wakes every pending request with an error, while releasing permits
// into the closed semaphore still raises the available count.
#[test]
fn close_semaphore_notifies_permit2() {
    let sem = Semaphore::new(2);

    // Drain both permits.
    for _ in 0..2 {
        assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    }

    // Two further requests are pending.
    let mut third = task::spawn(sem.acquire(1));
    let mut fourth = task::spawn(sem.acquire(1));
    assert_pending!(third.poll());
    assert_pending!(fourth.poll());

    sem.close();

    // Both pending requests are woken ...
    assert!(third.is_woken());
    assert!(fourth.is_woken());

    // ... and both resolve to an error.
    assert_ready_err!(third.poll());
    assert_ready_err!(fourth.poll());

    assert_eq!(sem.available_permits(), 0);

    // A release after close is still counted.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);

    // A permit being available does not make acquire succeed after close.
    assert_ready_err!(task::spawn(sem.acquire(1)).poll());

    sem.release(1);
    assert_eq!(sem.available_permits(), 2);
}
245 
// Dropping a pending acquire gives back the permits it had already been
// assigned.
#[test]
fn cancel_acquire_releases_permits() {
    let sem = Semaphore::new(10);
    sem.try_acquire(4).expect("uncontended try_acquire succeeds");
    assert_eq!(sem.available_permits(), 6);

    // Ask for eight with only six free: the request is pending and the
    // available count is asserted to be zero while it waits.
    let mut pending_acquire = task::spawn(sem.acquire(8));
    assert_pending!(pending_acquire.poll());
    assert_eq!(sem.available_permits(), 0);

    // Cancel the request by dropping it.
    drop(pending_acquire);

    // The six permits are available again and can be taken in one go.
    assert_eq!(sem.available_permits(), 6);
    assert_ok!(sem.try_acquire(6));
}
261 
// Polls an acquire future with wakers that own an optional semaphore permit,
// so that dropping such a waker also drops the permit it holds. The test
// passes when both polls return `Pending` without deadlocking.
#[test]
fn release_permits_at_drop() {
    use crate::sync::semaphore::*;
    use futures::task::ArcWake;
    use std::future::Future;
    use std::sync::Arc;

    // Waker payload: holds a permit (if one could be obtained) and otherwise
    // does nothing when woken.
    struct ReleaseOnDrop(#[allow(dead_code)] Option<OwnedSemaphorePermit>);

    impl ArcWake for ReleaseOnDrop {
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }

    let sem = Arc::new(Semaphore::new(1));

    let mut fut = Box::pin(async {
        let _permit = sem.acquire().await.unwrap();
    });

    // Second iteration shouldn't deadlock.
    for _round in 0..2 {
        // `try_acquire_owned` may fail; in that case the waker carries `None`.
        let held_permit = Arc::clone(&sem).try_acquire_owned().ok();
        let waker = futures::task::waker(Arc::new(ReleaseOnDrop(held_permit)));
        let mut cx = std::task::Context::from_waker(&waker);
        let outcome = fut.as_mut().poll(&mut cx);
        assert!(outcome.is_pending());
    }
}
290 
// `forget_permits` returns how many permits it actually removed, which is
// capped by the number currently available.
#[test]
fn forget_permits_basic() {
    let sem = Semaphore::new(10);

    // Forgetting four of ten removes exactly four.
    let forgotten = sem.forget_permits(4);
    assert_eq!(4, forgotten);
    assert_eq!(6, sem.available_permits());

    // Asking for ten when six remain removes only those six.
    let forgotten = sem.forget_permits(10);
    assert_eq!(6, forgotten);
    assert_eq!(0, sem.available_permits());
}
299 
// Combines a pending batch acquire, a release and `forget_permits` on the
// same semaphore and checks the available count after each step.
#[test]
fn update_permits_many_times() {
    let sem = Semaphore::new(5);

    // Seven are requested but only five exist, so the request is pending.
    let mut wants_seven = task::spawn(sem.acquire(7));
    assert_pending!(wants_seven.poll());

    // Releasing five completes the request and leaves three available.
    sem.release(5);
    assert_ready_ok!(wants_seven.poll());
    assert_eq!(3, sem.available_permits());

    // Forgetting those three empties the semaphore.
    let forgotten = sem.forget_permits(3);
    assert_eq!(3, forgotten);
    assert_eq!(0, sem.available_permits());
}
311