1from contextlib import contextmanager
2import datetime
3import faulthandler
4import os
5import re
6import signal
7import subprocess
8import sys
9from test import support
10from test.support import os_helper
11from test.support import script_helper, is_android
12from test.support import skip_if_sanitizer
13import tempfile
14import unittest
15from textwrap import dedent
16
# _testcapi is optional: keep the name bound to None when it cannot be
# imported so that tests can check for it.
try:
    import _testcapi
except ImportError:
    _testcapi = None

# The helpers below run code in child interpreters, so the whole module is
# skipped when subprocesses are unavailable.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")

# Timeout passed to faulthandler.dump_traceback_later() by the tests.
TIMEOUT = 0.5
MS_WINDOWS = os.name == 'nt'
27
28
def expected_traceback(lineno1, lineno2, header, min_count=1):
    """Build the regex matching a two-frame traceback (func, then <module>).

    header is the regex text placed before the frames; lineno1 and lineno2
    are the line numbers expected in the "func" and "<module>" frames.

    When min_count is greater than 1, the pattern is repeated min_count
    times (newline separated) and is not anchored at the end; otherwise a
    single occurrence is anchored with '$'.
    """
    frames = (
        '  File "<string>", line %s in func\n' % lineno1,
        '  File "<string>", line %s in <module>' % lineno2,
    )
    single = header + ''.join(frames)
    if min_count <= 1:
        return '^' + single + '$'
    return '^' + '\n'.join([single] * min_count)
37
def skip_segfault_on_android(test):
    """Decorator skipping *test* when is_android is true.

    Issue #32138: Raising SIGSEGV on Android may not cause a crash.
    """
    reason = 'raising SIGSEGV on Android is unreliable'
    decorate = unittest.skipIf(is_android, reason)
    return decorate(test)
42
@contextmanager
def temporary_filename():
    """Yield a temporary file name and unlink it when the block exits.

    Only a name is produced here (tempfile.mktemp()); creating the file is
    left to the code running inside the with-block.
    """
    path = tempfile.mktemp()
    try:
        yield path
    finally:
        # Clean up even if the with-body raised.
        os_helper.unlink(path)
50
51class FaultHandlerTests(unittest.TestCase):
52
53    def get_output(self, code, filename=None, fd=None):
54        """
55        Run the specified code in Python (in a new child process) and read the
56        output from the standard error or from a file (if filename is set).
57        Return the output lines as a list.
58
59        Strip the reference count from the standard error for Python debug
60        build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
61        thread XXX".
62        """
63        code = dedent(code).strip()
64        pass_fds = []
65        if fd is not None:
66            pass_fds.append(fd)
67        with support.SuppressCrashReport():
68            process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
69            with process:
70                output, stderr = process.communicate()
71                exitcode = process.wait()
72        output = output.decode('ascii', 'backslashreplace')
73        if filename:
74            self.assertEqual(output, '')
75            with open(filename, "rb") as fp:
76                output = fp.read()
77            output = output.decode('ascii', 'backslashreplace')
78        elif fd is not None:
79            self.assertEqual(output, '')
80            os.lseek(fd, os.SEEK_SET, 0)
81            with open(fd, "rb", closefd=False) as fp:
82                output = fp.read()
83            output = output.decode('ascii', 'backslashreplace')
84        return output.splitlines(), exitcode
85
86    def check_error(self, code, lineno, fatal_error, *,
87                    filename=None, all_threads=True, other_regex=None,
88                    fd=None, know_current_thread=True,
89                    py_fatal_error=False,
90                    garbage_collecting=False,
91                    function='<module>'):
92        """
93        Check that the fault handler for fatal errors is enabled and check the
94        traceback from the child process output.
95
96        Raise an error if the output doesn't match the expected format.
97        """
98        if all_threads:
99            if know_current_thread:
100                header = 'Current thread 0x[0-9a-f]+'
101            else:
102                header = 'Thread 0x[0-9a-f]+'
103        else:
104            header = 'Stack'
105        regex = [f'^{fatal_error}']
106        if py_fatal_error:
107            regex.append("Python runtime state: initialized")
108        regex.append('')
109        regex.append(fr'{header} \(most recent call first\):')
110        if garbage_collecting:
111            regex.append('  Garbage-collecting')
112        regex.append(fr'  File "<string>", line {lineno} in {function}')
113        regex = '\n'.join(regex)
114
115        if other_regex:
116            regex = f'(?:{regex}|{other_regex})'
117
118        # Enable MULTILINE flag
119        regex = f'(?m){regex}'
120        output, exitcode = self.get_output(code, filename=filename, fd=fd)
121        output = '\n'.join(output)
122        self.assertRegex(output, regex)
123        self.assertNotEqual(exitcode, 0)
124
125    def check_fatal_error(self, code, line_number, name_regex, func=None, **kw):
126        if func:
127            name_regex = '%s: %s' % (func, name_regex)
128        fatal_error = 'Fatal Python error: %s' % name_regex
129        self.check_error(code, line_number, fatal_error, **kw)
130
131    def check_windows_exception(self, code, line_number, name_regex, **kw):
132        fatal_error = 'Windows fatal exception: %s' % name_regex
133        self.check_error(code, line_number, fatal_error, **kw)
134
135    @unittest.skipIf(sys.platform.startswith('aix'),
136                     "the first page of memory is a mapped read-only on AIX")
137    def test_read_null(self):
138        if not MS_WINDOWS:
139            self.check_fatal_error("""
140                import faulthandler
141                faulthandler.enable()
142                faulthandler._read_null()
143                """,
144                3,
145                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
146                '(?:Segmentation fault'
147                    '|Bus error'
148                    '|Illegal instruction)')
149        else:
150            self.check_windows_exception("""
151                import faulthandler
152                faulthandler.enable()
153                faulthandler._read_null()
154                """,
155                3,
156                'access violation')
157
158    @skip_segfault_on_android
159    def test_sigsegv(self):
160        self.check_fatal_error("""
161            import faulthandler
162            faulthandler.enable()
163            faulthandler._sigsegv()
164            """,
165            3,
166            'Segmentation fault')
167
168    @skip_segfault_on_android
169    def test_gc(self):
170        # bpo-44466: Detect if the GC is running
171        self.check_fatal_error("""
172            import faulthandler
173            import gc
174            import sys
175
176            faulthandler.enable()
177
178            class RefCycle:
179                def __del__(self):
180                    faulthandler._sigsegv()
181
182            # create a reference cycle which triggers a fatal
183            # error in a destructor
184            a = RefCycle()
185            b = RefCycle()
186            a.b = b
187            b.a = a
188
189            # Delete the objects, not the cycle
190            a = None
191            b = None
192
193            # Break the reference cycle: call __del__()
194            gc.collect()
195
196            # Should not reach this line
197            print("exit", file=sys.stderr)
198            """,
199            9,
200            'Segmentation fault',
201            function='__del__',
202            garbage_collecting=True)
203
204    def test_fatal_error_c_thread(self):
205        self.check_fatal_error("""
206            import faulthandler
207            faulthandler.enable()
208            faulthandler._fatal_error_c_thread()
209            """,
210            3,
211            'in new thread',
212            know_current_thread=False,
213            func='faulthandler_fatal_error_thread',
214            py_fatal_error=True)
215
216    def test_sigabrt(self):
217        self.check_fatal_error("""
218            import faulthandler
219            faulthandler.enable()
220            faulthandler._sigabrt()
221            """,
222            3,
223            'Aborted')
224
225    @unittest.skipIf(sys.platform == 'win32',
226                     "SIGFPE cannot be caught on Windows")
227    def test_sigfpe(self):
228        self.check_fatal_error("""
229            import faulthandler
230            faulthandler.enable()
231            faulthandler._sigfpe()
232            """,
233            3,
234            'Floating point exception')
235
236    @unittest.skipIf(_testcapi is None, 'need _testcapi')
237    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
238    @skip_segfault_on_android
239    def test_sigbus(self):
240        self.check_fatal_error("""
241            import faulthandler
242            import signal
243
244            faulthandler.enable()
245            signal.raise_signal(signal.SIGBUS)
246            """,
247            5,
248            'Bus error')
249
250    @unittest.skipIf(_testcapi is None, 'need _testcapi')
251    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
252    @skip_segfault_on_android
253    def test_sigill(self):
254        self.check_fatal_error("""
255            import faulthandler
256            import signal
257
258            faulthandler.enable()
259            signal.raise_signal(signal.SIGILL)
260            """,
261            5,
262            'Illegal instruction')
263
264    def check_fatal_error_func(self, release_gil):
265        # Test that Py_FatalError() dumps a traceback
266        with support.SuppressCrashReport():
267            self.check_fatal_error(f"""
268                import _testcapi
269                _testcapi.fatal_error(b'xyz', {release_gil})
270                """,
271                2,
272                'xyz',
273                func='test_fatal_error',
274                py_fatal_error=True)
275
276    def test_fatal_error(self):
277        self.check_fatal_error_func(False)
278
279    def test_fatal_error_without_gil(self):
280        self.check_fatal_error_func(True)
281
282    @unittest.skipIf(sys.platform.startswith('openbsd'),
283                     "Issue #12868: sigaltstack() doesn't work on "
284                     "OpenBSD if Python is compiled with pthread")
285    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
286                     'need faulthandler._stack_overflow()')
287    def test_stack_overflow(self):
288        self.check_fatal_error("""
289            import faulthandler
290            faulthandler.enable()
291            faulthandler._stack_overflow()
292            """,
293            3,
294            '(?:Segmentation fault|Bus error)',
295            other_regex='unable to raise a stack overflow')
296
297    @skip_segfault_on_android
298    def test_gil_released(self):
299        self.check_fatal_error("""
300            import faulthandler
301            faulthandler.enable()
302            faulthandler._sigsegv(True)
303            """,
304            3,
305            'Segmentation fault')
306
307    @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
308                       "builds change crashing process output.")
309    @skip_segfault_on_android
310    def test_enable_file(self):
311        with temporary_filename() as filename:
312            self.check_fatal_error("""
313                import faulthandler
314                output = open({filename}, 'wb')
315                faulthandler.enable(output)
316                faulthandler._sigsegv()
317                """.format(filename=repr(filename)),
318                4,
319                'Segmentation fault',
320                filename=filename)
321
322    @unittest.skipIf(sys.platform == "win32",
323                     "subprocess doesn't support pass_fds on Windows")
324    @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
325                       "builds change crashing process output.")
326    @skip_segfault_on_android
327    def test_enable_fd(self):
328        with tempfile.TemporaryFile('wb+') as fp:
329            fd = fp.fileno()
330            self.check_fatal_error("""
331                import faulthandler
332                import sys
333                faulthandler.enable(%s)
334                faulthandler._sigsegv()
335                """ % fd,
336                4,
337                'Segmentation fault',
338                fd=fd)
339
340    @skip_segfault_on_android
341    def test_enable_single_thread(self):
342        self.check_fatal_error("""
343            import faulthandler
344            faulthandler.enable(all_threads=False)
345            faulthandler._sigsegv()
346            """,
347            3,
348            'Segmentation fault',
349            all_threads=False)
350
351    @skip_segfault_on_android
352    def test_disable(self):
353        code = """
354            import faulthandler
355            faulthandler.enable()
356            faulthandler.disable()
357            faulthandler._sigsegv()
358            """
359        not_expected = 'Fatal Python error'
360        stderr, exitcode = self.get_output(code)
361        stderr = '\n'.join(stderr)
362        self.assertTrue(not_expected not in stderr,
363                     "%r is present in %r" % (not_expected, stderr))
364        self.assertNotEqual(exitcode, 0)
365
366    @skip_segfault_on_android
367    def test_dump_ext_modules(self):
368        code = """
369            import faulthandler
370            import sys
371            # Don't filter stdlib module names
372            sys.stdlib_module_names = frozenset()
373            faulthandler.enable()
374            faulthandler._sigsegv()
375            """
376        stderr, exitcode = self.get_output(code)
377        stderr = '\n'.join(stderr)
378        match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$',
379                          stderr, re.MULTILINE)
380        if not match:
381            self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
382        modules = set(match.group(1).strip().split(', '))
383        for name in ('sys', 'faulthandler'):
384            self.assertIn(name, modules)
385
386    def test_is_enabled(self):
387        orig_stderr = sys.stderr
388        try:
389            # regrtest may replace sys.stderr by io.StringIO object, but
390            # faulthandler.enable() requires that sys.stderr has a fileno()
391            # method
392            sys.stderr = sys.__stderr__
393
394            was_enabled = faulthandler.is_enabled()
395            try:
396                faulthandler.enable()
397                self.assertTrue(faulthandler.is_enabled())
398                faulthandler.disable()
399                self.assertFalse(faulthandler.is_enabled())
400            finally:
401                if was_enabled:
402                    faulthandler.enable()
403                else:
404                    faulthandler.disable()
405        finally:
406            sys.stderr = orig_stderr
407
408    @support.requires_subprocess()
409    def test_disabled_by_default(self):
410        # By default, the module should be disabled
411        code = "import faulthandler; print(faulthandler.is_enabled())"
412        args = (sys.executable, "-E", "-c", code)
413        # don't use assert_python_ok() because it always enables faulthandler
414        output = subprocess.check_output(args)
415        self.assertEqual(output.rstrip(), b"False")
416
417    @support.requires_subprocess()
418    def test_sys_xoptions(self):
419        # Test python -X faulthandler
420        code = "import faulthandler; print(faulthandler.is_enabled())"
421        args = filter(None, (sys.executable,
422                             "-E" if sys.flags.ignore_environment else "",
423                             "-X", "faulthandler", "-c", code))
424        env = os.environ.copy()
425        env.pop("PYTHONFAULTHANDLER", None)
426        # don't use assert_python_ok() because it always enables faulthandler
427        output = subprocess.check_output(args, env=env)
428        self.assertEqual(output.rstrip(), b"True")
429
430    @support.requires_subprocess()
431    def test_env_var(self):
432        # empty env var
433        code = "import faulthandler; print(faulthandler.is_enabled())"
434        args = (sys.executable, "-c", code)
435        env = dict(os.environ)
436        env['PYTHONFAULTHANDLER'] = ''
437        env['PYTHONDEVMODE'] = ''
438        # don't use assert_python_ok() because it always enables faulthandler
439        output = subprocess.check_output(args, env=env)
440        self.assertEqual(output.rstrip(), b"False")
441
442        # non-empty env var
443        env = dict(os.environ)
444        env['PYTHONFAULTHANDLER'] = '1'
445        env['PYTHONDEVMODE'] = ''
446        output = subprocess.check_output(args, env=env)
447        self.assertEqual(output.rstrip(), b"True")
448
449    def check_dump_traceback(self, *, filename=None, fd=None):
450        """
451        Explicitly call dump_traceback() function and check its output.
452        Raise an error if the output doesn't match the expected format.
453        """
454        code = """
455            import faulthandler
456
457            filename = {filename!r}
458            fd = {fd}
459
460            def funcB():
461                if filename:
462                    with open(filename, "wb") as fp:
463                        faulthandler.dump_traceback(fp, all_threads=False)
464                elif fd is not None:
465                    faulthandler.dump_traceback(fd,
466                                                all_threads=False)
467                else:
468                    faulthandler.dump_traceback(all_threads=False)
469
470            def funcA():
471                funcB()
472
473            funcA()
474            """
475        code = code.format(
476            filename=filename,
477            fd=fd,
478        )
479        if filename:
480            lineno = 9
481        elif fd is not None:
482            lineno = 11
483        else:
484            lineno = 14
485        expected = [
486            'Stack (most recent call first):',
487            '  File "<string>", line %s in funcB' % lineno,
488            '  File "<string>", line 17 in funcA',
489            '  File "<string>", line 19 in <module>'
490        ]
491        trace, exitcode = self.get_output(code, filename, fd)
492        self.assertEqual(trace, expected)
493        self.assertEqual(exitcode, 0)
494
495    def test_dump_traceback(self):
496        self.check_dump_traceback()
497
498    def test_dump_traceback_file(self):
499        with temporary_filename() as filename:
500            self.check_dump_traceback(filename=filename)
501
502    @unittest.skipIf(sys.platform == "win32",
503                     "subprocess doesn't support pass_fds on Windows")
504    def test_dump_traceback_fd(self):
505        with tempfile.TemporaryFile('wb+') as fp:
506            self.check_dump_traceback(fd=fp.fileno())
507
508    def test_truncate(self):
509        maxlen = 500
510        func_name = 'x' * (maxlen + 50)
511        truncated = 'x' * maxlen + '...'
512        code = """
513            import faulthandler
514
515            def {func_name}():
516                faulthandler.dump_traceback(all_threads=False)
517
518            {func_name}()
519            """
520        code = code.format(
521            func_name=func_name,
522        )
523        expected = [
524            'Stack (most recent call first):',
525            '  File "<string>", line 4 in %s' % truncated,
526            '  File "<string>", line 6 in <module>'
527        ]
528        trace, exitcode = self.get_output(code)
529        self.assertEqual(trace, expected)
530        self.assertEqual(exitcode, 0)
531
532    def check_dump_traceback_threads(self, filename):
533        """
534        Call explicitly dump_traceback(all_threads=True) and check the output.
535        Raise an error if the output doesn't match the expected format.
536        """
537        code = """
538            import faulthandler
539            from threading import Thread, Event
540            import time
541
542            def dump():
543                if {filename}:
544                    with open({filename}, "wb") as fp:
545                        faulthandler.dump_traceback(fp, all_threads=True)
546                else:
547                    faulthandler.dump_traceback(all_threads=True)
548
549            class Waiter(Thread):
550                # avoid blocking if the main thread raises an exception.
551                daemon = True
552
553                def __init__(self):
554                    Thread.__init__(self)
555                    self.running = Event()
556                    self.stop = Event()
557
558                def run(self):
559                    self.running.set()
560                    self.stop.wait()
561
562            waiter = Waiter()
563            waiter.start()
564            waiter.running.wait()
565            dump()
566            waiter.stop.set()
567            waiter.join()
568            """
569        code = code.format(filename=repr(filename))
570        output, exitcode = self.get_output(code, filename)
571        output = '\n'.join(output)
572        if filename:
573            lineno = 8
574        else:
575            lineno = 10
576        regex = r"""
577            ^Thread 0x[0-9a-f]+ \(most recent call first\):
578            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
579            ){{1,3}}  File "<string>", line 23 in run
580              File ".*threading.py", line [0-9]+ in _bootstrap_inner
581              File ".*threading.py", line [0-9]+ in _bootstrap
582
583            Current thread 0x[0-9a-f]+ \(most recent call first\):
584              File "<string>", line {lineno} in dump
585              File "<string>", line 28 in <module>$
586            """
587        regex = dedent(regex.format(lineno=lineno)).strip()
588        self.assertRegex(output, regex)
589        self.assertEqual(exitcode, 0)
590
591    def test_dump_traceback_threads(self):
592        self.check_dump_traceback_threads(None)
593
594    def test_dump_traceback_threads_file(self):
595        with temporary_filename() as filename:
596            self.check_dump_traceback_threads(filename)
597
598    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
599                                   *, filename=None, fd=None):
600        """
601        Check how many times the traceback is written in timeout x 2.5 seconds,
602        or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
603        on repeat and cancel options.
604
605        Raise an error if the output doesn't match the expect format.
606        """
607        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
608        code = """
609            import faulthandler
610            import time
611            import sys
612
613            timeout = {timeout}
614            repeat = {repeat}
615            cancel = {cancel}
616            loops = {loops}
617            filename = {filename!r}
618            fd = {fd}
619
620            def func(timeout, repeat, cancel, file, loops):
621                for loop in range(loops):
622                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
623                    if cancel:
624                        faulthandler.cancel_dump_traceback_later()
625                    time.sleep(timeout * 5)
626                    faulthandler.cancel_dump_traceback_later()
627
628            if filename:
629                file = open(filename, "wb")
630            elif fd is not None:
631                file = sys.stderr.fileno()
632            else:
633                file = None
634            func(timeout, repeat, cancel, file, loops)
635            if filename:
636                file.close()
637            """
638        code = code.format(
639            timeout=TIMEOUT,
640            repeat=repeat,
641            cancel=cancel,
642            loops=loops,
643            filename=filename,
644            fd=fd,
645        )
646        trace, exitcode = self.get_output(code, filename)
647        trace = '\n'.join(trace)
648
649        if not cancel:
650            count = loops
651            if repeat:
652                count *= 2
653            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
654            regex = expected_traceback(17, 26, header, min_count=count)
655            self.assertRegex(trace, regex)
656        else:
657            self.assertEqual(trace, '')
658        self.assertEqual(exitcode, 0)
659
660    def test_dump_traceback_later(self):
661        self.check_dump_traceback_later()
662
663    def test_dump_traceback_later_repeat(self):
664        self.check_dump_traceback_later(repeat=True)
665
666    def test_dump_traceback_later_cancel(self):
667        self.check_dump_traceback_later(cancel=True)
668
669    def test_dump_traceback_later_file(self):
670        with temporary_filename() as filename:
671            self.check_dump_traceback_later(filename=filename)
672
673    @unittest.skipIf(sys.platform == "win32",
674                     "subprocess doesn't support pass_fds on Windows")
675    def test_dump_traceback_later_fd(self):
676        with tempfile.TemporaryFile('wb+') as fp:
677            self.check_dump_traceback_later(fd=fp.fileno())
678
679    def test_dump_traceback_later_twice(self):
680        self.check_dump_traceback_later(loops=2)
681
682    @unittest.skipIf(not hasattr(faulthandler, "register"),
683                     "need faulthandler.register")
684    def check_register(self, filename=False, all_threads=False,
685                       unregister=False, chain=False, fd=None):
686        """
687        Register a handler displaying the traceback on a user signal. Raise the
688        signal and check the written traceback.
689
690        If chain is True, check that the previous signal handler is called.
691
692        Raise an error if the output doesn't match the expected format.
693        """
694        signum = signal.SIGUSR1
695        code = """
696            import faulthandler
697            import os
698            import signal
699            import sys
700
701            all_threads = {all_threads}
702            signum = {signum:d}
703            unregister = {unregister}
704            chain = {chain}
705            filename = {filename!r}
706            fd = {fd}
707
708            def func(signum):
709                os.kill(os.getpid(), signum)
710
711            def handler(signum, frame):
712                handler.called = True
713            handler.called = False
714
715            if filename:
716                file = open(filename, "wb")
717            elif fd is not None:
718                file = sys.stderr.fileno()
719            else:
720                file = None
721            if chain:
722                signal.signal(signum, handler)
723            faulthandler.register(signum, file=file,
724                                  all_threads=all_threads, chain={chain})
725            if unregister:
726                faulthandler.unregister(signum)
727            func(signum)
728            if chain and not handler.called:
729                if file is not None:
730                    output = file
731                else:
732                    output = sys.stderr
733                print("Error: signal handler not called!", file=output)
734                exitcode = 1
735            else:
736                exitcode = 0
737            if filename:
738                file.close()
739            sys.exit(exitcode)
740            """
741        code = code.format(
742            all_threads=all_threads,
743            signum=signum,
744            unregister=unregister,
745            chain=chain,
746            filename=filename,
747            fd=fd,
748        )
749        trace, exitcode = self.get_output(code, filename)
750        trace = '\n'.join(trace)
751        if not unregister:
752            if all_threads:
753                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
754            else:
755                regex = r'Stack \(most recent call first\):\n'
756            regex = expected_traceback(14, 32, regex)
757            self.assertRegex(trace, regex)
758        else:
759            self.assertEqual(trace, '')
760        if unregister:
761            self.assertNotEqual(exitcode, 0)
762        else:
763            self.assertEqual(exitcode, 0)
764
765    def test_register(self):
766        self.check_register()
767
768    def test_unregister(self):
769        self.check_register(unregister=True)
770
771    def test_register_file(self):
772        with temporary_filename() as filename:
773            self.check_register(filename=filename)
774
775    @unittest.skipIf(sys.platform == "win32",
776                     "subprocess doesn't support pass_fds on Windows")
777    def test_register_fd(self):
778        with tempfile.TemporaryFile('wb+') as fp:
779            self.check_register(fd=fp.fileno())
780
781    def test_register_threads(self):
782        self.check_register(all_threads=True)
783
784    def test_register_chain(self):
785        self.check_register(chain=True)
786
787    @contextmanager
788    def check_stderr_none(self):
789        stderr = sys.stderr
790        try:
791            sys.stderr = None
792            with self.assertRaises(RuntimeError) as cm:
793                yield
794            self.assertEqual(str(cm.exception), "sys.stderr is None")
795        finally:
796            sys.stderr = stderr
797
798    def test_stderr_None(self):
799        # Issue #21497: provide a helpful error if sys.stderr is None,
800        # instead of just an attribute error: "None has no attribute fileno".
801        with self.check_stderr_none():
802            faulthandler.enable()
803        with self.check_stderr_none():
804            faulthandler.dump_traceback()
805        with self.check_stderr_none():
806            faulthandler.dump_traceback_later(1e-3)
807        if hasattr(faulthandler, "register"):
808            with self.check_stderr_none():
809                faulthandler.register(signal.SIGUSR1)
810
811    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
812    def test_raise_exception(self):
813        for exc, name in (
814            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
815            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
816            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
817        ):
818            self.check_windows_exception(f"""
819                import faulthandler
820                faulthandler.enable()
821                faulthandler._raise_exception(faulthandler._{exc})
822                """,
823                3,
824                name)
825
826    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
827    def test_ignore_exception(self):
828        for exc_code in (
829            0xE06D7363,   # MSC exception ("Emsc")
830            0xE0434352,   # COM Callable Runtime exception ("ECCR")
831        ):
832            code = f"""
833                    import faulthandler
834                    faulthandler.enable()
835                    faulthandler._raise_exception({exc_code})
836                    """
837            code = dedent(code)
838            output, exitcode = self.get_output(code)
839            self.assertEqual(output, [])
840            self.assertEqual(exitcode, exc_code)
841
842    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
843    def test_raise_nonfatal_exception(self):
844        # These exceptions are not strictly errors. Letting
845        # faulthandler display the traceback when they are
846        # raised is likely to result in noise. However, they
847        # may still terminate the process if there is no
848        # handler installed for them (which there typically
849        # is, e.g. for debug messages).
850        for exc in (
851            0x00000000,
852            0x34567890,
853            0x40000000,
854            0x40001000,
855            0x70000000,
856            0x7FFFFFFF,
857        ):
858            output, exitcode = self.get_output(f"""
859                import faulthandler
860                faulthandler.enable()
861                faulthandler._raise_exception(0x{exc:x})
862                """
863            )
864            self.assertEqual(output, [])
865            # On Windows older than 7 SP1, the actual exception code has
866            # bit 29 cleared.
867            self.assertIn(exitcode,
868                          (exc, exc & ~0x10000000))
869
870    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
871    def test_disable_windows_exc_handler(self):
872        code = dedent("""
873            import faulthandler
874            faulthandler.enable()
875            faulthandler.disable()
876            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
877            faulthandler._raise_exception(code)
878        """)
879        output, exitcode = self.get_output(code)
880        self.assertEqual(output, [])
881        self.assertEqual(exitcode, 0xC0000005)
882
883    def test_cancel_later_without_dump_traceback_later(self):
884        # bpo-37933: Calling cancel_dump_traceback_later()
885        # without dump_traceback_later() must not segfault.
886        code = dedent("""
887            import faulthandler
888            faulthandler.cancel_dump_traceback_later()
889        """)
890        output, exitcode = self.get_output(code)
891        self.assertEqual(output, [])
892        self.assertEqual(exitcode, 0)
893
894
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
897