# xref: /aosp_15_r20/external/autotest/server/cros/stress_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import threading
6import unittest
7
8import common
9
10from autotest_lib.server.cros import stress
11
12
class StopThreadForTesting(Exception):
    """Sentinel exception raised by the stress events in these tests."""
15
16
class StressorTest(unittest.TestCase):
    """Tests for stress.CountedStressor and stress.ControlledStressor."""

    # Upper bound, in seconds, on how long a test blocks on an Event that the
    # stress event is expected to set. Without a bound, a stressor that never
    # runs the event would hang the whole test run instead of failing it.
    _EVENT_TIMEOUT_SECS = 10

    def testEscalateExceptions(self):
        """wait() re-raises the exception raised by the stress event."""
        def stress_event():
            raise StopThreadForTesting

        stressor = stress.CountedStressor(stress_event)
        stressor.start(1)
        self.assertRaises(StopThreadForTesting, stressor.wait)


    def testDontEscalateExceptions(self):
        """With escalate_exceptions=False, wait() returns without raising."""
        event = threading.Event()
        def stress_event():
            # Record that the event ran before failing, so the test can tell
            # "ran and raised" apart from "never ran".
            event.set()
            raise StopThreadForTesting

        stressor = stress.CountedStressor(stress_event,
                                          escalate_exceptions=False)
        stressor.start(1)
        stressor.wait()
        self.assertTrue(event.is_set(), 'The stress event did not run')


    def testOnExit(self):
        """The on_exit callback has been invoked by the time wait() returns."""
        def stress_event():
            pass

        event = threading.Event()
        def on_exit():
            event.set()

        stressor = stress.CountedStressor(stress_event, on_exit=on_exit)
        stressor.start(1)
        stressor.wait()
        self.assertTrue(event.is_set())


    def testOnExitWithException(self):
        """on_exit is still invoked when the stress event raises."""
        def stress_event():
            raise StopThreadForTesting

        event = threading.Event()
        def on_exit():
            event.set()

        stressor = stress.CountedStressor(stress_event, on_exit=on_exit)
        stressor.start(1)
        self.assertRaises(StopThreadForTesting, stressor.wait)
        self.assertTrue(event.is_set())


    def testCountedStressorStartCondition(self):
        """CountedStressor consults start_condition before stressing."""
        event = threading.Event()

        def start_condition():
            # Report False on the first call and True on every later call;
            # the Event remembers that the first call already happened.
            if event.is_set():
                return True
            event.set()
            return False

        def stress_event():
            raise StopThreadForTesting

        stressor = stress.CountedStressor(stress_event)
        stressor.start(1, start_condition=start_condition)
        self.assertRaises(StopThreadForTesting, stressor.wait)
        self.assertTrue(event.is_set(),
                        'Stress event ran despite a False start condition')


    def testControlledStressorStartCondition(self):
        """ControlledStressor consults start_condition before stressing."""
        start_event = threading.Event()
        ran_event = threading.Event()

        def start_condition():
            # Report False on the first call and True on every later call.
            if start_event.is_set():
                return True
            start_event.set()
            return False

        def stress_event():
            ran_event.set()
            raise StopThreadForTesting

        stressor = stress.ControlledStressor(stress_event)
        stressor.start(start_condition=start_condition)
        # Event.wait() returns False on timeout, so a stressor that never runs
        # the event now fails the test instead of blocking forever.
        self.assertTrue(ran_event.wait(self._EVENT_TIMEOUT_SECS),
                        'The stress event did not run')
        self.assertRaises(StopThreadForTesting, stressor.stop)
        self.assertTrue(start_event.is_set(),
                        'Stress event ran despite a False start condition')


    def testCountedStressorIterations(self):
        """start(10) runs the stress event exactly ten times."""
        # This is a list to get around scoping rules in Python 2.x. See
        # 'nonlocal' for the Python 3 remedy.
        count = [0]

        def stress_event():
            count[0] += 1

        stressor = stress.CountedStressor(stress_event)
        stressor.start(10)
        stressor.wait()
        self.assertEqual(10, count[0], 'Stress event did not run the expected '
                                       'number of iterations')
123
124
# Run this module's test cases when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
127