1 //! Stress-tests
2 //!
3 //! The tests in here try to torture the implementation with multiple threads, in an attempt to
4 //! discover any possible race condition.
5
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
8
9 use adaptive_barrier::{Barrier, PanicMode};
10 use arc_swap::strategy::{CaS, DefaultStrategy, IndependentStrategy, Strategy};
11 use arc_swap::ArcSwapAny;
12 use crossbeam_utils::thread;
13 use itertools::Itertools;
14 use once_cell::sync::Lazy;
15
16 static LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
17
18 /// We want to prevent these tests from running concurrently, because they run multi-threaded.
lock() -> MutexGuard<'static, ()>19 fn lock() -> MutexGuard<'static, ()> {
20 LOCK.lock().unwrap_or_else(PoisonError::into_inner)
21 }
22
23 struct LLNode<S: Strategy<Option<Arc<LLNode<S>>>>> {
24 next: ArcSwapAny<Option<Arc<LLNode<S>>>, S>,
25 num: usize,
26 owner: usize,
27 }
28
29 /// A test that repeatedly builds a linked list concurrently with multiple threads.
30 ///
31 /// The idea here is to stress-test the RCU implementation and see that no items get lost and that
32 /// the ref counts are correct afterwards.
storm_link_list<S>(node_cnt: usize, iters: usize) where S: Default + CaS<Option<Arc<LLNode<S>>>> + Send + Sync,33 fn storm_link_list<S>(node_cnt: usize, iters: usize)
34 where
35 S: Default + CaS<Option<Arc<LLNode<S>>>> + Send + Sync,
36 {
37 let _lock = lock();
38 let head = ArcSwapAny::<_, S>::from(None::<Arc<LLNode<S>>>);
39 #[cfg(not(miri))]
40 let cpus = num_cpus::get();
41 #[cfg(miri)]
42 let cpus = 2;
43 let barr = Barrier::new(PanicMode::Poison);
44 thread::scope(|scope| {
45 for thread in 0..cpus {
46 // We want to borrow these, but that kind-of conflicts with the move closure mode
47 let mut barr = barr.clone();
48 let head = &head;
49 scope.spawn(move |_| {
50 let nodes = (0..node_cnt)
51 .map(|i| LLNode {
52 next: ArcSwapAny::from(None),
53 num: i,
54 owner: thread,
55 })
56 .map(Arc::new)
57 .collect::<Vec<_>>();
58 for iter in 0..iters {
59 barr.wait(); // Start synchronously
60 for n in nodes.iter().rev() {
61 head.rcu(|head| {
62 n.next.store(head.clone()); // Cloning the optional Arc
63 Some(Arc::clone(n))
64 });
65 }
66 // And do the checks once everyone finishes
67 barr.wait();
68 // First, check that all our numbers are increasing by one and all are present
69 let mut node = head.load();
70 let mut expecting = 0;
71 while node.is_some() {
72 // A bit of gymnastics, we don't have NLL yet and we need to persuade the
73 // borrow checker this is safe.
74 let next = {
75 let inner = node.as_ref().unwrap();
76 if inner.owner == thread {
77 assert_eq!(expecting, inner.num);
78 expecting += 1;
79 }
80 inner.next.load()
81 };
82 node = next;
83 }
84 assert_eq!(node_cnt, expecting);
85 // We don't want to count the ref-counts while someone still plays around with
86 // them and loading.
87 barr.wait();
88 // Now that we've checked we have everything, check that all the nodes have ref
89 // count 2 ‒ once in the vector, once in the linked list.
90 for n in &nodes {
91 assert_eq!(
92 2,
93 Arc::strong_count(n),
94 "Wrong number of counts in item {} in iteration {}",
95 n.num,
96 iter,
97 );
98 }
99 // Reset the head so we don't mix the runs together, which would create a mess.
100 // Also, the tails might disturb the ref counts.
101 barr.wait();
102 head.store(None);
103 nodes.last().unwrap().next.store(None);
104 }
105 barr.wait();
106 // We went through all the iterations. Dismantle the list and see that everything
107 // has ref count 1.
108 head.store(None);
109 for n in &nodes {
110 n.next.store(None);
111 }
112 barr.wait(); // Wait until everyone resets their own nexts
113 for n in &nodes {
114 assert_eq!(1, Arc::strong_count(n));
115 }
116 });
117 }
118
119 drop(barr);
120 })
121 .unwrap();
122 }
123
124 struct LLNodeCnt<'a> {
125 next: Option<Arc<LLNodeCnt<'a>>>,
126 num: usize,
127 owner: usize,
128 live_cnt: &'a AtomicUsize,
129 }
130
131 impl<'a> Drop for LLNodeCnt<'a> {
drop(&mut self)132 fn drop(&mut self) {
133 self.live_cnt.fetch_sub(1, Ordering::Relaxed);
134 }
135 }
136
137 /// Test where we build and then deconstruct a linked list using multiple threads.
storm_unroll<S>(node_cnt: usize, iters: usize) where S: Default + Send + Sync, for<'a> S: CaS<Option<Arc<LLNodeCnt<'a>>>>,138 fn storm_unroll<S>(node_cnt: usize, iters: usize)
139 where
140 S: Default + Send + Sync,
141 for<'a> S: CaS<Option<Arc<LLNodeCnt<'a>>>>,
142 {
143 let _lock = lock();
144
145 #[cfg(not(miri))]
146 let cpus = num_cpus::get();
147 #[cfg(miri)]
148 let cpus = 2;
149 let barr = Barrier::new(PanicMode::Poison);
150 let global_cnt = AtomicUsize::new(0);
151 // We plan to create this many nodes during the whole test.
152 let live_cnt = AtomicUsize::new(cpus * node_cnt * iters);
153 let head = ArcSwapAny::<_, S>::from(None);
154 thread::scope(|scope| {
155 for thread in 0..cpus {
156 // Borrow these instead of moving.
157 let head = &head;
158 let mut barr = barr.clone();
159 let global_cnt = &global_cnt;
160 let live_cnt = &live_cnt;
161 scope.spawn(move |_| {
162 for iter in 0..iters {
163 barr.wait();
164 // Create bunch of nodes and put them into the list.
165 for i in 0..node_cnt {
166 let mut node = Arc::new(LLNodeCnt {
167 next: None,
168 num: i,
169 owner: thread,
170 live_cnt,
171 });
172 head.rcu(|head| {
173 // Clone Option<Arc>
174 Arc::get_mut(&mut node).unwrap().next = head.clone();
175 Arc::clone(&node)
176 });
177 }
178 if barr.wait().is_leader() {
179 let mut cnt = 0;
180 let mut node = head.load_full();
181 while let Some(n) = node.as_ref() {
182 cnt += 1;
183 node = n.next.clone();
184 }
185 assert_eq!(cnt, node_cnt * cpus);
186 }
187 barr.wait();
188 // Keep removing items, count how many there are and that they increase in each
189 // thread's list.
190 let mut last_seen = vec![node_cnt; cpus];
191 let mut cnt = 0;
192 while let Some(node) =
193 head.rcu(|head| head.as_ref().and_then(|h| h.next.clone()))
194 {
195 assert!(last_seen[node.owner] > node.num);
196 last_seen[node.owner] = node.num;
197 cnt += 1;
198 }
199 global_cnt.fetch_add(cnt, Ordering::Relaxed);
200 if barr.wait().is_leader() {
201 assert_eq!(node_cnt * cpus, global_cnt.swap(0, Ordering::Relaxed));
202 }
203 assert_eq!(
204 (iters - iter - 1) * node_cnt * cpus,
205 live_cnt.load(Ordering::Relaxed),
206 );
207 }
208 });
209 }
210
211 drop(barr);
212 })
213 .unwrap();
214 // Everything got destroyed properly.
215 assert_eq!(0, live_cnt.load(Ordering::Relaxed));
216 }
217
load_parallel<S>(iters: usize) where S: Default + Strategy<Arc<usize>> + Send + Sync,218 fn load_parallel<S>(iters: usize)
219 where
220 S: Default + Strategy<Arc<usize>> + Send + Sync,
221 {
222 let _lock = lock();
223 #[cfg(not(miri))]
224 let cpus = num_cpus::get();
225 #[cfg(miri)]
226 let cpus = 2;
227 let shared = ArcSwapAny::<_, S>::from(Arc::new(0));
228 thread::scope(|scope| {
229 scope.spawn(|_| {
230 for i in 0..iters {
231 shared.store(Arc::new(i));
232 }
233 });
234 for _ in 0..cpus {
235 scope.spawn(|_| {
236 for _ in 0..iters {
237 let guards = (0..256).map(|_| shared.load()).collect::<Vec<_>>();
238 for (l, h) in guards.iter().tuple_windows() {
239 assert!(**l <= **h, "{} > {}", l, h);
240 }
241 }
242 });
243 }
244 })
245 .unwrap();
246 let v = shared.load_full();
247 assert_eq!(2, Arc::strong_count(&v));
248 }
249
250 #[cfg(not(miri))]
251 const ITER_SMALL: usize = 100;
252 #[cfg(not(miri))]
253 const ITER_MID: usize = 1000;
254
255 #[cfg(miri)]
256 const ITER_SMALL: usize = 2;
257 #[cfg(miri)]
258 const ITER_MID: usize = 5;
259
260 macro_rules! t {
261 ($name: ident, $strategy: ty) => {
262 mod $name {
263 use super::*;
264
265 #[allow(deprecated)] // We use some "deprecated" testing strategies
266 type Strategy = $strategy;
267
268 #[test]
269 fn storm_link_list_small() {
270 storm_link_list::<Strategy>(ITER_SMALL, 5);
271 }
272
273 #[test]
274 #[ignore]
275 fn storm_link_list_large() {
276 storm_link_list::<Strategy>(10_000, 50);
277 }
278
279 #[test]
280 fn storm_unroll_small() {
281 storm_unroll::<Strategy>(ITER_SMALL, 5);
282 }
283
284 #[test]
285 #[ignore]
286 fn storm_unroll_large() {
287 storm_unroll::<Strategy>(10_000, 50);
288 }
289
290 #[test]
291 fn load_parallel_small() {
292 load_parallel::<Strategy>(ITER_MID);
293 }
294
295 #[test]
296 #[ignore]
297 fn load_parallel_large() {
298 load_parallel::<Strategy>(100_000);
299 }
300 }
301 };
302 }
303
304 t!(default, DefaultStrategy);
305 t!(independent, IndependentStrategy);
306 #[cfg(feature = "internal-test-strategies")]
307 t!(
308 full_slots,
309 arc_swap::strategy::test_strategies::FillFastSlots
310 );
311