1from contextlib import contextmanager 2import datetime 3import faulthandler 4import os 5import re 6import signal 7import subprocess 8import sys 9from test import support 10from test.support import os_helper 11from test.support import script_helper, is_android 12from test.support import skip_if_sanitizer 13import tempfile 14import unittest 15from textwrap import dedent 16 17try: 18 import _testcapi 19except ImportError: 20 _testcapi = None 21 22if not support.has_subprocess_support: 23 raise unittest.SkipTest("test module requires subprocess") 24 25TIMEOUT = 0.5 26MS_WINDOWS = (os.name == 'nt') 27 28 29def expected_traceback(lineno1, lineno2, header, min_count=1): 30 regex = header 31 regex += ' File "<string>", line %s in func\n' % lineno1 32 regex += ' File "<string>", line %s in <module>' % lineno2 33 if 1 < min_count: 34 return '^' + (regex + '\n') * (min_count - 1) + regex 35 else: 36 return '^' + regex + '$' 37 38def skip_segfault_on_android(test): 39 # Issue #32138: Raising SIGSEGV on Android may not cause a crash. 40 return unittest.skipIf(is_android, 41 'raising SIGSEGV on Android is unreliable')(test) 42 43@contextmanager 44def temporary_filename(): 45 filename = tempfile.mktemp() 46 try: 47 yield filename 48 finally: 49 os_helper.unlink(filename) 50 51class FaultHandlerTests(unittest.TestCase): 52 53 def get_output(self, code, filename=None, fd=None): 54 """ 55 Run the specified code in Python (in a new child process) and read the 56 output from the standard error or from a file (if filename is set). 57 Return the output lines as a list. 58 59 Strip the reference count from the standard error for Python debug 60 build, and replace "Current thread 0x00007f8d8fbd9700" by "Current 61 thread XXX". 62 """ 63 code = dedent(code).strip() 64 pass_fds = [] 65 if fd is not None: 66 pass_fds.append(fd) 67 with support.SuppressCrashReport(): 68 process = script_helper.spawn_python('-c', code, pass_fds=pass_fds) 69 with process: 70 output, stderr = process.communicate() 71 exitcode = process.wait() 72 output = output.decode('ascii', 'backslashreplace') 73 if filename: 74 self.assertEqual(output, '') 75 with open(filename, "rb") as fp: 76 output = fp.read() 77 output = output.decode('ascii', 'backslashreplace') 78 elif fd is not None: 79 self.assertEqual(output, '') 80 os.lseek(fd, os.SEEK_SET, 0) 81 with open(fd, "rb", closefd=False) as fp: 82 output = fp.read() 83 output = output.decode('ascii', 'backslashreplace') 84 return output.splitlines(), exitcode 85 86 def check_error(self, code, lineno, fatal_error, *, 87 filename=None, all_threads=True, other_regex=None, 88 fd=None, know_current_thread=True, 89 py_fatal_error=False, 90 garbage_collecting=False, 91 function='<module>'): 92 """ 93 Check that the fault handler for fatal errors is enabled and check the 94 traceback from the child process output. 95 96 Raise an error if the output doesn't match the expected format. 97 """ 98 if all_threads: 99 if know_current_thread: 100 header = 'Current thread 0x[0-9a-f]+' 101 else: 102 header = 'Thread 0x[0-9a-f]+' 103 else: 104 header = 'Stack' 105 regex = [f'^{fatal_error}'] 106 if py_fatal_error: 107 regex.append("Python runtime state: initialized") 108 regex.append('') 109 regex.append(fr'{header} \(most recent call first\):') 110 if garbage_collecting: 111 regex.append(' Garbage-collecting') 112 regex.append(fr' File "<string>", line {lineno} in {function}') 113 regex = '\n'.join(regex) 114 115 if other_regex: 116 regex = f'(?:{regex}|{other_regex})' 117 118 # Enable MULTILINE flag 119 regex = f'(?m){regex}' 120 output, exitcode = self.get_output(code, filename=filename, fd=fd) 121 output = '\n'.join(output) 122 self.assertRegex(output, regex) 123 self.assertNotEqual(exitcode, 0) 124 125 def check_fatal_error(self, code, line_number, name_regex, func=None, **kw): 126 if func: 127 name_regex = '%s: %s' % (func, name_regex) 128 fatal_error = 'Fatal Python error: %s' % name_regex 129 self.check_error(code, line_number, fatal_error, **kw) 130 131 def check_windows_exception(self, code, line_number, name_regex, **kw): 132 fatal_error = 'Windows fatal exception: %s' % name_regex 133 self.check_error(code, line_number, fatal_error, **kw) 134 135 @unittest.skipIf(sys.platform.startswith('aix'), 136 "the first page of memory is a mapped read-only on AIX") 137 def test_read_null(self): 138 if not MS_WINDOWS: 139 self.check_fatal_error(""" 140 import faulthandler 141 faulthandler.enable() 142 faulthandler._read_null() 143 """, 144 3, 145 # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion 146 '(?:Segmentation fault' 147 '|Bus error' 148 '|Illegal instruction)') 149 else: 150 self.check_windows_exception(""" 151 import faulthandler 152 faulthandler.enable() 153 faulthandler._read_null() 154 """, 155 3, 156 'access violation') 157 158 @skip_segfault_on_android 159 def test_sigsegv(self): 160 self.check_fatal_error(""" 161 import faulthandler 162 faulthandler.enable() 163 faulthandler._sigsegv() 164 """, 165 3, 166 'Segmentation fault') 167 168 @skip_segfault_on_android 169 def test_gc(self): 170 # bpo-44466: Detect if the GC is running 171 self.check_fatal_error(""" 172 import faulthandler 173 import gc 174 import sys 175 176 faulthandler.enable() 177 178 class RefCycle: 179 def __del__(self): 180 faulthandler._sigsegv() 181 182 # create a reference cycle which triggers a fatal 183 # error in a destructor 184 a = RefCycle() 185 b = RefCycle() 186 a.b = b 187 b.a = a 188 189 # Delete the objects, not the cycle 190 a = None 191 b = None 192 193 # Break the reference cycle: call __del__() 194 gc.collect() 195 196 # Should not reach this line 197 print("exit", file=sys.stderr) 198 """, 199 9, 200 'Segmentation fault', 201 function='__del__', 202 garbage_collecting=True) 203 204 def test_fatal_error_c_thread(self): 205 self.check_fatal_error(""" 206 import faulthandler 207 faulthandler.enable() 208 faulthandler._fatal_error_c_thread() 209 """, 210 3, 211 'in new thread', 212 know_current_thread=False, 213 func='faulthandler_fatal_error_thread', 214 py_fatal_error=True) 215 216 def test_sigabrt(self): 217 self.check_fatal_error(""" 218 import faulthandler 219 faulthandler.enable() 220 faulthandler._sigabrt() 221 """, 222 3, 223 'Aborted') 224 225 @unittest.skipIf(sys.platform == 'win32', 226 "SIGFPE cannot be caught on Windows") 227 def test_sigfpe(self): 228 self.check_fatal_error(""" 229 import faulthandler 230 faulthandler.enable() 231 faulthandler._sigfpe() 232 """, 233 3, 234 'Floating point exception') 235 236 @unittest.skipIf(_testcapi is None, 'need _testcapi') 237 @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS') 238 @skip_segfault_on_android 239 def test_sigbus(self): 240 self.check_fatal_error(""" 241 import faulthandler 242 import signal 243 244 faulthandler.enable() 245 signal.raise_signal(signal.SIGBUS) 246 """, 247 5, 248 'Bus error') 249 250 @unittest.skipIf(_testcapi is None, 'need _testcapi') 251 @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL') 252 @skip_segfault_on_android 253 def test_sigill(self): 254 self.check_fatal_error(""" 255 import faulthandler 256 import signal 257 258 faulthandler.enable() 259 signal.raise_signal(signal.SIGILL) 260 """, 261 5, 262 'Illegal instruction') 263 264 def check_fatal_error_func(self, release_gil): 265 # Test that Py_FatalError() dumps a traceback 266 with support.SuppressCrashReport(): 267 self.check_fatal_error(f""" 268 import _testcapi 269 _testcapi.fatal_error(b'xyz', {release_gil}) 270 """, 271 2, 272 'xyz', 273 func='test_fatal_error', 274 py_fatal_error=True) 275 276 def test_fatal_error(self): 277 self.check_fatal_error_func(False) 278 279 def test_fatal_error_without_gil(self): 280 self.check_fatal_error_func(True) 281 282 @unittest.skipIf(sys.platform.startswith('openbsd'), 283 "Issue #12868: sigaltstack() doesn't work on " 284 "OpenBSD if Python is compiled with pthread") 285 @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'), 286 'need faulthandler._stack_overflow()') 287 def test_stack_overflow(self): 288 self.check_fatal_error(""" 289 import faulthandler 290 faulthandler.enable() 291 faulthandler._stack_overflow() 292 """, 293 3, 294 '(?:Segmentation fault|Bus error)', 295 other_regex='unable to raise a stack overflow') 296 297 @skip_segfault_on_android 298 def test_gil_released(self): 299 self.check_fatal_error(""" 300 import faulthandler 301 faulthandler.enable() 302 faulthandler._sigsegv(True) 303 """, 304 3, 305 'Segmentation fault') 306 307 @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer " 308 "builds change crashing process output.") 309 @skip_segfault_on_android 310 def test_enable_file(self): 311 with temporary_filename() as filename: 312 self.check_fatal_error(""" 313 import faulthandler 314 output = open({filename}, 'wb') 315 faulthandler.enable(output) 316 faulthandler._sigsegv() 317 """.format(filename=repr(filename)), 318 4, 319 'Segmentation fault', 320 filename=filename) 321 322 @unittest.skipIf(sys.platform == "win32", 323 "subprocess doesn't support pass_fds on Windows") 324 @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer " 325 "builds change crashing process output.") 326 @skip_segfault_on_android 327 def test_enable_fd(self): 328 with tempfile.TemporaryFile('wb+') as fp: 329 fd = fp.fileno() 330 self.check_fatal_error(""" 331 import faulthandler 332 import sys 333 faulthandler.enable(%s) 334 faulthandler._sigsegv() 335 """ % fd, 336 4, 337 'Segmentation fault', 338 fd=fd) 339 340 @skip_segfault_on_android 341 def test_enable_single_thread(self): 342 self.check_fatal_error(""" 343 import faulthandler 344 faulthandler.enable(all_threads=False) 345 faulthandler._sigsegv() 346 """, 347 3, 348 'Segmentation fault', 349 all_threads=False) 350 351 @skip_segfault_on_android 352 def test_disable(self): 353 code = """ 354 import faulthandler 355 faulthandler.enable() 356 faulthandler.disable() 357 faulthandler._sigsegv() 358 """ 359 not_expected = 'Fatal Python error' 360 stderr, exitcode = self.get_output(code) 361 stderr = '\n'.join(stderr) 362 self.assertTrue(not_expected not in stderr, 363 "%r is present in %r" % (not_expected, stderr)) 364 self.assertNotEqual(exitcode, 0) 365 366 @skip_segfault_on_android 367 def test_dump_ext_modules(self): 368 code = """ 369 import faulthandler 370 import sys 371 # Don't filter stdlib module names 372 sys.stdlib_module_names = frozenset() 373 faulthandler.enable() 374 faulthandler._sigsegv() 375 """ 376 stderr, exitcode = self.get_output(code) 377 stderr = '\n'.join(stderr) 378 match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$', 379 stderr, re.MULTILINE) 380 if not match: 381 self.fail(f"Cannot find 'Extension modules:' in {stderr!r}") 382 modules = set(match.group(1).strip().split(', ')) 383 for name in ('sys', 'faulthandler'): 384 self.assertIn(name, modules) 385 386 def test_is_enabled(self): 387 orig_stderr = sys.stderr 388 try: 389 # regrtest may replace sys.stderr by io.StringIO object, but 390 # faulthandler.enable() requires that sys.stderr has a fileno() 391 # method 392 sys.stderr = sys.__stderr__ 393 394 was_enabled = faulthandler.is_enabled() 395 try: 396 faulthandler.enable() 397 self.assertTrue(faulthandler.is_enabled()) 398 faulthandler.disable() 399 self.assertFalse(faulthandler.is_enabled()) 400 finally: 401 if was_enabled: 402 faulthandler.enable() 403 else: 404 faulthandler.disable() 405 finally: 406 sys.stderr = orig_stderr 407 408 @support.requires_subprocess() 409 def test_disabled_by_default(self): 410 # By default, the module should be disabled 411 code = "import faulthandler; print(faulthandler.is_enabled())" 412 args = (sys.executable, "-E", "-c", code) 413 # don't use assert_python_ok() because it always enables faulthandler 414 output = subprocess.check_output(args) 415 self.assertEqual(output.rstrip(), b"False") 416 417 @support.requires_subprocess() 418 def test_sys_xoptions(self): 419 # Test python -X faulthandler 420 code = "import faulthandler; print(faulthandler.is_enabled())" 421 args = filter(None, (sys.executable, 422 "-E" if sys.flags.ignore_environment else "", 423 "-X", "faulthandler", "-c", code)) 424 env = os.environ.copy() 425 env.pop("PYTHONFAULTHANDLER", None) 426 # don't use assert_python_ok() because it always enables faulthandler 427 output = subprocess.check_output(args, env=env) 428 self.assertEqual(output.rstrip(), b"True") 429 430 @support.requires_subprocess() 431 def test_env_var(self): 432 # empty env var 433 code = "import faulthandler; print(faulthandler.is_enabled())" 434 args = (sys.executable, "-c", code) 435 env = dict(os.environ) 436 env['PYTHONFAULTHANDLER'] = '' 437 env['PYTHONDEVMODE'] = '' 438 # don't use assert_python_ok() because it always enables faulthandler 439 output = subprocess.check_output(args, env=env) 440 self.assertEqual(output.rstrip(), b"False") 441 442 # non-empty env var 443 env = dict(os.environ) 444 env['PYTHONFAULTHANDLER'] = '1' 445 env['PYTHONDEVMODE'] = '' 446 output = subprocess.check_output(args, env=env) 447 self.assertEqual(output.rstrip(), b"True") 448 449 def check_dump_traceback(self, *, filename=None, fd=None): 450 """ 451 Explicitly call dump_traceback() function and check its output. 452 Raise an error if the output doesn't match the expected format. 453 """ 454 code = """ 455 import faulthandler 456 457 filename = {filename!r} 458 fd = {fd} 459 460 def funcB(): 461 if filename: 462 with open(filename, "wb") as fp: 463 faulthandler.dump_traceback(fp, all_threads=False) 464 elif fd is not None: 465 faulthandler.dump_traceback(fd, 466 all_threads=False) 467 else: 468 faulthandler.dump_traceback(all_threads=False) 469 470 def funcA(): 471 funcB() 472 473 funcA() 474 """ 475 code = code.format( 476 filename=filename, 477 fd=fd, 478 ) 479 if filename: 480 lineno = 9 481 elif fd is not None: 482 lineno = 11 483 else: 484 lineno = 14 485 expected = [ 486 'Stack (most recent call first):', 487 ' File "<string>", line %s in funcB' % lineno, 488 ' File "<string>", line 17 in funcA', 489 ' File "<string>", line 19 in <module>' 490 ] 491 trace, exitcode = self.get_output(code, filename, fd) 492 self.assertEqual(trace, expected) 493 self.assertEqual(exitcode, 0) 494 495 def test_dump_traceback(self): 496 self.check_dump_traceback() 497 498 def test_dump_traceback_file(self): 499 with temporary_filename() as filename: 500 self.check_dump_traceback(filename=filename) 501 502 @unittest.skipIf(sys.platform == "win32", 503 "subprocess doesn't support pass_fds on Windows") 504 def test_dump_traceback_fd(self): 505 with tempfile.TemporaryFile('wb+') as fp: 506 self.check_dump_traceback(fd=fp.fileno()) 507 508 def test_truncate(self): 509 maxlen = 500 510 func_name = 'x' * (maxlen + 50) 511 truncated = 'x' * maxlen + '...' 512 code = """ 513 import faulthandler 514 515 def {func_name}(): 516 faulthandler.dump_traceback(all_threads=False) 517 518 {func_name}() 519 """ 520 code = code.format( 521 func_name=func_name, 522 ) 523 expected = [ 524 'Stack (most recent call first):', 525 ' File "<string>", line 4 in %s' % truncated, 526 ' File "<string>", line 6 in <module>' 527 ] 528 trace, exitcode = self.get_output(code) 529 self.assertEqual(trace, expected) 530 self.assertEqual(exitcode, 0) 531 532 def check_dump_traceback_threads(self, filename): 533 """ 534 Call explicitly dump_traceback(all_threads=True) and check the output. 535 Raise an error if the output doesn't match the expected format. 536 """ 537 code = """ 538 import faulthandler 539 from threading import Thread, Event 540 import time 541 542 def dump(): 543 if {filename}: 544 with open({filename}, "wb") as fp: 545 faulthandler.dump_traceback(fp, all_threads=True) 546 else: 547 faulthandler.dump_traceback(all_threads=True) 548 549 class Waiter(Thread): 550 # avoid blocking if the main thread raises an exception. 551 daemon = True 552 553 def __init__(self): 554 Thread.__init__(self) 555 self.running = Event() 556 self.stop = Event() 557 558 def run(self): 559 self.running.set() 560 self.stop.wait() 561 562 waiter = Waiter() 563 waiter.start() 564 waiter.running.wait() 565 dump() 566 waiter.stop.set() 567 waiter.join() 568 """ 569 code = code.format(filename=repr(filename)) 570 output, exitcode = self.get_output(code, filename) 571 output = '\n'.join(output) 572 if filename: 573 lineno = 8 574 else: 575 lineno = 10 576 regex = r""" 577 ^Thread 0x[0-9a-f]+ \(most recent call first\): 578 (?: File ".*threading.py", line [0-9]+ in [_a-z]+ 579 ){{1,3}} File "<string>", line 23 in run 580 File ".*threading.py", line [0-9]+ in _bootstrap_inner 581 File ".*threading.py", line [0-9]+ in _bootstrap 582 583 Current thread 0x[0-9a-f]+ \(most recent call first\): 584 File "<string>", line {lineno} in dump 585 File "<string>", line 28 in <module>$ 586 """ 587 regex = dedent(regex.format(lineno=lineno)).strip() 588 self.assertRegex(output, regex) 589 self.assertEqual(exitcode, 0) 590 591 def test_dump_traceback_threads(self): 592 self.check_dump_traceback_threads(None) 593 594 def test_dump_traceback_threads_file(self): 595 with temporary_filename() as filename: 596 self.check_dump_traceback_threads(filename) 597 598 def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1, 599 *, filename=None, fd=None): 600 """ 601 Check how many times the traceback is written in timeout x 2.5 seconds, 602 or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending 603 on repeat and cancel options. 604 605 Raise an error if the output doesn't match the expect format. 606 """ 607 timeout_str = str(datetime.timedelta(seconds=TIMEOUT)) 608 code = """ 609 import faulthandler 610 import time 611 import sys 612 613 timeout = {timeout} 614 repeat = {repeat} 615 cancel = {cancel} 616 loops = {loops} 617 filename = {filename!r} 618 fd = {fd} 619 620 def func(timeout, repeat, cancel, file, loops): 621 for loop in range(loops): 622 faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) 623 if cancel: 624 faulthandler.cancel_dump_traceback_later() 625 time.sleep(timeout * 5) 626 faulthandler.cancel_dump_traceback_later() 627 628 if filename: 629 file = open(filename, "wb") 630 elif fd is not None: 631 file = sys.stderr.fileno() 632 else: 633 file = None 634 func(timeout, repeat, cancel, file, loops) 635 if filename: 636 file.close() 637 """ 638 code = code.format( 639 timeout=TIMEOUT, 640 repeat=repeat, 641 cancel=cancel, 642 loops=loops, 643 filename=filename, 644 fd=fd, 645 ) 646 trace, exitcode = self.get_output(code, filename) 647 trace = '\n'.join(trace) 648 649 if not cancel: 650 count = loops 651 if repeat: 652 count *= 2 653 header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str 654 regex = expected_traceback(17, 26, header, min_count=count) 655 self.assertRegex(trace, regex) 656 else: 657 self.assertEqual(trace, '') 658 self.assertEqual(exitcode, 0) 659 660 def test_dump_traceback_later(self): 661 self.check_dump_traceback_later() 662 663 def test_dump_traceback_later_repeat(self): 664 self.check_dump_traceback_later(repeat=True) 665 666 def test_dump_traceback_later_cancel(self): 667 self.check_dump_traceback_later(cancel=True) 668 669 def test_dump_traceback_later_file(self): 670 with temporary_filename() as filename: 671 self.check_dump_traceback_later(filename=filename) 672 673 @unittest.skipIf(sys.platform == "win32", 674 "subprocess doesn't support pass_fds on Windows") 675 def test_dump_traceback_later_fd(self): 676 with tempfile.TemporaryFile('wb+') as fp: 677 self.check_dump_traceback_later(fd=fp.fileno()) 678 679 def test_dump_traceback_later_twice(self): 680 self.check_dump_traceback_later(loops=2) 681 682 @unittest.skipIf(not hasattr(faulthandler, "register"), 683 "need faulthandler.register") 684 def check_register(self, filename=False, all_threads=False, 685 unregister=False, chain=False, fd=None): 686 """ 687 Register a handler displaying the traceback on a user signal. Raise the 688 signal and check the written traceback. 689 690 If chain is True, check that the previous signal handler is called. 691 692 Raise an error if the output doesn't match the expected format. 693 """ 694 signum = signal.SIGUSR1 695 code = """ 696 import faulthandler 697 import os 698 import signal 699 import sys 700 701 all_threads = {all_threads} 702 signum = {signum:d} 703 unregister = {unregister} 704 chain = {chain} 705 filename = {filename!r} 706 fd = {fd} 707 708 def func(signum): 709 os.kill(os.getpid(), signum) 710 711 def handler(signum, frame): 712 handler.called = True 713 handler.called = False 714 715 if filename: 716 file = open(filename, "wb") 717 elif fd is not None: 718 file = sys.stderr.fileno() 719 else: 720 file = None 721 if chain: 722 signal.signal(signum, handler) 723 faulthandler.register(signum, file=file, 724 all_threads=all_threads, chain={chain}) 725 if unregister: 726 faulthandler.unregister(signum) 727 func(signum) 728 if chain and not handler.called: 729 if file is not None: 730 output = file 731 else: 732 output = sys.stderr 733 print("Error: signal handler not called!", file=output) 734 exitcode = 1 735 else: 736 exitcode = 0 737 if filename: 738 file.close() 739 sys.exit(exitcode) 740 """ 741 code = code.format( 742 all_threads=all_threads, 743 signum=signum, 744 unregister=unregister, 745 chain=chain, 746 filename=filename, 747 fd=fd, 748 ) 749 trace, exitcode = self.get_output(code, filename) 750 trace = '\n'.join(trace) 751 if not unregister: 752 if all_threads: 753 regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n' 754 else: 755 regex = r'Stack \(most recent call first\):\n' 756 regex = expected_traceback(14, 32, regex) 757 self.assertRegex(trace, regex) 758 else: 759 self.assertEqual(trace, '') 760 if unregister: 761 self.assertNotEqual(exitcode, 0) 762 else: 763 self.assertEqual(exitcode, 0) 764 765 def test_register(self): 766 self.check_register() 767 768 def test_unregister(self): 769 self.check_register(unregister=True) 770 771 def test_register_file(self): 772 with temporary_filename() as filename: 773 self.check_register(filename=filename) 774 775 @unittest.skipIf(sys.platform == "win32", 776 "subprocess doesn't support pass_fds on Windows") 777 def test_register_fd(self): 778 with tempfile.TemporaryFile('wb+') as fp: 779 self.check_register(fd=fp.fileno()) 780 781 def test_register_threads(self): 782 self.check_register(all_threads=True) 783 784 def test_register_chain(self): 785 self.check_register(chain=True) 786 787 @contextmanager 788 def check_stderr_none(self): 789 stderr = sys.stderr 790 try: 791 sys.stderr = None 792 with self.assertRaises(RuntimeError) as cm: 793 yield 794 self.assertEqual(str(cm.exception), "sys.stderr is None") 795 finally: 796 sys.stderr = stderr 797 798 def test_stderr_None(self): 799 # Issue #21497: provide a helpful error if sys.stderr is None, 800 # instead of just an attribute error: "None has no attribute fileno". 801 with self.check_stderr_none(): 802 faulthandler.enable() 803 with self.check_stderr_none(): 804 faulthandler.dump_traceback() 805 with self.check_stderr_none(): 806 faulthandler.dump_traceback_later(1e-3) 807 if hasattr(faulthandler, "register"): 808 with self.check_stderr_none(): 809 faulthandler.register(signal.SIGUSR1) 810 811 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows') 812 def test_raise_exception(self): 813 for exc, name in ( 814 ('EXCEPTION_ACCESS_VIOLATION', 'access violation'), 815 ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'), 816 ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'), 817 ): 818 self.check_windows_exception(f""" 819 import faulthandler 820 faulthandler.enable() 821 faulthandler._raise_exception(faulthandler._{exc}) 822 """, 823 3, 824 name) 825 826 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows') 827 def test_ignore_exception(self): 828 for exc_code in ( 829 0xE06D7363, # MSC exception ("Emsc") 830 0xE0434352, # COM Callable Runtime exception ("ECCR") 831 ): 832 code = f""" 833 import faulthandler 834 faulthandler.enable() 835 faulthandler._raise_exception({exc_code}) 836 """ 837 code = dedent(code) 838 output, exitcode = self.get_output(code) 839 self.assertEqual(output, []) 840 self.assertEqual(exitcode, exc_code) 841 842 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows') 843 def test_raise_nonfatal_exception(self): 844 # These exceptions are not strictly errors. Letting 845 # faulthandler display the traceback when they are 846 # raised is likely to result in noise. However, they 847 # may still terminate the process if there is no 848 # handler installed for them (which there typically 849 # is, e.g. for debug messages). 850 for exc in ( 851 0x00000000, 852 0x34567890, 853 0x40000000, 854 0x40001000, 855 0x70000000, 856 0x7FFFFFFF, 857 ): 858 output, exitcode = self.get_output(f""" 859 import faulthandler 860 faulthandler.enable() 861 faulthandler._raise_exception(0x{exc:x}) 862 """ 863 ) 864 self.assertEqual(output, []) 865 # On Windows older than 7 SP1, the actual exception code has 866 # bit 29 cleared. 867 self.assertIn(exitcode, 868 (exc, exc & ~0x10000000)) 869 870 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows') 871 def test_disable_windows_exc_handler(self): 872 code = dedent(""" 873 import faulthandler 874 faulthandler.enable() 875 faulthandler.disable() 876 code = faulthandler._EXCEPTION_ACCESS_VIOLATION 877 faulthandler._raise_exception(code) 878 """) 879 output, exitcode = self.get_output(code) 880 self.assertEqual(output, []) 881 self.assertEqual(exitcode, 0xC0000005) 882 883 def test_cancel_later_without_dump_traceback_later(self): 884 # bpo-37933: Calling cancel_dump_traceback_later() 885 # without dump_traceback_later() must not segfault. 886 code = dedent(""" 887 import faulthandler 888 faulthandler.cancel_dump_traceback_later() 889 """) 890 output, exitcode = self.get_output(code) 891 self.assertEqual(output, []) 892 self.assertEqual(exitcode, 0) 893 894 895if __name__ == "__main__": 896 unittest.main() 897