1 use super::batch_semaphore as ll; // low level implementation
2 use super::{AcquireError, TryAcquireError};
3 #[cfg(all(tokio_unstable, feature = "tracing"))]
4 use crate::util::trace;
5 use std::sync::Arc;
6
7 /// Counting semaphore performing asynchronous permit acquisition.
8 ///
9 /// A semaphore maintains a set of permits. Permits are used to synchronize
10 /// access to a shared resource. A semaphore differs from a mutex in that it
11 /// can allow more than one concurrent caller to access the shared resource at a
12 /// time.
13 ///
14 /// When `acquire` is called and the semaphore has remaining permits, the
15 /// function immediately returns a permit. However, if no remaining permits are
16 /// available, `acquire` (asynchronously) waits until an outstanding permit is
17 /// dropped. At this point, the freed permit is assigned to the caller.
18 ///
19 /// This `Semaphore` is fair, which means that permits are given out in the order
20 /// they were requested. This fairness is also applied when `acquire_many` gets
21 /// involved, so if a call to `acquire_many` at the front of the queue requests
22 /// more permits than currently available, this can prevent a call to `acquire`
23 /// from completing, even if the semaphore has enough permits complete the call
24 /// to `acquire`.
25 ///
26 /// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
27 /// utility.
28 ///
29 /// # Examples
30 ///
31 /// Basic usage:
32 ///
33 /// ```
34 /// use tokio::sync::{Semaphore, TryAcquireError};
35 ///
36 /// #[tokio::main]
37 /// async fn main() {
38 /// let semaphore = Semaphore::new(3);
39 ///
40 /// let a_permit = semaphore.acquire().await.unwrap();
41 /// let two_permits = semaphore.acquire_many(2).await.unwrap();
42 ///
43 /// assert_eq!(semaphore.available_permits(), 0);
44 ///
45 /// let permit_attempt = semaphore.try_acquire();
46 /// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
47 /// }
48 /// ```
49 ///
50 /// ## Limit the number of simultaneously opened files in your program
51 ///
52 /// Most operating systems have limits on the number of open file
53 /// handles. Even in systems without explicit limits, resource constraints
54 /// implicitly set an upper bound on the number of open files. If your
55 /// program attempts to open a large number of files and exceeds this
56 /// limit, it will result in an error.
57 ///
58 /// This example uses a Semaphore with 100 permits. By acquiring a permit from
59 /// the Semaphore before accessing a file, you ensure that your program opens
60 /// no more than 100 files at a time. When trying to open the 101st
61 /// file, the program will wait until a permit becomes available before
62 /// proceeding to open another file.
63 /// ```
64 /// use std::io::Result;
65 /// use tokio::fs::File;
66 /// use tokio::sync::Semaphore;
67 /// use tokio::io::AsyncWriteExt;
68 ///
69 /// static PERMITS: Semaphore = Semaphore::const_new(100);
70 ///
71 /// async fn write_to_file(message: &[u8]) -> Result<()> {
72 /// let _permit = PERMITS.acquire().await.unwrap();
73 /// let mut buffer = File::create("example.txt").await?;
74 /// buffer.write_all(message).await?;
75 /// Ok(()) // Permit goes out of scope here, and is available again for acquisition
76 /// }
77 /// ```
78 ///
79 /// ## Limit the number of outgoing requests being sent at the same time
80 ///
81 /// In some scenarios, it might be required to limit the number of outgoing
82 /// requests being sent in parallel. This could be due to limits of a consumed
83 /// API or the network resources of the system the application is running on.
84 ///
85 /// This example uses an `Arc<Semaphore>` with 10 permits. Each task spawned is
86 /// given a reference to the semaphore by cloning the `Arc<Semaphore>`. Before
87 /// a task sends a request, it must acquire a permit from the semaphore by
88 /// calling [`Semaphore::acquire`]. This ensures that at most 10 requests are
89 /// sent in parallel at any given time. After a task has sent a request, it
90 /// drops the permit to allow other tasks to send requests.
91 ///
92 /// ```
93 /// use std::sync::Arc;
94 /// use tokio::sync::Semaphore;
95 ///
96 /// #[tokio::main]
97 /// async fn main() {
98 /// // Define maximum number of parallel requests.
99 /// let semaphore = Arc::new(Semaphore::new(10));
100 /// // Spawn many tasks that will send requests.
101 /// let mut jhs = Vec::new();
102 /// for task_id in 0..100 {
103 /// let semaphore = semaphore.clone();
104 /// let jh = tokio::spawn(async move {
105 /// // Acquire permit before sending request.
106 /// let _permit = semaphore.acquire().await.unwrap();
107 /// // Send the request.
108 /// let response = send_request(task_id).await;
109 /// // Drop the permit after the request has been sent.
110 /// drop(_permit);
111 /// // Handle response.
112 /// // ...
113 ///
114 /// response
115 /// });
116 /// jhs.push(jh);
117 /// }
118 /// // Collect responses from tasks.
119 /// let mut responses = Vec::new();
120 /// for jh in jhs {
121 /// let response = jh.await.unwrap();
122 /// responses.push(response);
123 /// }
124 /// // Process responses.
125 /// // ...
126 /// }
127 /// # async fn send_request(task_id: usize) {
128 /// # // Send request.
129 /// # }
130 /// ```
131 ///
132 /// ## Limit the number of incoming requests being handled at the same time
133 ///
134 /// Similar to limiting the number of simultaneously opened files, network handles
135 /// are a limited resource. Allowing an unbounded amount of requests to be processed
136 /// could result in a denial-of-service, among many other issues.
137 ///
138 /// This example uses an `Arc<Semaphore>` instead of a global variable.
139 /// To limit the number of requests that can be processed at the time,
140 /// we acquire a permit for each task before spawning it. Once acquired,
141 /// a new task is spawned; and once finished, the permit is dropped inside
142 /// of the task to allow others to spawn. Permits must be acquired via
143 /// [`Semaphore::acquire_owned`] to be movable across the task boundary.
144 /// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
145 ///
146 /// ```no_run
147 /// use std::sync::Arc;
148 /// use tokio::sync::Semaphore;
149 /// use tokio::net::TcpListener;
150 ///
151 /// #[tokio::main]
152 /// async fn main() -> std::io::Result<()> {
153 /// let semaphore = Arc::new(Semaphore::new(3));
154 /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
155 ///
156 /// loop {
157 /// // Acquire permit before accepting the next socket.
158 /// //
159 /// // We use `acquire_owned` so that we can move `permit` into
160 /// // other tasks.
161 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
162 /// let (mut socket, _) = listener.accept().await?;
163 ///
164 /// tokio::spawn(async move {
165 /// // Do work using the socket.
166 /// handle_connection(&mut socket).await;
167 /// // Drop socket while the permit is still live.
168 /// drop(socket);
169 /// // Drop the permit, so more tasks can be created.
170 /// drop(permit);
171 /// });
172 /// }
173 /// }
174 /// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
175 /// # // Do work
176 /// # }
177 /// ```
178 ///
179 /// ## Prevent tests from running in parallel
180 ///
181 /// By default, Rust runs tests in the same file in parallel. However, in some
182 /// cases, running two tests in parallel may lead to problems. For example, this
183 /// can happen when tests use the same database.
184 ///
185 /// Consider the following scenario:
186 /// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
187 /// the value using the same key to verify the insertion.
188 /// 2. `test_update`: Inserts a key, then updates the key to a new value and
189 /// verifies that the value has been accurately updated.
190 /// 3. `test_others`: A third test that doesn't modify the database state. It
191 /// can run in parallel with the other tests.
192 ///
193 /// In this example, `test_insert` and `test_update` need to run in sequence to
194 /// work, but it doesn't matter which test runs first. We can leverage a
195 /// semaphore with a single permit to address this challenge.
196 ///
197 /// ```
198 /// # use tokio::sync::Mutex;
199 /// # use std::collections::BTreeMap;
200 /// # struct Database {
201 /// # map: Mutex<BTreeMap<String, i32>>,
202 /// # }
203 /// # impl Database {
204 /// # pub const fn setup() -> Database {
205 /// # Database {
206 /// # map: Mutex::const_new(BTreeMap::new()),
207 /// # }
208 /// # }
209 /// # pub async fn insert(&self, key: &str, value: i32) {
210 /// # self.map.lock().await.insert(key.to_string(), value);
211 /// # }
212 /// # pub async fn update(&self, key: &str, value: i32) {
213 /// # self.map.lock().await
214 /// # .entry(key.to_string())
215 /// # .and_modify(|origin| *origin = value);
216 /// # }
217 /// # pub async fn delete(&self, key: &str) {
218 /// # self.map.lock().await.remove(key);
219 /// # }
220 /// # pub async fn get(&self, key: &str) -> i32 {
221 /// # *self.map.lock().await.get(key).unwrap()
222 /// # }
223 /// # }
224 /// use tokio::sync::Semaphore;
225 ///
226 /// // Initialize a static semaphore with only one permit, which is used to
227 /// // prevent test_insert and test_update from running in parallel.
228 /// static PERMIT: Semaphore = Semaphore::const_new(1);
229 ///
230 /// // Initialize the database that will be used by the subsequent tests.
231 /// static DB: Database = Database::setup();
232 ///
233 /// #[tokio::test]
234 /// # async fn fake_test_insert() {}
235 /// async fn test_insert() {
236 /// // Acquire permit before proceeding. Since the semaphore has only one permit,
237 /// // the test will wait if the permit is already acquired by other tests.
238 /// let permit = PERMIT.acquire().await.unwrap();
239 ///
240 /// // Do the actual test stuff with database
241 ///
242 /// // Insert a key-value pair to database
243 /// let (key, value) = ("name", 0);
244 /// DB.insert(key, value).await;
245 ///
246 /// // Verify that the value has been inserted correctly.
247 /// assert_eq!(DB.get(key).await, value);
248 ///
249 /// // Undo the insertion, so the database is empty at the end of the test.
250 /// DB.delete(key).await;
251 ///
252 /// // Drop permit. This allows the other test to start running.
253 /// drop(permit);
254 /// }
255 ///
256 /// #[tokio::test]
257 /// # async fn fake_test_update() {}
258 /// async fn test_update() {
259 /// // Acquire permit before proceeding. Since the semaphore has only one permit,
260 /// // the test will wait if the permit is already acquired by other tests.
261 /// let permit = PERMIT.acquire().await.unwrap();
262 ///
263 /// // Do the same insert.
264 /// let (key, value) = ("name", 0);
265 /// DB.insert(key, value).await;
266 ///
267 /// // Update the existing value with a new one.
268 /// let new_value = 1;
269 /// DB.update(key, new_value).await;
270 ///
271 /// // Verify that the value has been updated correctly.
272 /// assert_eq!(DB.get(key).await, new_value);
273 ///
274 /// // Undo any modificattion.
275 /// DB.delete(key).await;
276 ///
277 /// // Drop permit. This allows the other test to start running.
278 /// drop(permit);
279 /// }
280 ///
281 /// #[tokio::test]
282 /// # async fn fake_test_others() {}
283 /// async fn test_others() {
284 /// // This test can run in parallel with test_insert and test_update,
285 /// // so it does not use PERMIT.
286 /// }
287 /// # #[tokio::main(flavor = "current_thread")]
288 /// # async fn main() {
289 /// # test_insert().await;
290 /// # test_update().await;
291 /// # test_others().await;
292 /// # }
293 /// ```
294 ///
295 /// ## Rate limiting using a token bucket
296 ///
297 /// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
298 ///
299 /// Many applications and systems have constraints on the rate at which certain
300 /// operations should occur. Exceeding this rate can result in suboptimal
301 /// performance or even errors.
302 ///
303 /// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
304 /// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
305 /// arrive at the same time.
306 ///
307 /// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
308 /// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
309 /// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
310 /// wait for new tokens to be added.
311 ///
312 /// Unlike the example that limits how many requests can be handled at the same time, we do not add
313 /// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
314 ///
315 /// Note that this implementation is suboptimal when the duration is small, because it consumes a
316 /// lot of cpu constantly looping and sleeping.
317 ///
318 /// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
319 /// [`add_permits`]: crate::sync::Semaphore::add_permits
320 /// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
321 /// ```
322 /// use std::sync::Arc;
323 /// use tokio::sync::Semaphore;
324 /// use tokio::time::{interval, Duration};
325 ///
326 /// struct TokenBucket {
327 /// sem: Arc<Semaphore>,
328 /// jh: tokio::task::JoinHandle<()>,
329 /// }
330 ///
331 /// impl TokenBucket {
332 /// fn new(duration: Duration, capacity: usize) -> Self {
333 /// let sem = Arc::new(Semaphore::new(capacity));
334 ///
335 /// // refills the tokens at the end of each interval
336 /// let jh = tokio::spawn({
337 /// let sem = sem.clone();
338 /// let mut interval = interval(duration);
339 /// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
340 ///
341 /// async move {
342 /// loop {
343 /// interval.tick().await;
344 ///
345 /// if sem.available_permits() < capacity {
346 /// sem.add_permits(1);
347 /// }
348 /// }
349 /// }
350 /// });
351 ///
352 /// Self { jh, sem }
353 /// }
354 ///
355 /// async fn acquire(&self) {
356 /// // This can return an error if the semaphore is closed, but we
357 /// // never close it, so this error can never happen.
358 /// let permit = self.sem.acquire().await.unwrap();
359 /// // To avoid releasing the permit back to the semaphore, we use
360 /// // the `SemaphorePermit::forget` method.
361 /// permit.forget();
362 /// }
363 /// }
364 ///
365 /// impl Drop for TokenBucket {
366 /// fn drop(&mut self) {
367 /// // Kill the background task so it stops taking up resources when we
368 /// // don't need it anymore.
369 /// self.jh.abort();
370 /// }
371 /// }
372 ///
373 /// #[tokio::main]
374 /// # async fn _hidden() {}
375 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
376 /// async fn main() {
377 /// let capacity = 5;
378 /// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
379 /// let bucket = TokenBucket::new(update_interval, capacity);
380 ///
381 /// for _ in 0..5 {
382 /// bucket.acquire().await;
383 ///
384 /// // do the operation
385 /// }
386 /// }
387 /// ```
388 ///
389 /// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
390 /// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
391 #[derive(Debug)]
392 pub struct Semaphore {
393 /// The low level semaphore
394 ll_sem: ll::Semaphore,
395 #[cfg(all(tokio_unstable, feature = "tracing"))]
396 resource_span: tracing::Span,
397 }
398
399 /// A permit from the semaphore.
400 ///
401 /// This type is created by the [`acquire`] method.
402 ///
403 /// [`acquire`]: crate::sync::Semaphore::acquire()
404 #[must_use]
405 #[clippy::has_significant_drop]
406 #[derive(Debug)]
407 pub struct SemaphorePermit<'a> {
408 sem: &'a Semaphore,
409 permits: u32,
410 }
411
412 /// An owned permit from the semaphore.
413 ///
414 /// This type is created by the [`acquire_owned`] method.
415 ///
416 /// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
417 #[must_use]
418 #[clippy::has_significant_drop]
419 #[derive(Debug)]
420 pub struct OwnedSemaphorePermit {
421 sem: Arc<Semaphore>,
422 permits: u32,
423 }
424
425 #[test]
426 #[cfg(not(loom))]
bounds()427 fn bounds() {
428 fn check_unpin<T: Unpin>() {}
429 // This has to take a value, since the async fn's return type is unnameable.
430 fn check_send_sync_val<T: Send + Sync>(_t: T) {}
431 fn check_send_sync<T: Send + Sync>() {}
432 check_unpin::<Semaphore>();
433 check_unpin::<SemaphorePermit<'_>>();
434 check_send_sync::<Semaphore>();
435
436 let semaphore = Semaphore::new(0);
437 check_send_sync_val(semaphore.acquire());
438 }
439
440 impl Semaphore {
441 /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
442 ///
443 /// Exceeding this limit typically results in a panic.
444 pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
445
446 /// Creates a new semaphore with the initial number of permits.
447 ///
448 /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
449 #[track_caller]
new(permits: usize) -> Self450 pub fn new(permits: usize) -> Self {
451 #[cfg(all(tokio_unstable, feature = "tracing"))]
452 let resource_span = {
453 let location = std::panic::Location::caller();
454
455 tracing::trace_span!(
456 parent: None,
457 "runtime.resource",
458 concrete_type = "Semaphore",
459 kind = "Sync",
460 loc.file = location.file(),
461 loc.line = location.line(),
462 loc.col = location.column(),
463 inherits_child_attrs = true,
464 )
465 };
466
467 #[cfg(all(tokio_unstable, feature = "tracing"))]
468 let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
469
470 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
471 let ll_sem = ll::Semaphore::new(permits);
472
473 Self {
474 ll_sem,
475 #[cfg(all(tokio_unstable, feature = "tracing"))]
476 resource_span,
477 }
478 }
479
480 /// Creates a new semaphore with the initial number of permits.
481 ///
482 /// When using the `tracing` [unstable feature], a `Semaphore` created with
483 /// `const_new` will not be instrumented. As such, it will not be visible
484 /// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
485 /// create an instrumented object if that is needed.
486 ///
487 /// # Examples
488 ///
489 /// ```
490 /// use tokio::sync::Semaphore;
491 ///
492 /// static SEM: Semaphore = Semaphore::const_new(10);
493 /// ```
494 ///
495 /// [`tokio-console`]: https://github.com/tokio-rs/console
496 /// [unstable feature]: crate#unstable-features
497 #[cfg(not(all(loom, test)))]
const_new(permits: usize) -> Self498 pub const fn const_new(permits: usize) -> Self {
499 Self {
500 ll_sem: ll::Semaphore::const_new(permits),
501 #[cfg(all(tokio_unstable, feature = "tracing"))]
502 resource_span: tracing::Span::none(),
503 }
504 }
505
506 /// Creates a new closed semaphore with 0 permits.
new_closed() -> Self507 pub(crate) fn new_closed() -> Self {
508 Self {
509 ll_sem: ll::Semaphore::new_closed(),
510 #[cfg(all(tokio_unstable, feature = "tracing"))]
511 resource_span: tracing::Span::none(),
512 }
513 }
514
515 /// Creates a new closed semaphore with 0 permits.
516 #[cfg(not(all(loom, test)))]
const_new_closed() -> Self517 pub(crate) const fn const_new_closed() -> Self {
518 Self {
519 ll_sem: ll::Semaphore::const_new_closed(),
520 #[cfg(all(tokio_unstable, feature = "tracing"))]
521 resource_span: tracing::Span::none(),
522 }
523 }
524
525 /// Returns the current number of available permits.
available_permits(&self) -> usize526 pub fn available_permits(&self) -> usize {
527 self.ll_sem.available_permits()
528 }
529
530 /// Adds `n` new permits to the semaphore.
531 ///
532 /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
add_permits(&self, n: usize)533 pub fn add_permits(&self, n: usize) {
534 self.ll_sem.release(n);
535 }
536
537 /// Decrease a semaphore's permits by a maximum of `n`.
538 ///
539 /// If there are insufficient permits and it's not possible to reduce by `n`,
540 /// return the number of permits that were actually reduced.
forget_permits(&self, n: usize) -> usize541 pub fn forget_permits(&self, n: usize) -> usize {
542 self.ll_sem.forget_permits(n)
543 }
544
545 /// Acquires a permit from the semaphore.
546 ///
547 /// If the semaphore has been closed, this returns an [`AcquireError`].
548 /// Otherwise, this returns a [`SemaphorePermit`] representing the
549 /// acquired permit.
550 ///
551 /// # Cancel safety
552 ///
553 /// This method uses a queue to fairly distribute permits in the order they
554 /// were requested. Cancelling a call to `acquire` makes you lose your place
555 /// in the queue.
556 ///
557 /// # Examples
558 ///
559 /// ```
560 /// use tokio::sync::Semaphore;
561 ///
562 /// #[tokio::main]
563 /// async fn main() {
564 /// let semaphore = Semaphore::new(2);
565 ///
566 /// let permit_1 = semaphore.acquire().await.unwrap();
567 /// assert_eq!(semaphore.available_permits(), 1);
568 ///
569 /// let permit_2 = semaphore.acquire().await.unwrap();
570 /// assert_eq!(semaphore.available_permits(), 0);
571 ///
572 /// drop(permit_1);
573 /// assert_eq!(semaphore.available_permits(), 1);
574 /// }
575 /// ```
576 ///
577 /// [`AcquireError`]: crate::sync::AcquireError
578 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>579 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
580 #[cfg(all(tokio_unstable, feature = "tracing"))]
581 let inner = trace::async_op(
582 || self.ll_sem.acquire(1),
583 self.resource_span.clone(),
584 "Semaphore::acquire",
585 "poll",
586 true,
587 );
588 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
589 let inner = self.ll_sem.acquire(1);
590
591 inner.await?;
592 Ok(SemaphorePermit {
593 sem: self,
594 permits: 1,
595 })
596 }
597
598 /// Acquires `n` permits from the semaphore.
599 ///
600 /// If the semaphore has been closed, this returns an [`AcquireError`].
601 /// Otherwise, this returns a [`SemaphorePermit`] representing the
602 /// acquired permits.
603 ///
604 /// # Cancel safety
605 ///
606 /// This method uses a queue to fairly distribute permits in the order they
607 /// were requested. Cancelling a call to `acquire_many` makes you lose your
608 /// place in the queue.
609 ///
610 /// # Examples
611 ///
612 /// ```
613 /// use tokio::sync::Semaphore;
614 ///
615 /// #[tokio::main]
616 /// async fn main() {
617 /// let semaphore = Semaphore::new(5);
618 ///
619 /// let permit = semaphore.acquire_many(3).await.unwrap();
620 /// assert_eq!(semaphore.available_permits(), 2);
621 /// }
622 /// ```
623 ///
624 /// [`AcquireError`]: crate::sync::AcquireError
625 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError>626 pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
627 #[cfg(all(tokio_unstable, feature = "tracing"))]
628 trace::async_op(
629 || self.ll_sem.acquire(n as usize),
630 self.resource_span.clone(),
631 "Semaphore::acquire_many",
632 "poll",
633 true,
634 )
635 .await?;
636
637 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
638 self.ll_sem.acquire(n as usize).await?;
639
640 Ok(SemaphorePermit {
641 sem: self,
642 permits: n,
643 })
644 }
645
646 /// Tries to acquire a permit from the semaphore.
647 ///
648 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
649 /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
650 /// this returns a [`SemaphorePermit`] representing the acquired permits.
651 ///
652 /// # Examples
653 ///
654 /// ```
655 /// use tokio::sync::{Semaphore, TryAcquireError};
656 ///
657 /// # fn main() {
658 /// let semaphore = Semaphore::new(2);
659 ///
660 /// let permit_1 = semaphore.try_acquire().unwrap();
661 /// assert_eq!(semaphore.available_permits(), 1);
662 ///
663 /// let permit_2 = semaphore.try_acquire().unwrap();
664 /// assert_eq!(semaphore.available_permits(), 0);
665 ///
666 /// let permit_3 = semaphore.try_acquire();
667 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
668 /// # }
669 /// ```
670 ///
671 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
672 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
673 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>674 pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
675 match self.ll_sem.try_acquire(1) {
676 Ok(()) => Ok(SemaphorePermit {
677 sem: self,
678 permits: 1,
679 }),
680 Err(e) => Err(e),
681 }
682 }
683
684 /// Tries to acquire `n` permits from the semaphore.
685 ///
686 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
687 /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
688 /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
689 ///
690 /// # Examples
691 ///
692 /// ```
693 /// use tokio::sync::{Semaphore, TryAcquireError};
694 ///
695 /// # fn main() {
696 /// let semaphore = Semaphore::new(4);
697 ///
698 /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
699 /// assert_eq!(semaphore.available_permits(), 1);
700 ///
701 /// let permit_2 = semaphore.try_acquire_many(2);
702 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
703 /// # }
704 /// ```
705 ///
706 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
707 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
708 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError>709 pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
710 match self.ll_sem.try_acquire(n as usize) {
711 Ok(()) => Ok(SemaphorePermit {
712 sem: self,
713 permits: n,
714 }),
715 Err(e) => Err(e),
716 }
717 }
718
719 /// Acquires a permit from the semaphore.
720 ///
721 /// The semaphore must be wrapped in an [`Arc`] to call this method.
722 /// If the semaphore has been closed, this returns an [`AcquireError`].
723 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
724 /// acquired permit.
725 ///
726 /// # Cancel safety
727 ///
728 /// This method uses a queue to fairly distribute permits in the order they
729 /// were requested. Cancelling a call to `acquire_owned` makes you lose your
730 /// place in the queue.
731 ///
732 /// # Examples
733 ///
734 /// ```
735 /// use std::sync::Arc;
736 /// use tokio::sync::Semaphore;
737 ///
738 /// #[tokio::main]
739 /// async fn main() {
740 /// let semaphore = Arc::new(Semaphore::new(3));
741 /// let mut join_handles = Vec::new();
742 ///
743 /// for _ in 0..5 {
744 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
745 /// join_handles.push(tokio::spawn(async move {
746 /// // perform task...
747 /// // explicitly own `permit` in the task
748 /// drop(permit);
749 /// }));
750 /// }
751 ///
752 /// for handle in join_handles {
753 /// handle.await.unwrap();
754 /// }
755 /// }
756 /// ```
757 ///
758 /// [`Arc`]: std::sync::Arc
759 /// [`AcquireError`]: crate::sync::AcquireError
760 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError>761 pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
762 #[cfg(all(tokio_unstable, feature = "tracing"))]
763 let inner = trace::async_op(
764 || self.ll_sem.acquire(1),
765 self.resource_span.clone(),
766 "Semaphore::acquire_owned",
767 "poll",
768 true,
769 );
770 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
771 let inner = self.ll_sem.acquire(1);
772
773 inner.await?;
774 Ok(OwnedSemaphorePermit {
775 sem: self,
776 permits: 1,
777 })
778 }
779
780 /// Acquires `n` permits from the semaphore.
781 ///
782 /// The semaphore must be wrapped in an [`Arc`] to call this method.
783 /// If the semaphore has been closed, this returns an [`AcquireError`].
784 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
785 /// acquired permit.
786 ///
787 /// # Cancel safety
788 ///
789 /// This method uses a queue to fairly distribute permits in the order they
790 /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
791 /// your place in the queue.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// use std::sync::Arc;
797 /// use tokio::sync::Semaphore;
798 ///
799 /// #[tokio::main]
800 /// async fn main() {
801 /// let semaphore = Arc::new(Semaphore::new(10));
802 /// let mut join_handles = Vec::new();
803 ///
804 /// for _ in 0..5 {
805 /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
806 /// join_handles.push(tokio::spawn(async move {
807 /// // perform task...
808 /// // explicitly own `permit` in the task
809 /// drop(permit);
810 /// }));
811 /// }
812 ///
813 /// for handle in join_handles {
814 /// handle.await.unwrap();
815 /// }
816 /// }
817 /// ```
818 ///
819 /// [`Arc`]: std::sync::Arc
820 /// [`AcquireError`]: crate::sync::AcquireError
821 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, AcquireError>822 pub async fn acquire_many_owned(
823 self: Arc<Self>,
824 n: u32,
825 ) -> Result<OwnedSemaphorePermit, AcquireError> {
826 #[cfg(all(tokio_unstable, feature = "tracing"))]
827 let inner = trace::async_op(
828 || self.ll_sem.acquire(n as usize),
829 self.resource_span.clone(),
830 "Semaphore::acquire_many_owned",
831 "poll",
832 true,
833 );
834 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
835 let inner = self.ll_sem.acquire(n as usize);
836
837 inner.await?;
838 Ok(OwnedSemaphorePermit {
839 sem: self,
840 permits: n,
841 })
842 }
843
844 /// Tries to acquire a permit from the semaphore.
845 ///
846 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
847 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
848 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
849 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
850 /// acquired permit.
851 ///
852 /// # Examples
853 ///
854 /// ```
855 /// use std::sync::Arc;
856 /// use tokio::sync::{Semaphore, TryAcquireError};
857 ///
858 /// # fn main() {
859 /// let semaphore = Arc::new(Semaphore::new(2));
860 ///
861 /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
862 /// assert_eq!(semaphore.available_permits(), 1);
863 ///
864 /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
865 /// assert_eq!(semaphore.available_permits(), 0);
866 ///
867 /// let permit_3 = semaphore.try_acquire_owned();
868 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
869 /// # }
870 /// ```
871 ///
872 /// [`Arc`]: std::sync::Arc
873 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
874 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
875 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError>876 pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
877 match self.ll_sem.try_acquire(1) {
878 Ok(()) => Ok(OwnedSemaphorePermit {
879 sem: self,
880 permits: 1,
881 }),
882 Err(e) => Err(e),
883 }
884 }
885
886 /// Tries to acquire `n` permits from the semaphore.
887 ///
888 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
889 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
890 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
891 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
892 /// acquired permit.
893 ///
894 /// # Examples
895 ///
896 /// ```
897 /// use std::sync::Arc;
898 /// use tokio::sync::{Semaphore, TryAcquireError};
899 ///
900 /// # fn main() {
901 /// let semaphore = Arc::new(Semaphore::new(4));
902 ///
903 /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
904 /// assert_eq!(semaphore.available_permits(), 1);
905 ///
906 /// let permit_2 = semaphore.try_acquire_many_owned(2);
907 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
908 /// # }
909 /// ```
910 ///
911 /// [`Arc`]: std::sync::Arc
912 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
913 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
914 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
try_acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, TryAcquireError>915 pub fn try_acquire_many_owned(
916 self: Arc<Self>,
917 n: u32,
918 ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
919 match self.ll_sem.try_acquire(n as usize) {
920 Ok(()) => Ok(OwnedSemaphorePermit {
921 sem: self,
922 permits: n,
923 }),
924 Err(e) => Err(e),
925 }
926 }
927
928 /// Closes the semaphore.
929 ///
930 /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
931 ///
932 /// # Examples
933 ///
934 /// ```
935 /// use tokio::sync::Semaphore;
936 /// use std::sync::Arc;
937 /// use tokio::sync::TryAcquireError;
938 ///
939 /// #[tokio::main]
940 /// async fn main() {
941 /// let semaphore = Arc::new(Semaphore::new(1));
942 /// let semaphore2 = semaphore.clone();
943 ///
944 /// tokio::spawn(async move {
945 /// let permit = semaphore.acquire_many(2).await;
946 /// assert!(permit.is_err());
947 /// println!("waiter received error");
948 /// });
949 ///
950 /// println!("closing semaphore");
951 /// semaphore2.close();
952 ///
953 /// // Cannot obtain more permits
954 /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
955 /// }
956 /// ```
close(&self)957 pub fn close(&self) {
958 self.ll_sem.close();
959 }
960
961 /// Returns true if the semaphore is closed
is_closed(&self) -> bool962 pub fn is_closed(&self) -> bool {
963 self.ll_sem.is_closed()
964 }
965 }
966
967 impl<'a> SemaphorePermit<'a> {
968 /// Forgets the permit **without** releasing it back to the semaphore.
969 /// This can be used to reduce the amount of permits available from a
970 /// semaphore.
971 ///
972 /// # Examples
973 ///
974 /// ```
975 /// use std::sync::Arc;
976 /// use tokio::sync::Semaphore;
977 ///
978 /// let sem = Arc::new(Semaphore::new(10));
979 /// {
980 /// let permit = sem.try_acquire_many(5).unwrap();
981 /// assert_eq!(sem.available_permits(), 5);
982 /// permit.forget();
983 /// }
984 ///
985 /// // Since we forgot the permit, available permits won't go back to its initial value
986 /// // even after the permit is dropped.
987 /// assert_eq!(sem.available_permits(), 5);
988 /// ```
forget(mut self)989 pub fn forget(mut self) {
990 self.permits = 0;
991 }
992
993 /// Merge two [`SemaphorePermit`] instances together, consuming `other`
994 /// without releasing the permits it holds.
995 ///
996 /// Permits held by both `self` and `other` are released when `self` drops.
997 ///
998 /// # Panics
999 ///
1000 /// This function panics if permits from different [`Semaphore`] instances
1001 /// are merged.
1002 ///
1003 /// # Examples
1004 ///
1005 /// ```
1006 /// use std::sync::Arc;
1007 /// use tokio::sync::Semaphore;
1008 ///
1009 /// let sem = Arc::new(Semaphore::new(10));
1010 /// let mut permit = sem.try_acquire().unwrap();
1011 ///
1012 /// for _ in 0..9 {
1013 /// let _permit = sem.try_acquire().unwrap();
1014 /// // Merge individual permits into a single one.
1015 /// permit.merge(_permit)
1016 /// }
1017 ///
1018 /// assert_eq!(sem.available_permits(), 0);
1019 ///
1020 /// // Release all permits in a single batch.
1021 /// drop(permit);
1022 ///
1023 /// assert_eq!(sem.available_permits(), 10);
1024 /// ```
1025 #[track_caller]
merge(&mut self, mut other: Self)1026 pub fn merge(&mut self, mut other: Self) {
1027 assert!(
1028 std::ptr::eq(self.sem, other.sem),
1029 "merging permits from different semaphore instances"
1030 );
1031 self.permits += other.permits;
1032 other.permits = 0;
1033 }
1034
1035 /// Splits `n` permits from `self` and returns a new [`SemaphorePermit`] instance that holds `n` permits.
1036 ///
1037 /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1038 ///
1039 /// # Examples
1040 ///
1041 /// ```
1042 /// use std::sync::Arc;
1043 /// use tokio::sync::Semaphore;
1044 ///
1045 /// let sem = Arc::new(Semaphore::new(3));
1046 ///
1047 /// let mut p1 = sem.try_acquire_many(3).unwrap();
1048 /// let p2 = p1.split(1).unwrap();
1049 ///
1050 /// assert_eq!(p1.num_permits(), 2);
1051 /// assert_eq!(p2.num_permits(), 1);
1052 /// ```
split(&mut self, n: usize) -> Option<Self>1053 pub fn split(&mut self, n: usize) -> Option<Self> {
1054 let n = u32::try_from(n).ok()?;
1055
1056 if n > self.permits {
1057 return None;
1058 }
1059
1060 self.permits -= n;
1061
1062 Some(Self {
1063 sem: self.sem,
1064 permits: n,
1065 })
1066 }
1067
1068 /// Returns the number of permits held by `self`.
num_permits(&self) -> usize1069 pub fn num_permits(&self) -> usize {
1070 self.permits as usize
1071 }
1072 }
1073
1074 impl OwnedSemaphorePermit {
1075 /// Forgets the permit **without** releasing it back to the semaphore.
1076 /// This can be used to reduce the amount of permits available from a
1077 /// semaphore.
1078 ///
1079 /// # Examples
1080 ///
1081 /// ```
1082 /// use std::sync::Arc;
1083 /// use tokio::sync::Semaphore;
1084 ///
1085 /// let sem = Arc::new(Semaphore::new(10));
1086 /// {
1087 /// let permit = sem.clone().try_acquire_many_owned(5).unwrap();
1088 /// assert_eq!(sem.available_permits(), 5);
1089 /// permit.forget();
1090 /// }
1091 ///
1092 /// // Since we forgot the permit, available permits won't go back to its initial value
1093 /// // even after the permit is dropped.
1094 /// assert_eq!(sem.available_permits(), 5);
1095 /// ```
forget(mut self)1096 pub fn forget(mut self) {
1097 self.permits = 0;
1098 }
1099
1100 /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
1101 /// without releasing the permits it holds.
1102 ///
1103 /// Permits held by both `self` and `other` are released when `self` drops.
1104 ///
1105 /// # Panics
1106 ///
1107 /// This function panics if permits from different [`Semaphore`] instances
1108 /// are merged.
1109 ///
1110 /// # Examples
1111 ///
1112 /// ```
1113 /// use std::sync::Arc;
1114 /// use tokio::sync::Semaphore;
1115 ///
1116 /// let sem = Arc::new(Semaphore::new(10));
1117 /// let mut permit = sem.clone().try_acquire_owned().unwrap();
1118 ///
1119 /// for _ in 0..9 {
1120 /// let _permit = sem.clone().try_acquire_owned().unwrap();
1121 /// // Merge individual permits into a single one.
1122 /// permit.merge(_permit)
1123 /// }
1124 ///
1125 /// assert_eq!(sem.available_permits(), 0);
1126 ///
1127 /// // Release all permits in a single batch.
1128 /// drop(permit);
1129 ///
1130 /// assert_eq!(sem.available_permits(), 10);
1131 /// ```
1132 #[track_caller]
merge(&mut self, mut other: Self)1133 pub fn merge(&mut self, mut other: Self) {
1134 assert!(
1135 Arc::ptr_eq(&self.sem, &other.sem),
1136 "merging permits from different semaphore instances"
1137 );
1138 self.permits += other.permits;
1139 other.permits = 0;
1140 }
1141
1142 /// Splits `n` permits from `self` and returns a new [`OwnedSemaphorePermit`] instance that holds `n` permits.
1143 ///
1144 /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1145 ///
1146 /// # Note
1147 ///
1148 /// It will clone the owned `Arc<Semaphore>` to construct the new instance.
1149 ///
1150 /// # Examples
1151 ///
1152 /// ```
1153 /// use std::sync::Arc;
1154 /// use tokio::sync::Semaphore;
1155 ///
1156 /// let sem = Arc::new(Semaphore::new(3));
1157 ///
1158 /// let mut p1 = sem.try_acquire_many_owned(3).unwrap();
1159 /// let p2 = p1.split(1).unwrap();
1160 ///
1161 /// assert_eq!(p1.num_permits(), 2);
1162 /// assert_eq!(p2.num_permits(), 1);
1163 /// ```
split(&mut self, n: usize) -> Option<Self>1164 pub fn split(&mut self, n: usize) -> Option<Self> {
1165 let n = u32::try_from(n).ok()?;
1166
1167 if n > self.permits {
1168 return None;
1169 }
1170
1171 self.permits -= n;
1172
1173 Some(Self {
1174 sem: self.sem.clone(),
1175 permits: n,
1176 })
1177 }
1178
1179 /// Returns the [`Semaphore`] from which this permit was acquired.
semaphore(&self) -> &Arc<Semaphore>1180 pub fn semaphore(&self) -> &Arc<Semaphore> {
1181 &self.sem
1182 }
1183
1184 /// Returns the number of permits held by `self`.
num_permits(&self) -> usize1185 pub fn num_permits(&self) -> usize {
1186 self.permits as usize
1187 }
1188 }
1189
1190 impl Drop for SemaphorePermit<'_> {
drop(&mut self)1191 fn drop(&mut self) {
1192 self.sem.add_permits(self.permits as usize);
1193 }
1194 }
1195
1196 impl Drop for OwnedSemaphorePermit {
drop(&mut self)1197 fn drop(&mut self) {
1198 self.sem.add_permits(self.permits as usize);
1199 }
1200 }
1201