//! Stress-tests
//!
//! The tests in here try to torture the implementation with multiple threads, in an attempt to
//! discover any possible race condition.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

use adaptive_barrier::{Barrier, PanicMode};
use arc_swap::strategy::{CaS, DefaultStrategy, IndependentStrategy, Strategy};
use arc_swap::ArcSwapAny;
use crossbeam_utils::thread;
use itertools::Itertools;
use once_cell::sync::Lazy;

static LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));

/// We want to prevent these tests from running concurrently, because they run multi-threaded.
fn lock() -> MutexGuard<'static, ()> {
    LOCK.lock().unwrap_or_else(PoisonError::into_inner)
}

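/// One node of the linked list built by `storm_link_list`.
///
/// Each node records which thread created it (`owner`) and its position in that thread's
/// sequence (`num`), so the checks below can verify per-thread ordering.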
struct LLNode<S: Strategy<Option<Arc<LLNode<S>>>>> {
    next: ArcSwapAny<Option<Arc<LLNode<S>>>, S>,
    num: usize,
    owner: usize,
}

/// A test that repeatedly builds a linked list concurrently with multiple threads.
///
/// The idea here is to stress-test the RCU implementation and see that no items get lost and that
/// the ref counts are correct afterwards.
fn storm_link_list<S>(node_cnt: usize, iters: usize)
where
    S: Default + CaS<Option<Arc<LLNode<S>>>> + Send + Sync,
{
    let _lock = lock();
    let head = ArcSwapAny::<_, S>::from(None::<Arc<LLNode<S>>>);
    #[cfg(not(miri))]
    let cpus = num_cpus::get();
    #[cfg(miri)]
    let cpus = 2;
    let barr = Barrier::new(PanicMode::Poison);
    thread::scope(|scope| {
        for thread in 0..cpus {
            // We want to borrow these, but that somewhat conflicts with the `move` closure, so we
            // set the captured values up explicitly.
            let mut barr = barr.clone();
            let head = &head;
            scope.spawn(move |_| {
                let nodes = (0..node_cnt)
                    .map(|i| LLNode {
                        next: ArcSwapAny::from(None),
                        num: i,
                        owner: thread,
                    })
                    .map(Arc::new)
                    .collect::<Vec<_>>();
                for iter in 0..iters {
                    barr.wait(); // Start synchronously
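                    // Note: `rcu` may call the closure more than once if another thread wins the
                    // compare-and-swap; re-storing `n.next` on every attempt keeps the node
                    // pointing at the current head.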
                    for n in nodes.iter().rev() {
                        head.rcu(|head| {
                            n.next.store(head.clone()); // Cloning the optional Arc
                            Some(Arc::clone(n))
                        });
                    }
                    // And do the checks once everyone finishes
                    barr.wait();
                    // First, check that all our numbers are increasing by one and all are present
                    let mut node = head.load();
                    let mut expecting = 0;
                    while node.is_some() {
                        // A bit of gymnastics: we don't have NLL yet, so we need to persuade the
                        // borrow checker this is safe.
                        let next = {
                            let inner = node.as_ref().unwrap();
                            if inner.owner == thread {
                                assert_eq!(expecting, inner.num);
                                expecting += 1;
                            }
                            inner.next.load()
                        };
                        node = next;
                    }
                    assert_eq!(node_cnt, expecting);
                    // We don't want to count the ref-counts while someone is still playing around
                    // with them by loading.
                    barr.wait();
                    // Now that we've checked we have everything, check that all the nodes have ref
                    // count 2 ‒ once in the vector, once in the linked list.
                    for n in &nodes {
                        assert_eq!(
                            2,
                            Arc::strong_count(n),
                            "Wrong number of counts in item {} in iteration {}",
                            n.num,
                            iter,
                        );
                    }
                    // Reset the head so we don't mix the runs together, which would create a mess.
                    // Also, the tails might disturb the ref counts.
                    barr.wait();
                    head.store(None);
                    nodes.last().unwrap().next.store(None);
                }
                barr.wait();
                // We went through all the iterations. Dismantle the list and see that everything
                // has ref count 1.
                head.store(None);
                for n in &nodes {
                    n.next.store(None);
                }
                barr.wait(); // Wait until everyone resets their own nexts
                for n in &nodes {
                    assert_eq!(1, Arc::strong_count(n));
                }
            });
        }

        drop(barr);
    })
    .unwrap();
}

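/// A linked-list node for the `storm_unroll` test.
///
/// Unlike `LLNode`, its `next` pointer is a plain `Option<Arc<_>>` and every node decrements a
/// shared counter on drop, so the test can verify that each node is eventually destroyed.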
struct LLNodeCnt<'a> {
    next: Option<Arc<LLNodeCnt<'a>>>,
    num: usize,
    owner: usize,
    live_cnt: &'a AtomicUsize,
}

impl<'a> Drop for LLNodeCnt<'a> {
    fn drop(&mut self) {
        self.live_cnt.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Test where we build and then deconstruct a linked list using multiple threads.
fn storm_unroll<S>(node_cnt: usize, iters: usize)
where
    S: Default + Send + Sync,
    for<'a> S: CaS<Option<Arc<LLNodeCnt<'a>>>>,
{
    let _lock = lock();

    #[cfg(not(miri))]
    let cpus = num_cpus::get();
    #[cfg(miri)]
    let cpus = 2;
    let barr = Barrier::new(PanicMode::Poison);
    let global_cnt = AtomicUsize::new(0);
    // We plan to create this many nodes during the whole test.
    let live_cnt = AtomicUsize::new(cpus * node_cnt * iters);
    let head = ArcSwapAny::<_, S>::from(None);
    thread::scope(|scope| {
        for thread in 0..cpus {
            // Borrow these instead of moving.
            let head = &head;
            let mut barr = barr.clone();
            let global_cnt = &global_cnt;
            let live_cnt = &live_cnt;
            scope.spawn(move |_| {
                for iter in 0..iters {
                    barr.wait();
                    // Create a bunch of nodes and put them into the list.
                    for i in 0..node_cnt {
                        let mut node = Arc::new(LLNodeCnt {
                            next: None,
                            num: i,
                            owner: thread,
                            live_cnt,
                        });
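                        // `rcu` may run the closure more than once if another thread wins the
                        // race; the candidate clone from a failed attempt is dropped before the
                        // retry, so `Arc::get_mut` below keeps seeing a unique owner.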
                        head.rcu(|head| {
                            // Clone Option<Arc>
                            Arc::get_mut(&mut node).unwrap().next = head.clone();
                            Arc::clone(&node)
                        });
                    }
                    if barr.wait().is_leader() {
                        let mut cnt = 0;
                        let mut node = head.load_full();
                        while let Some(n) = node.as_ref() {
                            cnt += 1;
                            node = n.next.clone();
                        }
                        assert_eq!(cnt, node_cnt * cpus);
                    }
                    barr.wait();
                    // Keep removing items, count how many there are, and check that each thread's
                    // numbers come out in decreasing order (they were pushed onto the head in
                    // increasing order).
                    let mut last_seen = vec![node_cnt; cpus];
                    let mut cnt = 0;
                    while let Some(node) =
                        head.rcu(|head| head.as_ref().and_then(|h| h.next.clone()))
                    {
                        assert!(last_seen[node.owner] > node.num);
                        last_seen[node.owner] = node.num;
                        cnt += 1;
                    }
                    global_cnt.fetch_add(cnt, Ordering::Relaxed);
                    if barr.wait().is_leader() {
                        assert_eq!(node_cnt * cpus, global_cnt.swap(0, Ordering::Relaxed));
                    }
                    assert_eq!(
                        (iters - iter - 1) * node_cnt * cpus,
                        live_cnt.load(Ordering::Relaxed),
                    );
                }
            });
        }

        drop(barr);
    })
    .unwrap();
    // Everything got destroyed properly.
    assert_eq!(0, live_cnt.load(Ordering::Relaxed));
}

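/// A test with one writer storing increasing values and many readers loading them in parallel.
///
/// Each reader checks that consecutive loads never observe the value going backwards, and the
/// test verifies at the end that the loads did not leak any reference counts.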
fn load_parallel<S>(iters: usize)
where
    S: Default + Strategy<Arc<usize>> + Send + Sync,
{
    let _lock = lock();
    #[cfg(not(miri))]
    let cpus = num_cpus::get();
    #[cfg(miri)]
    let cpus = 2;
    let shared = ArcSwapAny::<_, S>::from(Arc::new(0));
    thread::scope(|scope| {
        scope.spawn(|_| {
            for i in 0..iters {
                shared.store(Arc::new(i));
            }
        });
        for _ in 0..cpus {
            scope.spawn(|_| {
                for _ in 0..iters {
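                    // Grab a batch of guards and make sure the observed values never go
                    // backwards; the writer only ever stores increasing values.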
                    let guards = (0..256).map(|_| shared.load()).collect::<Vec<_>>();
                    for (l, h) in guards.iter().tuple_windows() {
                        assert!(**l <= **h, "{} > {}", l, h);
                    }
                }
            });
        }
    })
    .unwrap();
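    // Exactly two strong references must remain: the one we just loaded and the one still stored
    // inside `shared`.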
    let v = shared.load_full();
    assert_eq!(2, Arc::strong_count(&v));
}

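// Miri runs far slower than native execution, so scale the iteration counts down there.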
#[cfg(not(miri))]
const ITER_SMALL: usize = 100;
#[cfg(not(miri))]
const ITER_MID: usize = 1000;

#[cfg(miri)]
const ITER_SMALL: usize = 2;
#[cfg(miri)]
const ITER_MID: usize = 5;

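/// Generates a module containing the whole battery of stress tests, instantiated for the given
/// strategy type.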
macro_rules! t {
    ($name: ident, $strategy: ty) => {
        mod $name {
            use super::*;

            #[allow(deprecated)] // We use some "deprecated" testing strategies
            type Strategy = $strategy;

            #[test]
            fn storm_link_list_small() {
                storm_link_list::<Strategy>(ITER_SMALL, 5);
            }

            #[test]
            #[ignore]
            fn storm_link_list_large() {
                storm_link_list::<Strategy>(10_000, 50);
            }

            #[test]
            fn storm_unroll_small() {
                storm_unroll::<Strategy>(ITER_SMALL, 5);
            }

            #[test]
            #[ignore]
            fn storm_unroll_large() {
                storm_unroll::<Strategy>(10_000, 50);
            }

            #[test]
            fn load_parallel_small() {
                load_parallel::<Strategy>(ITER_MID);
            }

            #[test]
            #[ignore]
            fn load_parallel_large() {
                load_parallel::<Strategy>(100_000);
            }
        }
    };
}

t!(default, DefaultStrategy);
t!(independent, IndependentStrategy);
#[cfg(feature = "internal-test-strategies")]
t!(
    full_slots,
    arc_swap::strategy::test_strategies::FillFastSlots
);