#![cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))]

//! Synchronization primitives for use in asynchronous contexts.
//!
//! Tokio programs tend to be organized as a set of [tasks] where each task
//! operates independently and may be executed on separate physical threads. The
//! synchronization primitives provided in this module permit these independent
//! tasks to communicate with each other.
//!
//! [tasks]: crate::task
//!
//! # Message passing
//!
//! The most common form of synchronization in a Tokio program is message
//! passing. Two tasks operate independently and send messages to each other to
//! synchronize. Doing so has the advantage of avoiding shared state.
//!
//! Message passing is implemented using channels. A channel supports sending a
//! message from one producer task to one or more consumer tasks. There are a
//! few flavors of channels provided by Tokio. Each channel flavor supports
//! different message passing patterns. When a channel supports multiple
//! producers, many separate tasks may **send** messages. When a channel
//! supports multiple consumers, many separate tasks may **receive** messages.
//!
//! Tokio provides many different channel flavors as different message passing
//! patterns are best handled with different implementations.
//!
//! ## `oneshot` channel
//!
//! The [`oneshot` channel][oneshot] supports sending a **single** value from a
//! single producer to a single consumer. This channel is usually used to send
//! the result of a computation to a waiter.
//!
//! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a
//! computation.
//!
//! ```
//! use tokio::sync::oneshot;
//!
//! async fn some_computation() -> String {
//!     "represents the result of the computation".to_string()
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, rx) = oneshot::channel();
//!
//!     tokio::spawn(async move {
//!         let res = some_computation().await;
//!         tx.send(res).unwrap();
//!     });
//!
//!     // Do other work while the computation is happening in the background
//!
//!     // Wait for the computation result
//!     let res = rx.await.unwrap();
//! }
//! ```
//!
//! Note that if the task produces a computation result as its final
//! action before terminating, the [`JoinHandle`] can be used to
//! receive that value instead of allocating resources for the
//! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If
//! the task panics, the [`JoinHandle`] yields `Err` with the panic
//! cause.
//!
//! **Example:**
//!
//! ```
//! async fn some_computation() -> String {
//!     "the result of the computation".to_string()
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let join_handle = tokio::spawn(async move {
//!         some_computation().await
//!     });
//!
//!     // Do other work while the computation is happening in the background
//!
//!     // Wait for the computation result
//!     let res = join_handle.await.unwrap();
//! }
//! ```
//!
//! [`JoinHandle`]: crate::task::JoinHandle
//!
//! ## `mpsc` channel
//!
//! The [`mpsc` channel][mpsc] supports sending **many** values from **many**
//! producers to a single consumer. This channel is often used to send work to a
//! task or to receive the result of many computations.
//!
//! This is also the channel you should use if you want to send many messages
//! from a single producer to a single consumer. There is no dedicated spsc
//! channel.
//!
//! **Example:** using an `mpsc` channel to incrementally stream the results
//! of a series of computations.
//!
//! ```
//! use tokio::sync::mpsc;
//!
//! async fn some_computation(input: u32) -> String {
//!     format!("the result of computation {}", input)
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx) = mpsc::channel(100);
//!
//!     tokio::spawn(async move {
//!         for i in 0..10 {
//!             let res = some_computation(i).await;
//!             tx.send(res).await.unwrap();
//!         }
//!     });
//!
//!     while let Some(res) = rx.recv().await {
//!         println!("got = {}", res);
//!     }
//! }
//! ```
//!
//! The argument to `mpsc::channel` is the channel capacity. This is the maximum
//! number of values that can be stored in the channel pending receipt at any
//! given time. Properly setting this value is key in implementing robust
//! programs as the channel capacity plays a critical part in handling back
//! pressure.
//!
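//! **Example:** a minimal sketch of back pressure with a small capacity. With
//! a capacity of 1, the second `send` does not complete until the receiver has
//! taken the first message out of the channel.
//!
//! ```
//! use tokio::sync::mpsc;
//!
//! #[tokio::main]
//! async fn main() {
//!     // Capacity of 1: at most one message may be buffered at a time.
//!     let (tx, mut rx) = mpsc::channel(1);
//!
//!     tokio::spawn(async move {
//!         // The first send completes immediately because the channel is empty.
//!         tx.send("first").await.unwrap();
//!
//!         // The second send waits until the receiver takes "first",
//!         // applying back pressure to this producer.
//!         tx.send("second").await.unwrap();
//!     });
//!
//!     assert_eq!(rx.recv().await, Some("first"));
//!     assert_eq!(rx.recv().await, Some("second"));
//! }
//! ```
//!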
//! A common concurrency pattern for resource management is to spawn a task
//! dedicated to managing that resource and using message passing between other
//! tasks to interact with the resource. The resource may be anything that
//! cannot safely be used concurrently, such as a socket or program state.
//! For example, if multiple tasks need to send data over a single socket, spawn
//! a task to manage the socket and use a channel to synchronize.
//!
//! **Example:** sending data from many tasks over a single socket using message
//! passing.
//!
//! ```no_run
//! use tokio::io::{self, AsyncWriteExt};
//! use tokio::net::TcpStream;
//! use tokio::sync::mpsc;
//!
//! #[tokio::main]
//! async fn main() -> io::Result<()> {
//!     let mut socket = TcpStream::connect("www.example.com:1234").await?;
//!     let (tx, mut rx) = mpsc::channel(100);
//!
//!     for _ in 0..10 {
//!         // Each task needs its own `tx` handle. This is done by cloning the
//!         // original handle.
//!         let tx = tx.clone();
//!
//!         tokio::spawn(async move {
//!             tx.send(&b"data to write"[..]).await.unwrap();
//!         });
//!     }
//!
//!     // The `rx` half of the channel returns `None` once **all** `tx` clones
//!     // drop. To ensure `None` is returned, drop the handle owned by the
//!     // current task. If this `tx` handle is not dropped, there will always
//!     // be a single outstanding `tx` handle.
//!     drop(tx);
//!
//!     while let Some(res) = rx.recv().await {
//!         socket.write_all(res).await?;
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! The [`mpsc`] and [`oneshot`] channels can be combined to provide a request /
//! response type synchronization pattern with a shared resource. A task is
//! spawned to synchronize a resource and waits on commands received on an
//! [`mpsc`] channel. Each command includes a [`oneshot`] `Sender` on which the
//! result of the command is sent.
//!
//! **Example:** use a task to synchronize a `u64` counter. Each task sends a
//! "fetch and increment" command. The counter value **before** the increment is
//! sent over the provided `oneshot` channel.
//!
//! ```
//! use tokio::sync::{oneshot, mpsc};
//! use Command::Increment;
//!
//! enum Command {
//!     Increment,
//!     // Other commands can be added here
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);
//!
//!     // Spawn a task to manage the counter
//!     tokio::spawn(async move {
//!         let mut counter: u64 = 0;
//!
//!         while let Some((cmd, response)) = cmd_rx.recv().await {
//!             match cmd {
//!                 Increment => {
//!                     let prev = counter;
//!                     counter += 1;
//!                     response.send(prev).unwrap();
//!                 }
//!             }
//!         }
//!     });
//!
//!     let mut join_handles = vec![];
//!
//!     // Spawn tasks that will send the increment command.
//!     for _ in 0..10 {
//!         let cmd_tx = cmd_tx.clone();
//!
//!         join_handles.push(tokio::spawn(async move {
//!             let (resp_tx, resp_rx) = oneshot::channel();
//!
//!             cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
//!             let res = resp_rx.await.unwrap();
//!
//!             println!("previous value = {}", res);
//!         }));
//!     }
//!
//!     // Wait for all tasks to complete
//!     for join_handle in join_handles.drain(..) {
//!         join_handle.await.unwrap();
//!     }
//! }
//! ```
//!
//! ## `broadcast` channel
//!
//! The [`broadcast` channel] supports sending **many** values from
//! **many** producers to **many** consumers. Each consumer will receive
//! **each** value. This channel can be used to implement "fan out" style
//! patterns common with pub / sub or "chat" systems.
//!
//! This channel tends to be used less often than `oneshot` and `mpsc` but still
//! has its use cases.
//!
//! This is also the channel you should use if you want to broadcast values from
//! a single producer to many consumers. There is no dedicated spmc broadcast
//! channel.
//!
//! **Example:** basic usage.
//!
//! ```
//! use tokio::sync::broadcast;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx1) = broadcast::channel(16);
//!     let mut rx2 = tx.subscribe();
//!
//!     tokio::spawn(async move {
//!         assert_eq!(rx1.recv().await.unwrap(), 10);
//!         assert_eq!(rx1.recv().await.unwrap(), 20);
//!     });
//!
//!     tokio::spawn(async move {
//!         assert_eq!(rx2.recv().await.unwrap(), 10);
//!         assert_eq!(rx2.recv().await.unwrap(), 20);
//!     });
//!
//!     tx.send(10).unwrap();
//!     tx.send(20).unwrap();
//! }
//! ```
//!
//! [`broadcast` channel]: crate::sync::broadcast
//!
//! ## `watch` channel
//!
//! The [`watch` channel] supports sending **many** values from **many**
//! producers to **many** consumers. However, only the **most recent** value is
//! stored in the channel. Consumers are notified when a new value is sent, but
//! there is no guarantee that consumers will see **all** values.
//!
//! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1.
//!
//! Use cases for the [`watch` channel] include broadcasting configuration
//! changes or signalling program state changes, such as transitioning to
//! shutdown.
//!
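//! **Example:** a minimal sketch of the "most recent value only" behavior.
//! Values sent while the receiver is not looking are overwritten; only the
//! latest one is observed.
//!
//! ```
//! use tokio::sync::watch;
//!
//! #[tokio::main]
//! async fn main() {
//!     // The channel always holds exactly one value, starting with "hello".
//!     let (tx, mut rx) = watch::channel("hello");
//!
//!     // Only the most recent value is kept: "goodbye" overwrites "world".
//!     tx.send("world").unwrap();
//!     tx.send("goodbye").unwrap();
//!
//!     // `changed` resolves because a value was sent since the receiver last
//!     // observed one; `borrow_and_update` marks the new value as seen.
//!     rx.changed().await.unwrap();
//!     assert_eq!(*rx.borrow_and_update(), "goodbye");
//! }
//! ```
//!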
//! **Example:** use a [`watch` channel] to notify tasks of configuration
//! changes. In this example, a configuration file is checked periodically. When
//! the file changes, the configuration changes are signalled to consumers.
//!
//! ```
//! use tokio::sync::watch;
//! use tokio::time::{self, Duration, Instant};
//!
//! use std::io;
//!
//! #[derive(Debug, Clone, Eq, PartialEq)]
//! struct Config {
//!     timeout: Duration,
//! }
//!
//! impl Config {
//!     async fn load_from_file() -> io::Result<Config> {
//!         // file loading and deserialization logic here
//! # Ok(Config { timeout: Duration::from_secs(1) })
//!     }
//! }
//!
//! async fn my_async_operation() {
//!     // Do something here
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // Load initial configuration value
//!     let mut config = Config::load_from_file().await.unwrap();
//!
//!     // Create the watch channel, initialized with the loaded configuration
//!     let (tx, rx) = watch::channel(config.clone());
//!
//!     // Spawn a task to monitor the file.
//!     tokio::spawn(async move {
//!         loop {
//!             // Wait 10 seconds between checks
//!             time::sleep(Duration::from_secs(10)).await;
//!
//!             // Load the configuration file
//!             let new_config = Config::load_from_file().await.unwrap();
//!
//!             // If the configuration changed, send the new config value
//!             // on the watch channel.
//!             if new_config != config {
//!                 tx.send(new_config.clone()).unwrap();
//!                 config = new_config;
//!             }
//!         }
//!     });
//!
//!     let mut handles = vec![];
//!
//!     // Spawn tasks that run the async operation for at most `timeout`. If
//!     // the timeout elapses, restart the operation.
//!     //
//!     // The task simultaneously watches the `Config` for changes. When the
//!     // timeout duration changes, the timeout is updated without restarting
//!     // the in-flight operation.
//!     for _ in 0..5 {
//!         // Clone a config watch handle for use in this task
//!         let mut rx = rx.clone();
//!
//!         let handle = tokio::spawn(async move {
//!             // Start the initial operation and pin the future to the stack.
//!             // Pinning to the stack is required to resume the operation
//!             // across multiple calls to `select!`
//!             let op = my_async_operation();
//!             tokio::pin!(op);
//!
//!             // Get the initial config value
//!             let mut conf = rx.borrow().clone();
//!
//!             let mut op_start = Instant::now();
//!             let sleep = time::sleep_until(op_start + conf.timeout);
//!             tokio::pin!(sleep);
//!
//!             loop {
//!                 tokio::select! {
//!                     _ = &mut sleep => {
//!                         // The operation elapsed. Restart it
//!                         op.set(my_async_operation());
//!
//!                         // Track the new start time
//!                         op_start = Instant::now();
//!
//!                         // Restart the timeout
//!                         sleep.set(time::sleep_until(op_start + conf.timeout));
//!                     }
//!                     _ = rx.changed() => {
//!                         conf = rx.borrow_and_update().clone();
//!
//!                         // The configuration has been updated. Update the
//!                         // `sleep` using the new `timeout` value.
//!                         sleep.as_mut().reset(op_start + conf.timeout);
//!                     }
//!                     _ = &mut op => {
//!                         // The operation completed!
//!                         return
//!                     }
//!                 }
//!             }
//!         });
//!
//!         handles.push(handle);
//!     }
//!
//!     for handle in handles.drain(..) {
//!         handle.await.unwrap();
//!     }
//! }
//! ```
//!
//! [`watch` channel]: mod@crate::sync::watch
//! [`broadcast` channel]: mod@crate::sync::broadcast
//!
//! # State synchronization
//!
//! The remaining synchronization primitives focus on synchronizing state.
//! These are asynchronous equivalents to versions provided by `std`. They
//! operate in a similar way as their `std` counterparts but will wait
//! asynchronously instead of blocking the thread.
//!
//! * [`Barrier`] Ensures multiple tasks will wait for each other to reach a
//!   point in the program, before continuing execution all together.
//!
//! * [`Mutex`] Mutual Exclusion mechanism, which ensures that at most one
//!   task at a time is able to access some data.
//!
//! * [`Notify`] Basic task notification. `Notify` supports notifying a
//!   receiving task without sending data. In this case, the task wakes up and
//!   resumes processing.
//!
//! * [`RwLock`] Provides a mutual exclusion mechanism which allows multiple
//!   readers at the same time, while allowing only one writer at a time. In
//!   some cases, this can be more efficient than a mutex.
//!
//! * [`Semaphore`] Limits the amount of concurrency. A semaphore holds a
//!   number of permits, which tasks may request in order to enter a critical
//!   section. Semaphores are useful for implementing limiting or bounding of
//!   any kind, as shown in the sketch after this list.
//!
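//! **Example:** a minimal sketch of the last item. A [`Semaphore`] with two
//! permits bounds how many tasks may run a critical section at the same time.
//!
//! ```
//! use std::sync::Arc;
//! use tokio::sync::Semaphore;
//!
//! #[tokio::main]
//! async fn main() {
//!     // At most two tasks may hold a permit at the same time.
//!     let semaphore = Arc::new(Semaphore::new(2));
//!     let mut handles = vec![];
//!
//!     for i in 0..5 {
//!         let semaphore = semaphore.clone();
//!         handles.push(tokio::spawn(async move {
//!             // Wait until a permit is available, limiting concurrency to 2.
//!             let _permit = semaphore.acquire_owned().await.unwrap();
//!             println!("task {} is inside the critical section", i);
//!             // The permit is released when `_permit` goes out of scope.
//!         }));
//!     }
//!
//!     for handle in handles {
//!         handle.await.unwrap();
//!     }
//! }
//! ```
//!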
//! # Runtime compatibility
//!
//! All synchronization primitives provided in this module are runtime agnostic.
//! You can freely move them between different instances of the Tokio runtime
//! or even use them from non-Tokio runtimes.
//!
//! When used in a Tokio runtime, the synchronization primitives participate in
//! [cooperative scheduling](crate::task#cooperative-scheduling) to avoid
//! starvation. This feature does not apply when used from non-Tokio runtimes.
//!
//! As an exception, methods ending in `_timeout` are not runtime agnostic
//! because they require access to the Tokio timer. See the documentation of
//! each `*_timeout` method for more information on its use.
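//!
//! **Example:** a minimal sketch using one such method, `send_timeout` on an
//! `mpsc` sender (assuming the `time` feature is enabled). It waits for
//! capacity for at most the given duration and therefore needs a Tokio runtime
//! for its timer.
//!
//! ```
//! use tokio::sync::mpsc;
//! use tokio::time::Duration;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx) = mpsc::channel(1);
//!
//!     // Fill the channel so that the next send has to wait for capacity.
//!     tx.send(1).await.unwrap();
//!
//!     // The channel stays full, so this send gives up after 10 milliseconds
//!     // instead of waiting indefinitely.
//!     let res = tx.send_timeout(2, Duration::from_millis(10)).await;
//!     assert!(res.is_err());
//!
//!     assert_eq!(rx.recv().await, Some(1));
//! }
//! ```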

cfg_sync! {
    /// Named future types.
    pub mod futures {
        pub use super::notify::Notified;
    }

    mod barrier;
    pub use barrier::{Barrier, BarrierWaitResult};

    pub mod broadcast;

    pub mod mpsc;

    mod mutex;
    pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard};

    pub(crate) mod notify;
    pub use notify::Notify;

    pub mod oneshot;

    pub(crate) mod batch_semaphore;
    pub use batch_semaphore::{AcquireError, TryAcquireError};

    mod semaphore;
    pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

    mod rwlock;
    pub use rwlock::RwLock;
    pub use rwlock::owned_read_guard::OwnedRwLockReadGuard;
    pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard;
    pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
    pub use rwlock::read_guard::RwLockReadGuard;
    pub use rwlock::write_guard::RwLockWriteGuard;
    pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard;

    mod task;
    pub(crate) use task::AtomicWaker;

    mod once_cell;
    pub use self::once_cell::{OnceCell, SetError};

    pub mod watch;
}

cfg_not_sync! {
    cfg_fs! {
        pub(crate) mod batch_semaphore;
        mod mutex;
        pub(crate) use mutex::Mutex;
    }

    #[cfg(any(feature = "rt", feature = "signal", all(unix, feature = "process")))]
    pub(crate) mod notify;

    #[cfg(any(feature = "rt", all(windows, feature = "process")))]
    pub(crate) mod oneshot;

    cfg_atomic_waker_impl! {
        mod task;
        pub(crate) use task::AtomicWaker;
    }

    #[cfg(any(feature = "signal", all(unix, feature = "process")))]
    pub(crate) mod watch;
}

/// Unit tests
#[cfg(test)]
mod tests;