# Source (xref): /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/unittest/test/test_break.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1import gc
2import io
3import os
4import sys
5import signal
6import weakref
7import unittest
8
9from test import support
10
11
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
class TestBreak(unittest.TestCase):
    """Tests for unittest's SIGINT ("catch break") support.

    The tests exercise ``unittest.installHandler``, ``removeHandler``,
    ``registerResult`` and ``removeResult``, mostly by sending a real
    SIGINT to the current process with ``os.kill``.

    Subclasses override ``int_handler`` to run the same tests with a
    different SIGINT handler installed by ``setUp``.

    Test methods are described with comments instead of docstrings so
    that ``shortDescription()`` (and thus verbose runner output) is not
    affected.
    """

    # Handler that setUp() installs for SIGINT; None keeps whatever handler
    # is already active when the test starts.
    int_handler = None
    # This number was smart-guessed, previously tests were failing
    # after 7th run. So, we take `x * 2 + 1` to be sure.
    default_repeats = 15

    def setUp(self):
        """Remember the active SIGINT handler and install ``int_handler``."""
        self._default_handler = signal.getsignal(signal.SIGINT)
        if self.int_handler is not None:
            signal.signal(signal.SIGINT, self.int_handler)

    def tearDown(self):
        """Restore the SIGINT handler saved by setUp and reset the
        module-level state kept in ``unittest.signals``."""
        signal.signal(signal.SIGINT, self._default_handler)
        unittest.signals._results = weakref.WeakKeyDictionary()
        unittest.signals._interrupt_handler = None

    def withRepeats(self, test_function, repeats=None):
        """Call ``test_function`` several times, each inside its own subTest.

        ``repeats`` defaults to ``default_repeats``.  On implementations
        other than CPython the function is always run exactly once.
        ``tearDown``/``setUp`` are invoked between consecutive repeats so
        every run starts from a freshly reset fixture.
        """
        if not support.check_impl_detail(cpython=True):
            # Override repeats count on non-cpython to execute only once.
            # Because this test only makes sense to be repeated on CPython.
            repeats = 1
        elif repeats is None:
            repeats = self.default_repeats

        for repeat in range(repeats):
            with self.subTest(repeat=repeat):
                # We don't run `setUp` for the very first repeat
                # and we don't run `tearDown` for the very last one,
                # because they are handled by the test class itself.
                if repeat != 0:
                    self.setUp()
                try:
                    test_function()
                finally:
                    if repeat != repeats - 1:
                        self.tearDown()

    def testInstallHandler(self):
        # installHandler() must replace the current SIGINT handler, and a
        # SIGINT delivered afterwards must not surface as KeyboardInterrupt.
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            self.fail("KeyboardInterrupt not handled")

        self.assertTrue(unittest.signals._interrupt_handler.called)

    def testRegisterResult(self):
        # registerResult() must add the result to unittest.signals._results.
        result = unittest.TestResult()
        self.assertNotIn(result, unittest.signals._results)

        unittest.registerResult(result)
        try:
            self.assertIn(result, unittest.signals._results)
        finally:
            unittest.removeResult(result)

    def testInterruptCaught(self):
        # A single SIGINT with a registered result must set shouldStop on
        # that result instead of raising KeyboardInterrupt.
        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)

        def test_function():
            result = unittest.TestResult()
            unittest.installHandler()
            unittest.registerResult(result)

            self.assertNotEqual(
                signal.getsignal(signal.SIGINT),
                self._default_handler,
            )

            try:
                test(result)
            except KeyboardInterrupt:
                self.fail("KeyboardInterrupt not handled")
            self.assertTrue(result.breakCaught)
        self.withRepeats(test_function)

    def testSecondInterrupt(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")

        # The first SIGINT only flags the result; the second one must raise
        # KeyboardInterrupt.
        def test(result):
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
            result.breakCaught = True
            self.assertTrue(result.shouldStop)
            os.kill(pid, signal.SIGINT)
            self.fail("Second KeyboardInterrupt not raised")

        def test_function():
            result = unittest.TestResult()
            unittest.installHandler()
            unittest.registerResult(result)

            with self.assertRaises(KeyboardInterrupt):
                test(result)
            self.assertTrue(result.breakCaught)
        self.withRepeats(test_function)

    def testTwoResults(self):
        # Every registered result is told to stop; an unregistered one
        # (result3) is left untouched.
        def test_function():
            unittest.installHandler()

            result = unittest.TestResult()
            unittest.registerResult(result)
            new_handler = signal.getsignal(signal.SIGINT)

            result2 = unittest.TestResult()
            unittest.registerResult(result2)
            # Registering another result must not swap the handler.
            self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)

            result3 = unittest.TestResult()

            try:
                os.kill(os.getpid(), signal.SIGINT)
            except KeyboardInterrupt:
                self.fail("KeyboardInterrupt not handled")

            self.assertTrue(result.shouldStop)
            self.assertTrue(result2.shouldStop)
            self.assertFalse(result3.shouldStop)
        self.withRepeats(test_function)

    def testHandlerReplacedButCalled(self):
        # Can't use skipIf decorator because the signal handler may have
        # been changed after defining this method.
        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
            self.skipTest("test requires SIGINT to not be ignored")

        def test_function():
            # If our handler has been replaced (is no longer installed) but is
            # called by the *new* handler, then it isn't safe to delay the
            # SIGINT and we should immediately delegate to the default handler
            unittest.installHandler()

            handler = signal.getsignal(signal.SIGINT)

            # Python invokes signal handlers as handler(signum, frame); the
            # wrapper simply forwards both arguments to unittest's handler.
            def new_handler(signum, frame):
                handler(signum, frame)
            signal.signal(signal.SIGINT, new_handler)

            try:
                os.kill(os.getpid(), signal.SIGINT)
            except KeyboardInterrupt:
                pass
            else:
                self.fail("replaced but delegated handler doesn't raise interrupt")
        self.withRepeats(test_function)

    def testRunner(self):
        # Creating a TextTestRunner with the appropriate argument should
        # register the TextTestResult it creates
        runner = unittest.TextTestRunner(stream=io.StringIO())

        result = runner.run(unittest.TestSuite())
        self.assertIn(result, unittest.signals._results)

    def testWeakReferences(self):
        # Calling registerResult on a result should not keep it alive
        result = unittest.TestResult()
        unittest.registerResult(result)

        ref = weakref.ref(result)
        del result

        # For non-reference counting implementations
        gc.collect()
        gc.collect()
        self.assertIsNone(ref())

    def testRemoveResult(self):
        # A result that has been removed must not be flagged by a later
        # SIGINT.
        result = unittest.TestResult()
        unittest.registerResult(result)

        unittest.installHandler()
        self.assertTrue(unittest.removeResult(result))

        # Should this raise an error instead?
        self.assertFalse(unittest.removeResult(unittest.TestResult()))

        try:
            pid = os.getpid()
            os.kill(pid, signal.SIGINT)
        except KeyboardInterrupt:
            pass

        self.assertFalse(result.shouldStop)

    def testMainInstallsHandler(self):
        # TestProgram.runTests() must install the interrupt handler only
        # when catchbreak is true, and must pass the same arguments to the
        # runner either way.
        failfast = object()
        test = object()
        verbosity = object()
        result = object()
        default_handler = signal.getsignal(signal.SIGINT)

        class FakeRunner(object):
            # Class-level lists record every construction and run() call.
            initArgs = []
            runArgs = []
            def __init__(self, *args, **kwargs):
                self.initArgs.append((args, kwargs))
            def run(self, test):
                self.runArgs.append(test)
                return result

        class Program(unittest.TestProgram):
            # Skip TestProgram.__init__ and set only the attributes that
            # runTests() is expected to read.
            def __init__(self, catchbreak):
                self.exit = False
                self.verbosity = verbosity
                self.failfast = failfast
                self.catchbreak = catchbreak
                self.tb_locals = False
                self.testRunner = FakeRunner
                self.test = test
                self.result = None

        # The runner is expected to be constructed identically in both runs.
        expected_init_args = [((), {'buffer': None,
                                    'verbosity': verbosity,
                                    'failfast': failfast,
                                    'tb_locals': False,
                                    'warnings': None})]

        p = Program(False)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, expected_init_args)
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        # catchbreak=False: the SIGINT handler must be left alone.
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        FakeRunner.initArgs = []
        FakeRunner.runArgs = []
        p = Program(True)
        p.runTests()

        self.assertEqual(FakeRunner.initArgs, expected_init_args)
        self.assertEqual(FakeRunner.runArgs, [test])
        self.assertEqual(p.result, result)

        # catchbreak=True: the SIGINT handler must have been replaced.
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandler(self):
        # removeHandler() must restore the handler that was active before
        # installHandler().
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()
        unittest.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        # check that calling removeHandler multiple times has no ill-effect
        unittest.removeHandler()
        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

    def testRemoveHandlerAsDecorator(self):
        # Used as a decorator, removeHandler restores the original handler
        # only while the decorated function runs.
        default_handler = signal.getsignal(signal.SIGINT)
        unittest.installHandler()

        @unittest.removeHandler
        def test():
            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)

        test()
        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
288
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
class TestBreakDefaultIntHandler(TestBreak):
    """Run the TestBreak tests with ``signal.default_int_handler`` as the
    SIGINT handler installed by setUp."""

    int_handler = signal.default_int_handler
293
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
class TestBreakSignalIgnored(TestBreak):
    """Run the TestBreak tests with SIGINT set to ``signal.SIG_IGN`` by
    setUp."""

    int_handler = signal.SIG_IGN
298
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform == "win32", "Test cannot run on Windows")
class TestBreakSignalDefault(TestBreak):
    """Run the TestBreak tests with SIGINT set to ``signal.SIG_DFL`` by
    setUp."""

    int_handler = signal.SIG_DFL
303
304
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
307