1import gc 2import io 3import os 4import sys 5import signal 6import weakref 7import unittest 8 9from test import support 10 11 12@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 13@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 14class TestBreak(unittest.TestCase): 15 int_handler = None 16 # This number was smart-guessed, previously tests were failing 17 # after 7th run. So, we take `x * 2 + 1` to be sure. 18 default_repeats = 15 19 20 def setUp(self): 21 self._default_handler = signal.getsignal(signal.SIGINT) 22 if self.int_handler is not None: 23 signal.signal(signal.SIGINT, self.int_handler) 24 25 def tearDown(self): 26 signal.signal(signal.SIGINT, self._default_handler) 27 unittest.signals._results = weakref.WeakKeyDictionary() 28 unittest.signals._interrupt_handler = None 29 30 31 def withRepeats(self, test_function, repeats=None): 32 if not support.check_impl_detail(cpython=True): 33 # Override repeats count on non-cpython to execute only once. 34 # Because this test only makes sense to be repeated on CPython. 35 repeats = 1 36 elif repeats is None: 37 repeats = self.default_repeats 38 39 for repeat in range(repeats): 40 with self.subTest(repeat=repeat): 41 # We don't run `setUp` for the very first repeat 42 # and we don't run `tearDown` for the very last one, 43 # because they are handled by the test class itself. 44 if repeat != 0: 45 self.setUp() 46 try: 47 test_function() 48 finally: 49 if repeat != repeats - 1: 50 self.tearDown() 51 52 def testInstallHandler(self): 53 default_handler = signal.getsignal(signal.SIGINT) 54 unittest.installHandler() 55 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 56 57 try: 58 pid = os.getpid() 59 os.kill(pid, signal.SIGINT) 60 except KeyboardInterrupt: 61 self.fail("KeyboardInterrupt not handled") 62 63 self.assertTrue(unittest.signals._interrupt_handler.called) 64 65 def testRegisterResult(self): 66 result = unittest.TestResult() 67 self.assertNotIn(result, unittest.signals._results) 68 69 unittest.registerResult(result) 70 try: 71 self.assertIn(result, unittest.signals._results) 72 finally: 73 unittest.removeResult(result) 74 75 def testInterruptCaught(self): 76 def test(result): 77 pid = os.getpid() 78 os.kill(pid, signal.SIGINT) 79 result.breakCaught = True 80 self.assertTrue(result.shouldStop) 81 82 def test_function(): 83 result = unittest.TestResult() 84 unittest.installHandler() 85 unittest.registerResult(result) 86 87 self.assertNotEqual( 88 signal.getsignal(signal.SIGINT), 89 self._default_handler, 90 ) 91 92 try: 93 test(result) 94 except KeyboardInterrupt: 95 self.fail("KeyboardInterrupt not handled") 96 self.assertTrue(result.breakCaught) 97 self.withRepeats(test_function) 98 99 def testSecondInterrupt(self): 100 # Can't use skipIf decorator because the signal handler may have 101 # been changed after defining this method. 102 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: 103 self.skipTest("test requires SIGINT to not be ignored") 104 105 def test(result): 106 pid = os.getpid() 107 os.kill(pid, signal.SIGINT) 108 result.breakCaught = True 109 self.assertTrue(result.shouldStop) 110 os.kill(pid, signal.SIGINT) 111 self.fail("Second KeyboardInterrupt not raised") 112 113 def test_function(): 114 result = unittest.TestResult() 115 unittest.installHandler() 116 unittest.registerResult(result) 117 118 with self.assertRaises(KeyboardInterrupt): 119 test(result) 120 self.assertTrue(result.breakCaught) 121 self.withRepeats(test_function) 122 123 124 def testTwoResults(self): 125 def test_function(): 126 unittest.installHandler() 127 128 result = unittest.TestResult() 129 unittest.registerResult(result) 130 new_handler = signal.getsignal(signal.SIGINT) 131 132 result2 = unittest.TestResult() 133 unittest.registerResult(result2) 134 self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) 135 136 result3 = unittest.TestResult() 137 138 try: 139 os.kill(os.getpid(), signal.SIGINT) 140 except KeyboardInterrupt: 141 self.fail("KeyboardInterrupt not handled") 142 143 self.assertTrue(result.shouldStop) 144 self.assertTrue(result2.shouldStop) 145 self.assertFalse(result3.shouldStop) 146 self.withRepeats(test_function) 147 148 149 def testHandlerReplacedButCalled(self): 150 # Can't use skipIf decorator because the signal handler may have 151 # been changed after defining this method. 152 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: 153 self.skipTest("test requires SIGINT to not be ignored") 154 155 def test_function(): 156 # If our handler has been replaced (is no longer installed) but is 157 # called by the *new* handler, then it isn't safe to delay the 158 # SIGINT and we should immediately delegate to the default handler 159 unittest.installHandler() 160 161 handler = signal.getsignal(signal.SIGINT) 162 def new_handler(frame, signum): 163 handler(frame, signum) 164 signal.signal(signal.SIGINT, new_handler) 165 166 try: 167 os.kill(os.getpid(), signal.SIGINT) 168 except KeyboardInterrupt: 169 pass 170 else: 171 self.fail("replaced but delegated handler doesn't raise interrupt") 172 self.withRepeats(test_function) 173 174 def testRunner(self): 175 # Creating a TextTestRunner with the appropriate argument should 176 # register the TextTestResult it creates 177 runner = unittest.TextTestRunner(stream=io.StringIO()) 178 179 result = runner.run(unittest.TestSuite()) 180 self.assertIn(result, unittest.signals._results) 181 182 def testWeakReferences(self): 183 # Calling registerResult on a result should not keep it alive 184 result = unittest.TestResult() 185 unittest.registerResult(result) 186 187 ref = weakref.ref(result) 188 del result 189 190 # For non-reference counting implementations 191 gc.collect();gc.collect() 192 self.assertIsNone(ref()) 193 194 195 def testRemoveResult(self): 196 result = unittest.TestResult() 197 unittest.registerResult(result) 198 199 unittest.installHandler() 200 self.assertTrue(unittest.removeResult(result)) 201 202 # Should this raise an error instead? 203 self.assertFalse(unittest.removeResult(unittest.TestResult())) 204 205 try: 206 pid = os.getpid() 207 os.kill(pid, signal.SIGINT) 208 except KeyboardInterrupt: 209 pass 210 211 self.assertFalse(result.shouldStop) 212 213 def testMainInstallsHandler(self): 214 failfast = object() 215 test = object() 216 verbosity = object() 217 result = object() 218 default_handler = signal.getsignal(signal.SIGINT) 219 220 class FakeRunner(object): 221 initArgs = [] 222 runArgs = [] 223 def __init__(self, *args, **kwargs): 224 self.initArgs.append((args, kwargs)) 225 def run(self, test): 226 self.runArgs.append(test) 227 return result 228 229 class Program(unittest.TestProgram): 230 def __init__(self, catchbreak): 231 self.exit = False 232 self.verbosity = verbosity 233 self.failfast = failfast 234 self.catchbreak = catchbreak 235 self.tb_locals = False 236 self.testRunner = FakeRunner 237 self.test = test 238 self.result = None 239 240 p = Program(False) 241 p.runTests() 242 243 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 244 'verbosity': verbosity, 245 'failfast': failfast, 246 'tb_locals': False, 247 'warnings': None})]) 248 self.assertEqual(FakeRunner.runArgs, [test]) 249 self.assertEqual(p.result, result) 250 251 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 252 253 FakeRunner.initArgs = [] 254 FakeRunner.runArgs = [] 255 p = Program(True) 256 p.runTests() 257 258 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 259 'verbosity': verbosity, 260 'failfast': failfast, 261 'tb_locals': False, 262 'warnings': None})]) 263 self.assertEqual(FakeRunner.runArgs, [test]) 264 self.assertEqual(p.result, result) 265 266 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 267 268 def testRemoveHandler(self): 269 default_handler = signal.getsignal(signal.SIGINT) 270 unittest.installHandler() 271 unittest.removeHandler() 272 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 273 274 # check that calling removeHandler multiple times has no ill-effect 275 unittest.removeHandler() 276 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 277 278 def testRemoveHandlerAsDecorator(self): 279 default_handler = signal.getsignal(signal.SIGINT) 280 unittest.installHandler() 281 282 @unittest.removeHandler 283 def test(): 284 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 285 286 test() 287 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 288 289@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 290@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 291class TestBreakDefaultIntHandler(TestBreak): 292 int_handler = signal.default_int_handler 293 294@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 295@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 296class TestBreakSignalIgnored(TestBreak): 297 int_handler = signal.SIG_IGN 298 299@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 300@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 301class TestBreakSignalDefault(TestBreak): 302 int_handler = signal.SIG_DFL 303 304 305if __name__ == "__main__": 306 unittest.main() 307