1 use crate::sync::batch_semaphore::Semaphore;
2 use tokio_test::*;
3
4 const MAX_PERMITS: usize = crate::sync::Semaphore::MAX_PERMITS;
5
6 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7 use wasm_bindgen_test::wasm_bindgen_test as test;
8
9 #[test]
poll_acquire_one_available()10 fn poll_acquire_one_available() {
11 let s = Semaphore::new(100);
12 assert_eq!(s.available_permits(), 100);
13
14 // Polling for a permit succeeds immediately
15 assert_ready_ok!(task::spawn(s.acquire(1)).poll());
16 assert_eq!(s.available_permits(), 99);
17 }
18
19 #[test]
poll_acquire_many_available()20 fn poll_acquire_many_available() {
21 let s = Semaphore::new(100);
22 assert_eq!(s.available_permits(), 100);
23
24 // Polling for a permit succeeds immediately
25 assert_ready_ok!(task::spawn(s.acquire(5)).poll());
26 assert_eq!(s.available_permits(), 95);
27
28 assert_ready_ok!(task::spawn(s.acquire(5)).poll());
29 assert_eq!(s.available_permits(), 90);
30 }
31
32 #[test]
try_acquire_one_available()33 fn try_acquire_one_available() {
34 let s = Semaphore::new(100);
35 assert_eq!(s.available_permits(), 100);
36
37 assert_ok!(s.try_acquire(1));
38 assert_eq!(s.available_permits(), 99);
39
40 assert_ok!(s.try_acquire(1));
41 assert_eq!(s.available_permits(), 98);
42 }
43
44 #[test]
try_acquire_many_available()45 fn try_acquire_many_available() {
46 let s = Semaphore::new(100);
47 assert_eq!(s.available_permits(), 100);
48
49 assert_ok!(s.try_acquire(5));
50 assert_eq!(s.available_permits(), 95);
51
52 assert_ok!(s.try_acquire(5));
53 assert_eq!(s.available_permits(), 90);
54 }
55
56 #[test]
poll_acquire_one_unavailable()57 fn poll_acquire_one_unavailable() {
58 let s = Semaphore::new(1);
59
60 // Acquire the first permit
61 assert_ready_ok!(task::spawn(s.acquire(1)).poll());
62 assert_eq!(s.available_permits(), 0);
63
64 let mut acquire_2 = task::spawn(s.acquire(1));
65 // Try to acquire the second permit
66 assert_pending!(acquire_2.poll());
67 assert_eq!(s.available_permits(), 0);
68
69 s.release(1);
70
71 assert_eq!(s.available_permits(), 0);
72 assert!(acquire_2.is_woken());
73 assert_ready_ok!(acquire_2.poll());
74 assert_eq!(s.available_permits(), 0);
75
76 s.release(1);
77 assert_eq!(s.available_permits(), 1);
78 }
79
80 #[test]
poll_acquire_many_unavailable()81 fn poll_acquire_many_unavailable() {
82 let s = Semaphore::new(5);
83
84 // Acquire the first permit
85 assert_ready_ok!(task::spawn(s.acquire(1)).poll());
86 assert_eq!(s.available_permits(), 4);
87
88 // Try to acquire the second permit
89 let mut acquire_2 = task::spawn(s.acquire(5));
90 assert_pending!(acquire_2.poll());
91 assert_eq!(s.available_permits(), 0);
92
93 // Try to acquire the third permit
94 let mut acquire_3 = task::spawn(s.acquire(3));
95 assert_pending!(acquire_3.poll());
96 assert_eq!(s.available_permits(), 0);
97
98 s.release(1);
99
100 assert_eq!(s.available_permits(), 0);
101 assert!(acquire_2.is_woken());
102 assert_ready_ok!(acquire_2.poll());
103
104 assert!(!acquire_3.is_woken());
105 assert_eq!(s.available_permits(), 0);
106
107 s.release(1);
108 assert!(!acquire_3.is_woken());
109 assert_eq!(s.available_permits(), 0);
110
111 s.release(2);
112 assert!(acquire_3.is_woken());
113
114 assert_ready_ok!(acquire_3.poll());
115 }
116
117 #[test]
try_acquire_one_unavailable()118 fn try_acquire_one_unavailable() {
119 let s = Semaphore::new(1);
120
121 // Acquire the first permit
122 assert_ok!(s.try_acquire(1));
123 assert_eq!(s.available_permits(), 0);
124
125 assert_err!(s.try_acquire(1));
126
127 s.release(1);
128
129 assert_eq!(s.available_permits(), 1);
130 assert_ok!(s.try_acquire(1));
131
132 s.release(1);
133 assert_eq!(s.available_permits(), 1);
134 }
135
136 #[test]
try_acquire_many_unavailable()137 fn try_acquire_many_unavailable() {
138 let s = Semaphore::new(5);
139
140 // Acquire the first permit
141 assert_ok!(s.try_acquire(1));
142 assert_eq!(s.available_permits(), 4);
143
144 assert_err!(s.try_acquire(5));
145
146 s.release(1);
147 assert_eq!(s.available_permits(), 5);
148
149 assert_ok!(s.try_acquire(5));
150
151 s.release(1);
152 assert_eq!(s.available_permits(), 1);
153
154 s.release(1);
155 assert_eq!(s.available_permits(), 2);
156 }
157
158 #[test]
poll_acquire_one_zero_permits()159 fn poll_acquire_one_zero_permits() {
160 let s = Semaphore::new(0);
161 assert_eq!(s.available_permits(), 0);
162
163 // Try to acquire the permit
164 let mut acquire = task::spawn(s.acquire(1));
165 assert_pending!(acquire.poll());
166
167 s.release(1);
168
169 assert!(acquire.is_woken());
170 assert_ready_ok!(acquire.poll());
171 }
172
173 #[test]
max_permits_doesnt_panic()174 fn max_permits_doesnt_panic() {
175 Semaphore::new(MAX_PERMITS);
176 }
177
178 #[test]
179 #[should_panic]
180 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
validates_max_permits()181 fn validates_max_permits() {
182 Semaphore::new(MAX_PERMITS + 1);
183 }
184
185 #[test]
close_semaphore_prevents_acquire()186 fn close_semaphore_prevents_acquire() {
187 let s = Semaphore::new(5);
188 s.close();
189
190 assert_eq!(5, s.available_permits());
191
192 assert_ready_err!(task::spawn(s.acquire(1)).poll());
193 assert_eq!(5, s.available_permits());
194
195 assert_ready_err!(task::spawn(s.acquire(1)).poll());
196 assert_eq!(5, s.available_permits());
197 }
198
199 #[test]
close_semaphore_notifies_permit1()200 fn close_semaphore_notifies_permit1() {
201 let s = Semaphore::new(0);
202 let mut acquire = task::spawn(s.acquire(1));
203
204 assert_pending!(acquire.poll());
205
206 s.close();
207
208 assert!(acquire.is_woken());
209 assert_ready_err!(acquire.poll());
210 }
211
212 #[test]
close_semaphore_notifies_permit2()213 fn close_semaphore_notifies_permit2() {
214 let s = Semaphore::new(2);
215
216 // Acquire a couple of permits
217 assert_ready_ok!(task::spawn(s.acquire(1)).poll());
218 assert_ready_ok!(task::spawn(s.acquire(1)).poll());
219
220 let mut acquire3 = task::spawn(s.acquire(1));
221 let mut acquire4 = task::spawn(s.acquire(1));
222 assert_pending!(acquire3.poll());
223 assert_pending!(acquire4.poll());
224
225 s.close();
226
227 assert!(acquire3.is_woken());
228 assert!(acquire4.is_woken());
229
230 assert_ready_err!(acquire3.poll());
231 assert_ready_err!(acquire4.poll());
232
233 assert_eq!(0, s.available_permits());
234
235 s.release(1);
236
237 assert_eq!(1, s.available_permits());
238
239 assert_ready_err!(task::spawn(s.acquire(1)).poll());
240
241 s.release(1);
242
243 assert_eq!(2, s.available_permits());
244 }
245
246 #[test]
cancel_acquire_releases_permits()247 fn cancel_acquire_releases_permits() {
248 let s = Semaphore::new(10);
249 s.try_acquire(4).expect("uncontended try_acquire succeeds");
250 assert_eq!(6, s.available_permits());
251
252 let mut acquire = task::spawn(s.acquire(8));
253 assert_pending!(acquire.poll());
254
255 assert_eq!(0, s.available_permits());
256 drop(acquire);
257
258 assert_eq!(6, s.available_permits());
259 assert_ok!(s.try_acquire(6));
260 }
261
262 #[test]
release_permits_at_drop()263 fn release_permits_at_drop() {
264 use crate::sync::semaphore::*;
265 use futures::task::ArcWake;
266 use std::future::Future;
267 use std::sync::Arc;
268
269 let sem = Arc::new(Semaphore::new(1));
270
271 struct ReleaseOnDrop(#[allow(dead_code)] Option<OwnedSemaphorePermit>);
272
273 impl ArcWake for ReleaseOnDrop {
274 fn wake_by_ref(_arc_self: &Arc<Self>) {}
275 }
276
277 let mut fut = Box::pin(async {
278 let _permit = sem.acquire().await.unwrap();
279 });
280
281 // Second iteration shouldn't deadlock.
282 for _ in 0..=1 {
283 let waker = futures::task::waker(Arc::new(ReleaseOnDrop(
284 sem.clone().try_acquire_owned().ok(),
285 )));
286 let mut cx = std::task::Context::from_waker(&waker);
287 assert!(fut.as_mut().poll(&mut cx).is_pending());
288 }
289 }
290
291 #[test]
forget_permits_basic()292 fn forget_permits_basic() {
293 let s = Semaphore::new(10);
294 assert_eq!(s.forget_permits(4), 4);
295 assert_eq!(s.available_permits(), 6);
296 assert_eq!(s.forget_permits(10), 6);
297 assert_eq!(s.available_permits(), 0);
298 }
299
300 #[test]
update_permits_many_times()301 fn update_permits_many_times() {
302 let s = Semaphore::new(5);
303 let mut acquire = task::spawn(s.acquire(7));
304 assert_pending!(acquire.poll());
305 s.release(5);
306 assert_ready_ok!(acquire.poll());
307 assert_eq!(s.available_permits(), 3);
308 assert_eq!(s.forget_permits(3), 3);
309 assert_eq!(s.available_permits(), 0);
310 }
311