1 use super::batch_semaphore as ll; // low level implementation
2 use super::{AcquireError, TryAcquireError};
3 #[cfg(all(tokio_unstable, feature = "tracing"))]
4 use crate::util::trace;
5 use std::sync::Arc;
6 
7 /// Counting semaphore performing asynchronous permit acquisition.
8 ///
9 /// A semaphore maintains a set of permits. Permits are used to synchronize
10 /// access to a shared resource. A semaphore differs from a mutex in that it
11 /// can allow more than one concurrent caller to access the shared resource at a
12 /// time.
13 ///
14 /// When `acquire` is called and the semaphore has remaining permits, the
15 /// function immediately returns a permit. However, if no remaining permits are
16 /// available, `acquire` (asynchronously) waits until an outstanding permit is
17 /// dropped. At this point, the freed permit is assigned to the caller.
18 ///
19 /// This `Semaphore` is fair, which means that permits are given out in the order
20 /// they were requested. This fairness is also applied when `acquire_many` gets
21 /// involved, so if a call to `acquire_many` at the front of the queue requests
22 /// more permits than currently available, this can prevent a call to `acquire`
/// from completing, even if the semaphore has enough permits to complete the
/// call to `acquire`.
25 ///
26 /// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
27 /// utility.
28 ///
29 /// # Examples
30 ///
31 /// Basic usage:
32 ///
33 /// ```
34 /// use tokio::sync::{Semaphore, TryAcquireError};
35 ///
36 /// #[tokio::main]
37 /// async fn main() {
38 ///     let semaphore = Semaphore::new(3);
39 ///
40 ///     let a_permit = semaphore.acquire().await.unwrap();
41 ///     let two_permits = semaphore.acquire_many(2).await.unwrap();
42 ///
43 ///     assert_eq!(semaphore.available_permits(), 0);
44 ///
45 ///     let permit_attempt = semaphore.try_acquire();
46 ///     assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
47 /// }
48 /// ```
49 ///
50 /// ## Limit the number of simultaneously opened files in your program
51 ///
52 /// Most operating systems have limits on the number of open file
53 /// handles. Even in systems without explicit limits, resource constraints
54 /// implicitly set an upper bound on the number of open files. If your
55 /// program attempts to open a large number of files and exceeds this
56 /// limit, it will result in an error.
57 ///
58 /// This example uses a Semaphore with 100 permits. By acquiring a permit from
59 /// the Semaphore before accessing a file, you ensure that your program opens
60 /// no more than 100 files at a time. When trying to open the 101st
61 /// file, the program will wait until a permit becomes available before
62 /// proceeding to open another file.
63 /// ```
64 /// use std::io::Result;
65 /// use tokio::fs::File;
66 /// use tokio::sync::Semaphore;
67 /// use tokio::io::AsyncWriteExt;
68 ///
69 /// static PERMITS: Semaphore = Semaphore::const_new(100);
70 ///
71 /// async fn write_to_file(message: &[u8]) -> Result<()> {
72 ///     let _permit = PERMITS.acquire().await.unwrap();
73 ///     let mut buffer = File::create("example.txt").await?;
74 ///     buffer.write_all(message).await?;
75 ///     Ok(()) // Permit goes out of scope here, and is available again for acquisition
76 /// }
77 /// ```
78 ///
79 /// ## Limit the number of outgoing requests being sent at the same time
80 ///
81 /// In some scenarios, it might be required to limit the number of outgoing
82 /// requests being sent in parallel. This could be due to limits of a consumed
83 /// API or the network resources of the system the application is running on.
84 ///
85 /// This example uses an `Arc<Semaphore>` with 10 permits. Each task spawned is
86 /// given a reference to the semaphore by cloning the `Arc<Semaphore>`. Before
87 /// a task sends a request, it must acquire a permit from the semaphore by
88 /// calling [`Semaphore::acquire`]. This ensures that at most 10 requests are
89 /// sent in parallel at any given time. After a task has sent a request, it
90 /// drops the permit to allow other tasks to send requests.
91 ///
92 /// ```
93 /// use std::sync::Arc;
94 /// use tokio::sync::Semaphore;
95 ///
96 /// #[tokio::main]
97 /// async fn main() {
98 ///     // Define maximum number of parallel requests.
99 ///     let semaphore = Arc::new(Semaphore::new(10));
100 ///     // Spawn many tasks that will send requests.
101 ///     let mut jhs = Vec::new();
102 ///     for task_id in 0..100 {
103 ///         let semaphore = semaphore.clone();
104 ///         let jh = tokio::spawn(async move {
105 ///             // Acquire permit before sending request.
106 ///             let _permit = semaphore.acquire().await.unwrap();
107 ///             // Send the request.
108 ///             let response = send_request(task_id).await;
109 ///             // Drop the permit after the request has been sent.
110 ///             drop(_permit);
111 ///             // Handle response.
112 ///             // ...
113 ///
114 ///             response
115 ///         });
116 ///         jhs.push(jh);
117 ///     }
118 ///     // Collect responses from tasks.
119 ///     let mut responses = Vec::new();
120 ///     for jh in jhs {
121 ///         let response = jh.await.unwrap();
122 ///         responses.push(response);
123 ///     }
124 ///     // Process responses.
125 ///     // ...
126 /// }
127 /// # async fn send_request(task_id: usize) {
128 /// #     // Send request.
129 /// # }
130 /// ```
131 ///
132 /// ## Limit the number of incoming requests being handled at the same time
133 ///
134 /// Similar to limiting the number of simultaneously opened files, network handles
135 /// are a limited resource. Allowing an unbounded amount of requests to be processed
136 /// could result in a denial-of-service, among many other issues.
137 ///
138 /// This example uses an `Arc<Semaphore>` instead of a global variable.
/// To limit the number of requests that can be processed at the same time,
140 /// we acquire a permit for each task before spawning it. Once acquired,
141 /// a new task is spawned; and once finished, the permit is dropped inside
142 /// of the task to allow others to spawn. Permits must be acquired via
143 /// [`Semaphore::acquire_owned`] to be movable across the task boundary.
144 /// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
145 ///
146 /// ```no_run
147 /// use std::sync::Arc;
148 /// use tokio::sync::Semaphore;
149 /// use tokio::net::TcpListener;
150 ///
151 /// #[tokio::main]
152 /// async fn main() -> std::io::Result<()> {
153 ///     let semaphore = Arc::new(Semaphore::new(3));
154 ///     let listener = TcpListener::bind("127.0.0.1:8080").await?;
155 ///
156 ///     loop {
157 ///         // Acquire permit before accepting the next socket.
158 ///         //
159 ///         // We use `acquire_owned` so that we can move `permit` into
160 ///         // other tasks.
161 ///         let permit = semaphore.clone().acquire_owned().await.unwrap();
162 ///         let (mut socket, _) = listener.accept().await?;
163 ///
164 ///         tokio::spawn(async move {
165 ///             // Do work using the socket.
166 ///             handle_connection(&mut socket).await;
167 ///             // Drop socket while the permit is still live.
168 ///             drop(socket);
169 ///             // Drop the permit, so more tasks can be created.
170 ///             drop(permit);
171 ///         });
172 ///     }
173 /// }
174 /// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
175 /// #   // Do work
176 /// # }
177 /// ```
178 ///
179 /// ## Prevent tests from running in parallel
180 ///
181 /// By default, Rust runs tests in the same file in parallel. However, in some
182 /// cases, running two tests in parallel may lead to problems. For example, this
183 /// can happen when tests use the same database.
184 ///
185 /// Consider the following scenario:
186 /// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
187 ///    the value using the same key to verify the insertion.
188 /// 2. `test_update`: Inserts a key, then updates the key to a new value and
189 ///    verifies that the value has been accurately updated.
190 /// 3. `test_others`: A third test that doesn't modify the database state. It
191 ///    can run in parallel with the other tests.
192 ///
193 /// In this example, `test_insert` and `test_update` need to run in sequence to
194 /// work, but it doesn't matter which test runs first. We can leverage a
195 /// semaphore with a single permit to address this challenge.
196 ///
197 /// ```
198 /// # use tokio::sync::Mutex;
199 /// # use std::collections::BTreeMap;
200 /// # struct Database {
201 /// #   map: Mutex<BTreeMap<String, i32>>,
202 /// # }
203 /// # impl Database {
204 /// #    pub const fn setup() -> Database {
205 /// #        Database {
206 /// #            map: Mutex::const_new(BTreeMap::new()),
207 /// #        }
208 /// #    }
209 /// #    pub async fn insert(&self, key: &str, value: i32) {
210 /// #        self.map.lock().await.insert(key.to_string(), value);
211 /// #    }
212 /// #    pub async fn update(&self, key: &str, value: i32) {
213 /// #        self.map.lock().await
214 /// #            .entry(key.to_string())
215 /// #            .and_modify(|origin| *origin = value);
216 /// #    }
217 /// #    pub async fn delete(&self, key: &str) {
218 /// #        self.map.lock().await.remove(key);
219 /// #    }
220 /// #    pub async fn get(&self, key: &str) -> i32 {
221 /// #        *self.map.lock().await.get(key).unwrap()
222 /// #    }
223 /// # }
224 /// use tokio::sync::Semaphore;
225 ///
226 /// // Initialize a static semaphore with only one permit, which is used to
227 /// // prevent test_insert and test_update from running in parallel.
228 /// static PERMIT: Semaphore = Semaphore::const_new(1);
229 ///
230 /// // Initialize the database that will be used by the subsequent tests.
231 /// static DB: Database = Database::setup();
232 ///
233 /// #[tokio::test]
234 /// # async fn fake_test_insert() {}
235 /// async fn test_insert() {
236 ///     // Acquire permit before proceeding. Since the semaphore has only one permit,
237 ///     // the test will wait if the permit is already acquired by other tests.
238 ///     let permit = PERMIT.acquire().await.unwrap();
239 ///
240 ///     // Do the actual test stuff with database
241 ///
242 ///     // Insert a key-value pair to database
243 ///     let (key, value) = ("name", 0);
244 ///     DB.insert(key, value).await;
245 ///
246 ///     // Verify that the value has been inserted correctly.
247 ///     assert_eq!(DB.get(key).await, value);
248 ///
249 ///     // Undo the insertion, so the database is empty at the end of the test.
250 ///     DB.delete(key).await;
251 ///
252 ///     // Drop permit. This allows the other test to start running.
253 ///     drop(permit);
254 /// }
255 ///
256 /// #[tokio::test]
257 /// # async fn fake_test_update() {}
258 /// async fn test_update() {
259 ///     // Acquire permit before proceeding. Since the semaphore has only one permit,
260 ///     // the test will wait if the permit is already acquired by other tests.
261 ///     let permit = PERMIT.acquire().await.unwrap();
262 ///
263 ///     // Do the same insert.
264 ///     let (key, value) = ("name", 0);
265 ///     DB.insert(key, value).await;
266 ///
267 ///     // Update the existing value with a new one.
268 ///     let new_value = 1;
269 ///     DB.update(key, new_value).await;
270 ///
271 ///     // Verify that the value has been updated correctly.
272 ///     assert_eq!(DB.get(key).await, new_value);
273 ///
///     // Undo any modification.
275 ///     DB.delete(key).await;
276 ///
277 ///     // Drop permit. This allows the other test to start running.
278 ///     drop(permit);
279 /// }
280 ///
281 /// #[tokio::test]
282 /// # async fn fake_test_others() {}
283 /// async fn test_others() {
284 ///     // This test can run in parallel with test_insert and test_update,
285 ///     // so it does not use PERMIT.
286 /// }
287 /// # #[tokio::main(flavor = "current_thread")]
288 /// # async fn main() {
289 /// #   test_insert().await;
290 /// #   test_update().await;
291 /// #   test_others().await;
292 /// # }
293 /// ```
294 ///
295 /// ## Rate limiting using a token bucket
296 ///
297 /// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
298 ///
299 /// Many applications and systems have constraints on the rate at which certain
300 /// operations should occur. Exceeding this rate can result in suboptimal
301 /// performance or even errors.
302 ///
303 /// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
304 /// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
305 /// arrive at the same time.
306 ///
307 /// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
308 /// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
309 /// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
310 /// wait for new tokens to be added.
311 ///
312 /// Unlike the example that limits how many requests can be handled at the same time, we do not add
313 /// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
314 ///
315 /// Note that this implementation is suboptimal when the duration is small, because it consumes a
/// lot of CPU constantly looping and sleeping.
317 ///
318 /// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
319 /// [`add_permits`]: crate::sync::Semaphore::add_permits
320 /// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
321 /// ```
322 /// use std::sync::Arc;
323 /// use tokio::sync::Semaphore;
324 /// use tokio::time::{interval, Duration};
325 ///
326 /// struct TokenBucket {
327 ///     sem: Arc<Semaphore>,
328 ///     jh: tokio::task::JoinHandle<()>,
329 /// }
330 ///
331 /// impl TokenBucket {
332 ///     fn new(duration: Duration, capacity: usize) -> Self {
333 ///         let sem = Arc::new(Semaphore::new(capacity));
334 ///
335 ///         // refills the tokens at the end of each interval
336 ///         let jh = tokio::spawn({
337 ///             let sem = sem.clone();
338 ///             let mut interval = interval(duration);
339 ///             interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
340 ///
341 ///             async move {
342 ///                 loop {
343 ///                     interval.tick().await;
344 ///
345 ///                     if sem.available_permits() < capacity {
346 ///                         sem.add_permits(1);
347 ///                     }
348 ///                 }
349 ///             }
350 ///         });
351 ///
352 ///         Self { jh, sem }
353 ///     }
354 ///
355 ///     async fn acquire(&self) {
356 ///         // This can return an error if the semaphore is closed, but we
357 ///         // never close it, so this error can never happen.
358 ///         let permit = self.sem.acquire().await.unwrap();
359 ///         // To avoid releasing the permit back to the semaphore, we use
360 ///         // the `SemaphorePermit::forget` method.
361 ///         permit.forget();
362 ///     }
363 /// }
364 ///
365 /// impl Drop for TokenBucket {
366 ///     fn drop(&mut self) {
367 ///         // Kill the background task so it stops taking up resources when we
368 ///         // don't need it anymore.
369 ///         self.jh.abort();
370 ///     }
371 /// }
372 ///
373 /// #[tokio::main]
374 /// # async fn _hidden() {}
375 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
376 /// async fn main() {
377 ///     let capacity = 5;
378 ///     let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
379 ///     let bucket = TokenBucket::new(update_interval, capacity);
380 ///
381 ///     for _ in 0..5 {
382 ///         bucket.acquire().await;
383 ///
384 ///         // do the operation
385 ///     }
386 /// }
387 /// ```
388 ///
389 /// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
390 /// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
#[derive(Debug)]
pub struct Semaphore {
    /// The low level semaphore (from `super::batch_semaphore`) that the permit
    /// operations of this type delegate to.
    ll_sem: ll::Semaphore,
    /// Tracing span describing this semaphore as a runtime resource.
    ///
    /// Only present when built with `tokio_unstable` and the `tracing`
    /// feature. `Semaphore::new` fills it with a `runtime.resource` span; the
    /// const and closed constructors store `Span::none()` instead.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
}
398 
/// A permit from the semaphore.
///
/// This type is created by the [`acquire`] method. It is also returned by
/// [`acquire_many`], [`try_acquire`] and [`try_acquire_many`], in which case a
/// single value may stand for several permits.
///
/// [`acquire`]: crate::sync::Semaphore::acquire()
/// [`acquire_many`]: crate::sync::Semaphore::acquire_many()
/// [`try_acquire`]: crate::sync::Semaphore::try_acquire()
/// [`try_acquire_many`]: crate::sync::Semaphore::try_acquire_many()
#[must_use]
#[clippy::has_significant_drop]
#[derive(Debug)]
pub struct SemaphorePermit<'a> {
    /// The semaphore this permit was acquired from.
    sem: &'a Semaphore,
    /// How many permits this value represents: `1` when produced by
    /// `acquire`/`try_acquire`, `n` when produced by the `_many` variants.
    permits: u32,
}
411 
/// An owned permit from the semaphore.
///
/// This type is created by the [`acquire_owned`] method. It is also returned
/// by [`acquire_many_owned`], in which case a single value may stand for
/// several permits.
///
/// Unlike a borrowed permit, it keeps the semaphore alive through an `Arc`
/// and therefore carries no lifetime parameter.
///
/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
/// [`acquire_many_owned`]: crate::sync::Semaphore::acquire_many_owned()
#[must_use]
#[clippy::has_significant_drop]
#[derive(Debug)]
pub struct OwnedSemaphorePermit {
    /// Shared handle to the semaphore this permit was acquired from.
    sem: Arc<Semaphore>,
    /// How many permits this value represents: `1` when produced by
    /// `acquire_owned`, `n` when produced by `acquire_many_owned`.
    permits: u32,
}
424 
#[test]
#[cfg(not(loom))]
fn bounds() {
    // Compile-time assertions: each helper only type-checks when the named
    // bound holds for `T`, so the test body does no runtime work.
    fn assert_unpin<T>()
    where
        T: Unpin,
    {
    }
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }
    // The future returned by an `async fn` has an unnameable type, so this
    // variant infers `T` from a value instead of a turbofish.
    fn assert_send_sync_val<T>(_value: T)
    where
        T: Send + Sync,
    {
    }

    assert_unpin::<Semaphore>();
    assert_unpin::<SemaphorePermit<'_>>();
    assert_send_sync::<Semaphore>();

    let sem = Semaphore::new(0);
    let acquire_future = sem.acquire();
    assert_send_sync_val(acquire_future);
}
439 
440 impl Semaphore {
441     /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
442     ///
443     /// Exceeding this limit typically results in a panic.
444     pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
445 
446     /// Creates a new semaphore with the initial number of permits.
447     ///
448     /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
449     #[track_caller]
new(permits: usize) -> Self450     pub fn new(permits: usize) -> Self {
451         #[cfg(all(tokio_unstable, feature = "tracing"))]
452         let resource_span = {
453             let location = std::panic::Location::caller();
454 
455             tracing::trace_span!(
456                 parent: None,
457                 "runtime.resource",
458                 concrete_type = "Semaphore",
459                 kind = "Sync",
460                 loc.file = location.file(),
461                 loc.line = location.line(),
462                 loc.col = location.column(),
463                 inherits_child_attrs = true,
464             )
465         };
466 
467         #[cfg(all(tokio_unstable, feature = "tracing"))]
468         let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
469 
470         #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
471         let ll_sem = ll::Semaphore::new(permits);
472 
473         Self {
474             ll_sem,
475             #[cfg(all(tokio_unstable, feature = "tracing"))]
476             resource_span,
477         }
478     }
479 
    /// Creates a new semaphore with the initial number of permits.
    ///
    /// Unlike [`Semaphore::new`], this is a `const fn`, so it can be used to
    /// initialize a `static`.
    ///
    /// When using the `tracing` [unstable feature], a `Semaphore` created with
    /// `const_new` will not be instrumented. As such, it will not be visible
    /// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
    /// create an instrumented object if that is needed.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::Semaphore;
    ///
    /// static SEM: Semaphore = Semaphore::const_new(10);
    /// ```
    ///
    /// [`tokio-console`]: https://github.com/tokio-rs/console
    /// [unstable feature]: crate#unstable-features
    #[cfg(not(all(loom, test)))]
    pub const fn const_new(permits: usize) -> Self {
        Self {
            ll_sem: ll::Semaphore::const_new(permits),
            // No resource span is created here; the field is filled with
            // `Span::none()`, which is why a const-constructed semaphore is
            // not instrumented.
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: tracing::Span::none(),
        }
    }
505 
506     /// Creates a new closed semaphore with 0 permits.
new_closed() -> Self507     pub(crate) fn new_closed() -> Self {
508         Self {
509             ll_sem: ll::Semaphore::new_closed(),
510             #[cfg(all(tokio_unstable, feature = "tracing"))]
511             resource_span: tracing::Span::none(),
512         }
513     }
514 
    /// Creates a new closed semaphore with 0 permits.
    ///
    /// This is the `const fn` counterpart of `new_closed`, so it can be used
    /// in constant contexts.
    #[cfg(not(all(loom, test)))]
    pub(crate) const fn const_new_closed() -> Self {
        Self {
            ll_sem: ll::Semaphore::const_new_closed(),
            // As in the other non-instrumented constructors, no resource span
            // is created; the field is filled with `Span::none()`.
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: tracing::Span::none(),
        }
    }
524 
525     /// Returns the current number of available permits.
available_permits(&self) -> usize526     pub fn available_permits(&self) -> usize {
527         self.ll_sem.available_permits()
528     }
529 
530     /// Adds `n` new permits to the semaphore.
531     ///
532     /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
add_permits(&self, n: usize)533     pub fn add_permits(&self, n: usize) {
534         self.ll_sem.release(n);
535     }
536 
537     /// Decrease a semaphore's permits by a maximum of `n`.
538     ///
539     /// If there are insufficient permits and it's not possible to reduce by `n`,
540     /// return the number of permits that were actually reduced.
forget_permits(&self, n: usize) -> usize541     pub fn forget_permits(&self, n: usize) -> usize {
542         self.ll_sem.forget_permits(n)
543     }
544 
545     /// Acquires a permit from the semaphore.
546     ///
547     /// If the semaphore has been closed, this returns an [`AcquireError`].
548     /// Otherwise, this returns a [`SemaphorePermit`] representing the
549     /// acquired permit.
550     ///
551     /// # Cancel safety
552     ///
553     /// This method uses a queue to fairly distribute permits in the order they
554     /// were requested. Cancelling a call to `acquire` makes you lose your place
555     /// in the queue.
556     ///
557     /// # Examples
558     ///
559     /// ```
560     /// use tokio::sync::Semaphore;
561     ///
562     /// #[tokio::main]
563     /// async fn main() {
564     ///     let semaphore = Semaphore::new(2);
565     ///
566     ///     let permit_1 = semaphore.acquire().await.unwrap();
567     ///     assert_eq!(semaphore.available_permits(), 1);
568     ///
569     ///     let permit_2 = semaphore.acquire().await.unwrap();
570     ///     assert_eq!(semaphore.available_permits(), 0);
571     ///
572     ///     drop(permit_1);
573     ///     assert_eq!(semaphore.available_permits(), 1);
574     /// }
575     /// ```
576     ///
577     /// [`AcquireError`]: crate::sync::AcquireError
578     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>579     pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
580         #[cfg(all(tokio_unstable, feature = "tracing"))]
581         let inner = trace::async_op(
582             || self.ll_sem.acquire(1),
583             self.resource_span.clone(),
584             "Semaphore::acquire",
585             "poll",
586             true,
587         );
588         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
589         let inner = self.ll_sem.acquire(1);
590 
591         inner.await?;
592         Ok(SemaphorePermit {
593             sem: self,
594             permits: 1,
595         })
596     }
597 
598     /// Acquires `n` permits from the semaphore.
599     ///
600     /// If the semaphore has been closed, this returns an [`AcquireError`].
601     /// Otherwise, this returns a [`SemaphorePermit`] representing the
602     /// acquired permits.
603     ///
604     /// # Cancel safety
605     ///
606     /// This method uses a queue to fairly distribute permits in the order they
607     /// were requested. Cancelling a call to `acquire_many` makes you lose your
608     /// place in the queue.
609     ///
610     /// # Examples
611     ///
612     /// ```
613     /// use tokio::sync::Semaphore;
614     ///
615     /// #[tokio::main]
616     /// async fn main() {
617     ///     let semaphore = Semaphore::new(5);
618     ///
619     ///     let permit = semaphore.acquire_many(3).await.unwrap();
620     ///     assert_eq!(semaphore.available_permits(), 2);
621     /// }
622     /// ```
623     ///
624     /// [`AcquireError`]: crate::sync::AcquireError
625     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError>626     pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
627         #[cfg(all(tokio_unstable, feature = "tracing"))]
628         trace::async_op(
629             || self.ll_sem.acquire(n as usize),
630             self.resource_span.clone(),
631             "Semaphore::acquire_many",
632             "poll",
633             true,
634         )
635         .await?;
636 
637         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
638         self.ll_sem.acquire(n as usize).await?;
639 
640         Ok(SemaphorePermit {
641             sem: self,
642             permits: n,
643         })
644     }
645 
646     /// Tries to acquire a permit from the semaphore.
647     ///
648     /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
649     /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
650     /// this returns a [`SemaphorePermit`] representing the acquired permits.
651     ///
652     /// # Examples
653     ///
654     /// ```
655     /// use tokio::sync::{Semaphore, TryAcquireError};
656     ///
657     /// # fn main() {
658     /// let semaphore = Semaphore::new(2);
659     ///
660     /// let permit_1 = semaphore.try_acquire().unwrap();
661     /// assert_eq!(semaphore.available_permits(), 1);
662     ///
663     /// let permit_2 = semaphore.try_acquire().unwrap();
664     /// assert_eq!(semaphore.available_permits(), 0);
665     ///
666     /// let permit_3 = semaphore.try_acquire();
667     /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
668     /// # }
669     /// ```
670     ///
671     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
672     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
673     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>674     pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
675         match self.ll_sem.try_acquire(1) {
676             Ok(()) => Ok(SemaphorePermit {
677                 sem: self,
678                 permits: 1,
679             }),
680             Err(e) => Err(e),
681         }
682     }
683 
684     /// Tries to acquire `n` permits from the semaphore.
685     ///
686     /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
687     /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
688     /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
689     ///
690     /// # Examples
691     ///
692     /// ```
693     /// use tokio::sync::{Semaphore, TryAcquireError};
694     ///
695     /// # fn main() {
696     /// let semaphore = Semaphore::new(4);
697     ///
698     /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
699     /// assert_eq!(semaphore.available_permits(), 1);
700     ///
701     /// let permit_2 = semaphore.try_acquire_many(2);
702     /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
703     /// # }
704     /// ```
705     ///
706     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
707     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
708     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError>709     pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
710         match self.ll_sem.try_acquire(n as usize) {
711             Ok(()) => Ok(SemaphorePermit {
712                 sem: self,
713                 permits: n,
714             }),
715             Err(e) => Err(e),
716         }
717     }
718 
719     /// Acquires a permit from the semaphore.
720     ///
721     /// The semaphore must be wrapped in an [`Arc`] to call this method.
722     /// If the semaphore has been closed, this returns an [`AcquireError`].
723     /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
724     /// acquired permit.
725     ///
726     /// # Cancel safety
727     ///
728     /// This method uses a queue to fairly distribute permits in the order they
729     /// were requested. Cancelling a call to `acquire_owned` makes you lose your
730     /// place in the queue.
731     ///
732     /// # Examples
733     ///
734     /// ```
735     /// use std::sync::Arc;
736     /// use tokio::sync::Semaphore;
737     ///
738     /// #[tokio::main]
739     /// async fn main() {
740     ///     let semaphore = Arc::new(Semaphore::new(3));
741     ///     let mut join_handles = Vec::new();
742     ///
743     ///     for _ in 0..5 {
744     ///         let permit = semaphore.clone().acquire_owned().await.unwrap();
745     ///         join_handles.push(tokio::spawn(async move {
746     ///             // perform task...
747     ///             // explicitly own `permit` in the task
748     ///             drop(permit);
749     ///         }));
750     ///     }
751     ///
752     ///     for handle in join_handles {
753     ///         handle.await.unwrap();
754     ///     }
755     /// }
756     /// ```
757     ///
758     /// [`Arc`]: std::sync::Arc
759     /// [`AcquireError`]: crate::sync::AcquireError
760     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError>761     pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
762         #[cfg(all(tokio_unstable, feature = "tracing"))]
763         let inner = trace::async_op(
764             || self.ll_sem.acquire(1),
765             self.resource_span.clone(),
766             "Semaphore::acquire_owned",
767             "poll",
768             true,
769         );
770         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
771         let inner = self.ll_sem.acquire(1);
772 
773         inner.await?;
774         Ok(OwnedSemaphorePermit {
775             sem: self,
776             permits: 1,
777         })
778     }
779 
780     /// Acquires `n` permits from the semaphore.
781     ///
782     /// The semaphore must be wrapped in an [`Arc`] to call this method.
783     /// If the semaphore has been closed, this returns an [`AcquireError`].
784     /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
785     /// acquired permit.
786     ///
787     /// # Cancel safety
788     ///
789     /// This method uses a queue to fairly distribute permits in the order they
790     /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
791     /// your place in the queue.
792     ///
793     /// # Examples
794     ///
795     /// ```
796     /// use std::sync::Arc;
797     /// use tokio::sync::Semaphore;
798     ///
799     /// #[tokio::main]
800     /// async fn main() {
801     ///     let semaphore = Arc::new(Semaphore::new(10));
802     ///     let mut join_handles = Vec::new();
803     ///
804     ///     for _ in 0..5 {
805     ///         let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
806     ///         join_handles.push(tokio::spawn(async move {
807     ///             // perform task...
808     ///             // explicitly own `permit` in the task
809     ///             drop(permit);
810     ///         }));
811     ///     }
812     ///
813     ///     for handle in join_handles {
814     ///         handle.await.unwrap();
815     ///     }
816     /// }
817     /// ```
818     ///
819     /// [`Arc`]: std::sync::Arc
820     /// [`AcquireError`]: crate::sync::AcquireError
821     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, AcquireError>822     pub async fn acquire_many_owned(
823         self: Arc<Self>,
824         n: u32,
825     ) -> Result<OwnedSemaphorePermit, AcquireError> {
826         #[cfg(all(tokio_unstable, feature = "tracing"))]
827         let inner = trace::async_op(
828             || self.ll_sem.acquire(n as usize),
829             self.resource_span.clone(),
830             "Semaphore::acquire_many_owned",
831             "poll",
832             true,
833         );
834         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
835         let inner = self.ll_sem.acquire(n as usize);
836 
837         inner.await?;
838         Ok(OwnedSemaphorePermit {
839             sem: self,
840             permits: n,
841         })
842     }
843 
844     /// Tries to acquire a permit from the semaphore.
845     ///
    /// The semaphore must be wrapped in an [`Arc`] to call this method. If
    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`],
    /// and if there are no permits left, it returns a
    /// [`TryAcquireError::NoPermits`]. Otherwise, this returns an
    /// [`OwnedSemaphorePermit`] representing the acquired permit.
851     ///
852     /// # Examples
853     ///
854     /// ```
855     /// use std::sync::Arc;
856     /// use tokio::sync::{Semaphore, TryAcquireError};
857     ///
858     /// # fn main() {
859     /// let semaphore = Arc::new(Semaphore::new(2));
860     ///
861     /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
862     /// assert_eq!(semaphore.available_permits(), 1);
863     ///
864     /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
865     /// assert_eq!(semaphore.available_permits(), 0);
866     ///
867     /// let permit_3 = semaphore.try_acquire_owned();
868     /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
869     /// # }
870     /// ```
871     ///
872     /// [`Arc`]: std::sync::Arc
873     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
874     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
875     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError>876     pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
877         match self.ll_sem.try_acquire(1) {
878             Ok(()) => Ok(OwnedSemaphorePermit {
879                 sem: self,
880                 permits: 1,
881             }),
882             Err(e) => Err(e),
883         }
884     }
885 
886     /// Tries to acquire `n` permits from the semaphore.
887     ///
    /// The semaphore must be wrapped in an [`Arc`] to call this method. If
    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`],
    /// and if fewer than `n` permits are available, it returns a
    /// [`TryAcquireError::NoPermits`]. Otherwise, this returns an
    /// [`OwnedSemaphorePermit`] representing the acquired permits.
893     ///
894     /// # Examples
895     ///
896     /// ```
897     /// use std::sync::Arc;
898     /// use tokio::sync::{Semaphore, TryAcquireError};
899     ///
900     /// # fn main() {
901     /// let semaphore = Arc::new(Semaphore::new(4));
902     ///
903     /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
904     /// assert_eq!(semaphore.available_permits(), 1);
905     ///
906     /// let permit_2 = semaphore.try_acquire_many_owned(2);
907     /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
908     /// # }
909     /// ```
910     ///
911     /// [`Arc`]: std::sync::Arc
912     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
913     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
914     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
try_acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, TryAcquireError>915     pub fn try_acquire_many_owned(
916         self: Arc<Self>,
917         n: u32,
918     ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
919         match self.ll_sem.try_acquire(n as usize) {
920             Ok(()) => Ok(OwnedSemaphorePermit {
921                 sem: self,
922                 permits: n,
923             }),
924             Err(e) => Err(e),
925         }
926     }
927 
928     /// Closes the semaphore.
929     ///
930     /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
931     ///
932     /// # Examples
933     ///
934     /// ```
935     /// use tokio::sync::Semaphore;
936     /// use std::sync::Arc;
937     /// use tokio::sync::TryAcquireError;
938     ///
939     /// #[tokio::main]
940     /// async fn main() {
941     ///     let semaphore = Arc::new(Semaphore::new(1));
942     ///     let semaphore2 = semaphore.clone();
943     ///
944     ///     tokio::spawn(async move {
945     ///         let permit = semaphore.acquire_many(2).await;
946     ///         assert!(permit.is_err());
947     ///         println!("waiter received error");
948     ///     });
949     ///
950     ///     println!("closing semaphore");
951     ///     semaphore2.close();
952     ///
953     ///     // Cannot obtain more permits
954     ///     assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
955     /// }
956     /// ```
    pub fn close(&self) {
        // This method only forwards the call; the closing itself is performed
        // by the wrapped `ll_sem`.
        self.ll_sem.close();
    }
960 
    /// Returns `true` if the semaphore is closed.
    pub fn is_closed(&self) -> bool {
        // Report whatever the wrapped `ll_sem` says about its closed state.
        self.ll_sem.is_closed()
    }
965 }
966 
967 impl<'a> SemaphorePermit<'a> {
968     /// Forgets the permit **without** releasing it back to the semaphore.
969     /// This can be used to reduce the amount of permits available from a
970     /// semaphore.
971     ///
972     /// # Examples
973     ///
974     /// ```
975     /// use std::sync::Arc;
976     /// use tokio::sync::Semaphore;
977     ///
978     /// let sem = Arc::new(Semaphore::new(10));
979     /// {
980     ///     let permit = sem.try_acquire_many(5).unwrap();
981     ///     assert_eq!(sem.available_permits(), 5);
982     ///     permit.forget();
983     /// }
984     ///
985     /// // Since we forgot the permit, available permits won't go back to its initial value
986     /// // even after the permit is dropped.
987     /// assert_eq!(sem.available_permits(), 5);
988     /// ```
forget(mut self)989     pub fn forget(mut self) {
990         self.permits = 0;
991     }
992 
993     /// Merge two [`SemaphorePermit`] instances together, consuming `other`
994     /// without releasing the permits it holds.
995     ///
996     /// Permits held by both `self` and `other` are released when `self` drops.
997     ///
998     /// # Panics
999     ///
1000     /// This function panics if permits from different [`Semaphore`] instances
1001     /// are merged.
1002     ///
1003     /// # Examples
1004     ///
1005     /// ```
1006     /// use std::sync::Arc;
1007     /// use tokio::sync::Semaphore;
1008     ///
1009     /// let sem = Arc::new(Semaphore::new(10));
1010     /// let mut permit = sem.try_acquire().unwrap();
1011     ///
1012     /// for _ in 0..9 {
1013     ///     let _permit = sem.try_acquire().unwrap();
1014     ///     // Merge individual permits into a single one.
1015     ///     permit.merge(_permit)
1016     /// }
1017     ///
1018     /// assert_eq!(sem.available_permits(), 0);
1019     ///
1020     /// // Release all permits in a single batch.
1021     /// drop(permit);
1022     ///
1023     /// assert_eq!(sem.available_permits(), 10);
1024     /// ```
1025     #[track_caller]
merge(&mut self, mut other: Self)1026     pub fn merge(&mut self, mut other: Self) {
1027         assert!(
1028             std::ptr::eq(self.sem, other.sem),
1029             "merging permits from different semaphore instances"
1030         );
1031         self.permits += other.permits;
1032         other.permits = 0;
1033     }
1034 
1035     /// Splits `n` permits from `self` and returns a new [`SemaphorePermit`] instance that holds `n` permits.
1036     ///
1037     /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1038     ///
1039     /// # Examples
1040     ///
1041     /// ```
1042     /// use std::sync::Arc;
1043     /// use tokio::sync::Semaphore;
1044     ///
1045     /// let sem = Arc::new(Semaphore::new(3));
1046     ///
1047     /// let mut p1 = sem.try_acquire_many(3).unwrap();
1048     /// let p2 = p1.split(1).unwrap();
1049     ///
1050     /// assert_eq!(p1.num_permits(), 2);
1051     /// assert_eq!(p2.num_permits(), 1);
1052     /// ```
split(&mut self, n: usize) -> Option<Self>1053     pub fn split(&mut self, n: usize) -> Option<Self> {
1054         let n = u32::try_from(n).ok()?;
1055 
1056         if n > self.permits {
1057             return None;
1058         }
1059 
1060         self.permits -= n;
1061 
1062         Some(Self {
1063             sem: self.sem,
1064             permits: n,
1065         })
1066     }
1067 
1068     /// Returns the number of permits held by `self`.
num_permits(&self) -> usize1069     pub fn num_permits(&self) -> usize {
1070         self.permits as usize
1071     }
1072 }
1073 
1074 impl OwnedSemaphorePermit {
1075     /// Forgets the permit **without** releasing it back to the semaphore.
1076     /// This can be used to reduce the amount of permits available from a
1077     /// semaphore.
1078     ///
1079     /// # Examples
1080     ///
1081     /// ```
1082     /// use std::sync::Arc;
1083     /// use tokio::sync::Semaphore;
1084     ///
1085     /// let sem = Arc::new(Semaphore::new(10));
1086     /// {
1087     ///     let permit = sem.clone().try_acquire_many_owned(5).unwrap();
1088     ///     assert_eq!(sem.available_permits(), 5);
1089     ///     permit.forget();
1090     /// }
1091     ///
1092     /// // Since we forgot the permit, available permits won't go back to its initial value
1093     /// // even after the permit is dropped.
1094     /// assert_eq!(sem.available_permits(), 5);
1095     /// ```
forget(mut self)1096     pub fn forget(mut self) {
1097         self.permits = 0;
1098     }
1099 
1100     /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
1101     /// without releasing the permits it holds.
1102     ///
1103     /// Permits held by both `self` and `other` are released when `self` drops.
1104     ///
1105     /// # Panics
1106     ///
1107     /// This function panics if permits from different [`Semaphore`] instances
1108     /// are merged.
1109     ///
1110     /// # Examples
1111     ///
1112     /// ```
1113     /// use std::sync::Arc;
1114     /// use tokio::sync::Semaphore;
1115     ///
1116     /// let sem = Arc::new(Semaphore::new(10));
1117     /// let mut permit = sem.clone().try_acquire_owned().unwrap();
1118     ///
1119     /// for _ in 0..9 {
1120     ///     let _permit = sem.clone().try_acquire_owned().unwrap();
1121     ///     // Merge individual permits into a single one.
1122     ///     permit.merge(_permit)
1123     /// }
1124     ///
1125     /// assert_eq!(sem.available_permits(), 0);
1126     ///
1127     /// // Release all permits in a single batch.
1128     /// drop(permit);
1129     ///
1130     /// assert_eq!(sem.available_permits(), 10);
1131     /// ```
1132     #[track_caller]
merge(&mut self, mut other: Self)1133     pub fn merge(&mut self, mut other: Self) {
1134         assert!(
1135             Arc::ptr_eq(&self.sem, &other.sem),
1136             "merging permits from different semaphore instances"
1137         );
1138         self.permits += other.permits;
1139         other.permits = 0;
1140     }
1141 
1142     /// Splits `n` permits from `self` and returns a new [`OwnedSemaphorePermit`] instance that holds `n` permits.
1143     ///
1144     /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1145     ///
1146     /// # Note
1147     ///
1148     /// It will clone the owned `Arc<Semaphore>` to construct the new instance.
1149     ///
1150     /// # Examples
1151     ///
1152     /// ```
1153     /// use std::sync::Arc;
1154     /// use tokio::sync::Semaphore;
1155     ///
1156     /// let sem = Arc::new(Semaphore::new(3));
1157     ///
1158     /// let mut p1 = sem.try_acquire_many_owned(3).unwrap();
1159     /// let p2 = p1.split(1).unwrap();
1160     ///
1161     /// assert_eq!(p1.num_permits(), 2);
1162     /// assert_eq!(p2.num_permits(), 1);
1163     /// ```
split(&mut self, n: usize) -> Option<Self>1164     pub fn split(&mut self, n: usize) -> Option<Self> {
1165         let n = u32::try_from(n).ok()?;
1166 
1167         if n > self.permits {
1168             return None;
1169         }
1170 
1171         self.permits -= n;
1172 
1173         Some(Self {
1174             sem: self.sem.clone(),
1175             permits: n,
1176         })
1177     }
1178 
1179     /// Returns the [`Semaphore`] from which this permit was acquired.
semaphore(&self) -> &Arc<Semaphore>1180     pub fn semaphore(&self) -> &Arc<Semaphore> {
1181         &self.sem
1182     }
1183 
1184     /// Returns the number of permits held by `self`.
num_permits(&self) -> usize1185     pub fn num_permits(&self) -> usize {
1186         self.permits as usize
1187     }
1188 }
1189 
1190 impl Drop for SemaphorePermit<'_> {
drop(&mut self)1191     fn drop(&mut self) {
1192         self.sem.add_permits(self.permits as usize);
1193     }
1194 }
1195 
1196 impl Drop for OwnedSemaphorePermit {
drop(&mut self)1197     fn drop(&mut self) {
1198         self.sem.add_permits(self.permits as usize);
1199     }
1200 }
1201