1import enum
2import errno
3import inspect
4import os
5import random
6import signal
7import socket
8import statistics
9import subprocess
10import sys
11import threading
12import time
13import unittest
14from test import support
15from test.support import os_helper
16from test.support.script_helper import assert_python_ok, spawn_python
17from test.support import threading_helper
18try:
19    import _testcapi
20except ImportError:
21    _testcapi = None
22
23
24class GenericTests(unittest.TestCase):
25
26    def test_enums(self):
27        for name in dir(signal):
28            sig = getattr(signal, name)
29            if name in {'SIG_DFL', 'SIG_IGN'}:
30                self.assertIsInstance(sig, signal.Handlers)
31            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
32                self.assertIsInstance(sig, signal.Sigmasks)
33            elif name.startswith('SIG') and not name.startswith('SIG_'):
34                self.assertIsInstance(sig, signal.Signals)
35            elif name.startswith('CTRL_'):
36                self.assertIsInstance(sig, signal.Signals)
37                self.assertEqual(sys.platform, "win32")
38
39        CheckedSignals = enum._old_convert_(
40                enum.IntEnum, 'Signals', 'signal',
41                lambda name:
42                    name.isupper()
43                    and (name.startswith('SIG') and not name.startswith('SIG_'))
44                    or name.startswith('CTRL_'),
45                source=signal,
46                )
47        enum._test_simple_enum(CheckedSignals, signal.Signals)
48
49        CheckedHandlers = enum._old_convert_(
50                enum.IntEnum, 'Handlers', 'signal',
51                lambda name: name in ('SIG_DFL', 'SIG_IGN'),
52                source=signal,
53                )
54        enum._test_simple_enum(CheckedHandlers, signal.Handlers)
55
56        Sigmasks = getattr(signal, 'Sigmasks', None)
57        if Sigmasks is not None:
58            CheckedSigmasks = enum._old_convert_(
59                    enum.IntEnum, 'Sigmasks', 'signal',
60                    lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
61                    source=signal,
62                    )
63            enum._test_simple_enum(CheckedSigmasks, Sigmasks)
64
65    def test_functions_module_attr(self):
66        # Issue #27718: If __all__ is not defined all non-builtin functions
67        # should have correct __module__ to be displayed by pydoc.
68        for name in dir(signal):
69            value = getattr(signal, name)
70            if inspect.isroutine(value) and not inspect.isbuiltin(value):
71                self.assertEqual(value.__module__, 'signal')
72
73
74@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
75class PosixTests(unittest.TestCase):
76    def trivial_signal_handler(self, *args):
77        pass
78
79    def test_out_of_range_signal_number_raises_error(self):
80        self.assertRaises(ValueError, signal.getsignal, 4242)
81
82        self.assertRaises(ValueError, signal.signal, 4242,
83                          self.trivial_signal_handler)
84
85        self.assertRaises(ValueError, signal.strsignal, 4242)
86
87    def test_setting_signal_handler_to_none_raises_error(self):
88        self.assertRaises(TypeError, signal.signal,
89                          signal.SIGUSR1, None)
90
91    def test_getsignal(self):
92        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
93        self.assertIsInstance(hup, signal.Handlers)
94        self.assertEqual(signal.getsignal(signal.SIGHUP),
95                         self.trivial_signal_handler)
96        signal.signal(signal.SIGHUP, hup)
97        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
98
99    def test_strsignal(self):
100        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
101        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
102        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
103
104    # Issue 3864, unknown if this affects earlier versions of freebsd also
105    def test_interprocess_signal(self):
106        dirname = os.path.dirname(__file__)
107        script = os.path.join(dirname, 'signalinterproctester.py')
108        assert_python_ok(script)
109
110    @unittest.skipUnless(
111        hasattr(signal, "valid_signals"),
112        "requires signal.valid_signals"
113    )
114    def test_valid_signals(self):
115        s = signal.valid_signals()
116        self.assertIsInstance(s, set)
117        self.assertIn(signal.Signals.SIGINT, s)
118        self.assertIn(signal.Signals.SIGALRM, s)
119        self.assertNotIn(0, s)
120        self.assertNotIn(signal.NSIG, s)
121        self.assertLess(len(s), signal.NSIG)
122
123        # gh-91145: Make sure that all SIGxxx constants exposed by the Python
124        # signal module have a number in the [0; signal.NSIG-1] range.
125        for name in dir(signal):
126            if not name.startswith("SIG"):
127                continue
128            if name in {"SIG_IGN", "SIG_DFL"}:
129                # SIG_IGN and SIG_DFL are pointers
130                continue
131            with self.subTest(name=name):
132                signum = getattr(signal, name)
133                self.assertGreaterEqual(signum, 0)
134                self.assertLess(signum, signal.NSIG)
135
136    @unittest.skipUnless(sys.executable, "sys.executable required.")
137    @support.requires_subprocess()
138    def test_keyboard_interrupt_exit_code(self):
139        """KeyboardInterrupt triggers exit via SIGINT."""
140        process = subprocess.run(
141                [sys.executable, "-c",
142                 "import os, signal, time\n"
143                 "os.kill(os.getpid(), signal.SIGINT)\n"
144                 "for _ in range(999): time.sleep(0.01)"],
145                stderr=subprocess.PIPE)
146        self.assertIn(b"KeyboardInterrupt", process.stderr)
147        self.assertEqual(process.returncode, -signal.SIGINT)
148        # Caveat: The exit code is insufficient to guarantee we actually died
149        # via a signal.  POSIX shells do more than look at the 8 bit value.
150        # Writing an automation friendly test of an interactive shell
151        # to confirm that our process died via a SIGINT proved too complex.
152
153
154@unittest.skipUnless(sys.platform == "win32", "Windows specific")
155class WindowsSignalTests(unittest.TestCase):
156
157    def test_valid_signals(self):
158        s = signal.valid_signals()
159        self.assertIsInstance(s, set)
160        self.assertGreaterEqual(len(s), 6)
161        self.assertIn(signal.Signals.SIGINT, s)
162        self.assertNotIn(0, s)
163        self.assertNotIn(signal.NSIG, s)
164        self.assertLess(len(s), signal.NSIG)
165
166    def test_issue9324(self):
167        # Updated for issue #10003, adding SIGBREAK
168        handler = lambda x, y: None
169        checked = set()
170        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
171                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
172                    signal.SIGTERM):
173            # Set and then reset a handler for signals that work on windows.
174            # Issue #18396, only for signals without a C-level handler.
175            if signal.getsignal(sig) is not None:
176                signal.signal(sig, signal.signal(sig, handler))
177                checked.add(sig)
178        # Issue #18396: Ensure the above loop at least tested *something*
179        self.assertTrue(checked)
180
181        with self.assertRaises(ValueError):
182            signal.signal(-1, handler)
183
184        with self.assertRaises(ValueError):
185            signal.signal(7, handler)
186
187    @unittest.skipUnless(sys.executable, "sys.executable required.")
188    @support.requires_subprocess()
189    def test_keyboard_interrupt_exit_code(self):
190        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
191        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
192        # as that requires setting up a console control handler in a child
193        # in its own process group.  Doable, but quite complicated.  (see
194        # @eryksun on https://github.com/python/cpython/pull/11862)
195        process = subprocess.run(
196                [sys.executable, "-c", "raise KeyboardInterrupt"],
197                stderr=subprocess.PIPE)
198        self.assertIn(b"KeyboardInterrupt", process.stderr)
199        STATUS_CONTROL_C_EXIT = 0xC000013A
200        self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
201
202
203class WakeupFDTests(unittest.TestCase):
204
205    def test_invalid_call(self):
206        # First parameter is positional-only
207        with self.assertRaises(TypeError):
208            signal.set_wakeup_fd(signum=signal.SIGINT)
209
210        # warn_on_full_buffer is a keyword-only parameter
211        with self.assertRaises(TypeError):
212            signal.set_wakeup_fd(signal.SIGINT, False)
213
214    def test_invalid_fd(self):
215        fd = os_helper.make_bad_fd()
216        self.assertRaises((ValueError, OSError),
217                          signal.set_wakeup_fd, fd)
218
219    @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
220    def test_invalid_socket(self):
221        sock = socket.socket()
222        fd = sock.fileno()
223        sock.close()
224        self.assertRaises((ValueError, OSError),
225                          signal.set_wakeup_fd, fd)
226
227    # Emscripten does not support fstat on pipes yet.
228    # https://github.com/emscripten-core/emscripten/issues/16414
229    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
230    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
231    def test_set_wakeup_fd_result(self):
232        r1, w1 = os.pipe()
233        self.addCleanup(os.close, r1)
234        self.addCleanup(os.close, w1)
235        r2, w2 = os.pipe()
236        self.addCleanup(os.close, r2)
237        self.addCleanup(os.close, w2)
238
239        if hasattr(os, 'set_blocking'):
240            os.set_blocking(w1, False)
241            os.set_blocking(w2, False)
242
243        signal.set_wakeup_fd(w1)
244        self.assertEqual(signal.set_wakeup_fd(w2), w1)
245        self.assertEqual(signal.set_wakeup_fd(-1), w2)
246        self.assertEqual(signal.set_wakeup_fd(-1), -1)
247
248    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
249    @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
250    def test_set_wakeup_fd_socket_result(self):
251        sock1 = socket.socket()
252        self.addCleanup(sock1.close)
253        sock1.setblocking(False)
254        fd1 = sock1.fileno()
255
256        sock2 = socket.socket()
257        self.addCleanup(sock2.close)
258        sock2.setblocking(False)
259        fd2 = sock2.fileno()
260
261        signal.set_wakeup_fd(fd1)
262        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
263        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
264        self.assertEqual(signal.set_wakeup_fd(-1), -1)
265
266    # On Windows, files are always blocking and Windows does not provide a
267    # function to test if a socket is in non-blocking mode.
268    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
269    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
270    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
271    def test_set_wakeup_fd_blocking(self):
272        rfd, wfd = os.pipe()
273        self.addCleanup(os.close, rfd)
274        self.addCleanup(os.close, wfd)
275
276        # fd must be non-blocking
277        os.set_blocking(wfd, True)
278        with self.assertRaises(ValueError) as cm:
279            signal.set_wakeup_fd(wfd)
280        self.assertEqual(str(cm.exception),
281                         "the fd %s must be in non-blocking mode" % wfd)
282
283        # non-blocking is ok
284        os.set_blocking(wfd, False)
285        signal.set_wakeup_fd(wfd)
286        signal.set_wakeup_fd(-1)
287
288
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    """Check what arrives on a pipe registered with signal.set_wakeup_fd().

    Every scenario runs in a child interpreter started through
    assert_python_ok().
    """
    # NOTE(review): the child scripts in this class import _testcapi, but
    # none of the scripts or test bodies visible here reference it afterwards.
    # Confirm whether the import and the matching skipIf decorators are still
    # needed.
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        """Run *test_body* in a subprocess and verify the wakeup fd contents.

        test_body: source text that defines a ``test()`` function.  It is
            pasted into the child script, which provides ``handler`` and,
            before ``test()`` is called, the pipe ends ``read`` and ``write``.
        signals: the signal numbers expected to be read back from the pipe
            after ``test()`` returns.
        ordered: when false, received and expected numbers are compared as
            sets instead of as ordered tuples.
        """
        # use a subprocess to have only one thread
        code = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(tuple(map(int, signals)), ordered, test_body)
        # The three positional fields above are, in order: the expected
        # signal numbers as a tuple of ints, the *ordered* flag, and the
        # caller's test() definition.

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_wakeup_write_error(self):
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        # The child registers the *read* end of a pipe as the wakeup fd and
        # then checks stderr for the "Exception ignored" report with EBADF,
        # while the Python handler's ZeroDivisionError must still propagate.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Probe in this process first: the scenario is only meaningful when
        # writing to the read end of a pipe raises OSError.  If the write
        # succeeds, skip instead of running the child.
        r, w = os.pipe()
        try:
            os.write(r, b'x')
        except OSError:
            pass
        else:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")
        finally:
            os.close(r)
            os.close(w)

        assert_python_ok('-c', code)

    def test_wakeup_fd_early(self):
        # The SIGALRM handler raises out of a first select() that watches no
        # descriptors.  A second select() on the pipe's read end must then
        # return in under TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        # SIGALRM arrives while select() is watching the pipe's read end; the
        # handler's exception must interrupt that select() in under
        # TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_signum(self):
        # Two different signals are raised one after the other; with the
        # default ordered=True they must be read back in the same order.
        self.check_wakeup("""def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        """, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        # Both signals are raised while blocked and delivered on unblock;
        # ordered=False because only the set of numbers is checked.
        self.check_wakeup("""def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
470
471
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    """Check signal.set_wakeup_fd() when the wakeup fd is a socket.

    Every scenario runs in a child interpreter started through
    assert_python_ok().
    """

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # The child registers one end of a socketpair as the wakeup fd,
        # raises SIGINT and expects to receive that signal number as a single
        # byte on the other end.
        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        # The child closes both sockets after registering the wakeup fd, so
        # the write done on signal delivery fails; the failure must be
        # reported on stderr as an ignored exception.
        # Use a subprocess to have only one thread.
        # The expected message names the failing operation, which differs
        # between Windows ('send') and other platforms ('write').
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_warn_on_full_buffer(self):
        # The child fills the socketpair buffer and then checks, for each
        # warn_on_full_buffer setting, whether a signal whose wakeup byte
        # cannot be written produces a report on stderr.
        # Use a subprocess to have only one thread.
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        # The script below is rendered with str.format(), so any literal
        # brace in it would be taken as a replacement field.  Its progress
        # message therefore uses plain %-formatting on an ordinary string.
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print("%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        """.format(action=action)
        assert_python_ok('-c', code)
666
667
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
@support.requires_subprocess()
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
class SiginterruptTest(unittest.TestCase):
    """Check how signal.siginterrupt() affects a blocking os.read()."""

    def readpipe_interrupted(self, interrupt):
        """Run a child that blocks in os.read() while SIGALRM is delivered.

        *interrupt* is forwarded to signal.siginterrupt() in the child; None
        means siginterrupt() is not called at all.

        Return True when the child exits with code 3, i.e. both of its reads
        were aborted by the handler's ZeroDivisionError.  Return False when
        the child exits with code 2 (a read returned normally) or does not
        finish within support.SHORT_TIMEOUT, in which case it is killed.
        Any other exit code raises Exception.
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        code = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """ % (interrupt,)
        with spawn_python('-c', code) as child:
            try:
                # wait until the child process is loaded and has started
                ready_line = child.stdout.readline()

                remaining, _ = child.communicate(timeout=support.SHORT_TIMEOUT)
            except subprocess.TimeoutExpired:
                # The child is still blocked in read(): not interrupted.
                child.kill()
                return False

            output = ready_line + remaining
            status = child.wait()
            if status not in (2, 3):
                raise Exception("Child error (exit code %s): %r"
                                % (status, output))
            return status == 3

    def test_without_siginterrupt(self):
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        self.assertTrue(self.readpipe_interrupted(None))

    def test_siginterrupt_on(self):
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        self.assertTrue(self.readpipe_interrupted(True))

    def test_siginterrupt_off(self):
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        self.assertFalse(self.readpipe_interrupted(False))
754
755
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
                         "needs signal.getitimer() and signal.setitimer()")
class ItimerTest(unittest.TestCase):
    """Exercise signal.setitimer() and signal.getitimer() per timer type."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        # Timer armed by the current test, or None if it armed none.
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Stop the timer first so it cannot fire once the previous SIGALRM
        # handler is back in place.
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def _set_handler(self, signum, handler):
        """Install *handler* for *signum* and restore the old one on cleanup.

        Cleanups run after tearDown(), i.e. after the timer was stopped.
        """
        previous = signal.signal(signum, handler)
        # None means the previous handler was not installed from Python and
        # cannot be passed back to signal.signal().
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)

    def _work_until_timer_disarmed(self, timeout=60.0):
        """Burn CPU time until self.itimer reads (0.0, 0.0).

        Skip the test if that does not happen within *timeout* seconds.
        """
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            # use up some CPU time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                return  # the signal handler stopped this itimer
        # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms, so
        # setitimer(signal.ITIMER_REAL, -1) is deliberately not checked.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertTrue(self.hndl_called)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._set_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        self._work_until_timer_disarmed()

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._set_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        self._work_until_timer_disarmed()

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_setitimer_tiny(self):
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertTrue(self.hndl_called)
858
859
860class PendingSignalsTests(unittest.TestCase):
861    """
862    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
863    functions.
864    """
865    @unittest.skipUnless(hasattr(signal, 'sigpending'),
866                         'need signal.sigpending()')
867    def test_sigpending_empty(self):
868        self.assertEqual(signal.sigpending(), set())
869
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending(self):
        # Child script: block SIGUSR1, send it to the process itself and
        # check that sigpending() reports exactly {SIGUSR1}, with every
        # member being a signal.Signals instance.  Unblocking the signal
        # must then run the handler, whose 1/0 has to surface as
        # ZeroDivisionError from pthread_sigmask().
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        # Run the script in a separate interpreter; any uncaught exception
        # in the child makes the check fail.
        assert_python_ok('-c', code)
900
    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    @threading_helper.requires_working_threading()
    def test_pthread_kill(self):
        # Child script: install a handler that raises ZeroDivisionError
        # (1/0), then send SIGUSR1 to the current thread with
        # pthread_kill().  The handler's exception must propagate out of
        # the pthread_kill() call.
        code = """if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        # Run the script in a separate interpreter.
        assert_python_ok('-c', code)
926
    # NOTE: skipUnless on a helper means that calling it raises SkipTest
    # when pthread_sigmask() is unavailable, which skips the calling test.
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def wait_helper(self, blocked, test):
        """
        Run a sig*wait*() scenario in a child interpreter.

        The child installs a SIGALRM handler that raises ZeroDivisionError,
        blocks the `blocked` signal, calls test(signum) and finally unblocks
        the signal again; the handler must never run.

        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        """
        # The two %s placeholders receive the stripped source of test() and
        # the blocked signal (formatted with %s).
        code = '''if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        ''' % (test.strip(), blocked)

        # sig*wait* must be called with the signal blocked: since the current
        # process might have several threads running, use a subprocess to have
        # a single thread.
        assert_python_ok('-c', code)
972
    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    def test_sigwait(self):
        # With SIGALRM blocked in the child, alarm(1) schedules the signal
        # and sigwait() must return it as a signal.Signals member equal to
        # SIGALRM.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ''')
984
    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        # With SIGALRM blocked in the child, alarm(1) schedules the signal
        # and sigwaitinfo() must return an info object whose si_signo is
        # SIGALRM.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        ''')
995
    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        # The alarm fires after 1 second, well within the ~10 second
        # timeout, so sigtimedwait() must return info for SIGALRM.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')
1006
    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_poll(self):
        # check that polling with sigtimedwait works
        # The child sends itself the (blocked) signal first, so a timeout
        # of 0 must still return it.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')
1019
    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_timeout(self):
        # No signal is sent here: sigtimedwait() must give up after its
        # 1 second timeout and return None.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        ''')
1029
1030    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
1031                         'need signal.sigtimedwait()')
1032    def test_sigtimedwait_negative_timeout(self):
1033        signum = signal.SIGALRM
1034        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
1035
    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @threading_helper.requires_working_threading()
    def test_sigwait_thread(self):
        # Check that calling sigwait() from a thread doesn't suspend the whole
        # process. A new interpreter is spawned to avoid problems when mixing
        # threads and fork(): only async-safe functions are allowed between
        # fork() and exec().
        #
        # In the child, the main thread blocks SIGUSR1 and waits in
        # sigwait() while a helper thread sleeps 1 second and then sends
        # the signal to the process.
        assert_python_ok("-c", """if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        """)
1070
1071    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1072                         'need signal.pthread_sigmask()')
1073    def test_pthread_sigmask_arguments(self):
1074        self.assertRaises(TypeError, signal.pthread_sigmask)
1075        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
1076        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
1077        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
1078        with self.assertRaises(ValueError):
1079            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
1080        with self.assertRaises(ValueError):
1081            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
1082        with self.assertRaises(ValueError):
1083            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
1084
1085    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1086                         'need signal.pthread_sigmask()')
1087    def test_pthread_sigmask_valid_signals(self):
1088        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
1089        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
1090        # Get current blocked set
1091        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
1092        self.assertLessEqual(s, signal.valid_signals())
1093
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @threading_helper.requires_working_threading()
    def test_pthread_sigmask(self):
        # Child script walking SIGUSR1 through unblocked -> blocked ->
        # unblocked.  The handler raises ZeroDivisionError (1/0), which is
        # how the script detects that the handler ran.  Every mask returned
        # by pthread_sigmask() must contain only signal.Signals members,
        # and the mask read at the end must equal old_mask.
        code = """if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        """
        # Run the script in a separate interpreter.
        assert_python_ok('-c', code)
1171
1172    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
1173                         'need signal.pthread_kill()')
1174    @threading_helper.requires_working_threading()
1175    def test_pthread_kill_main_thread(self):
1176        # Test that a signal can be sent to the main thread with pthread_kill()
1177        # before any other thread has been created (see issue #12392).
1178        code = """if True:
1179            import threading
1180            import signal
1181            import sys
1182
1183            def handler(signum, frame):
1184                sys.exit(3)
1185
1186            signal.signal(signal.SIGUSR1, handler)
1187            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
1188            sys.exit(2)
1189        """
1190
1191        with spawn_python('-c', code) as process:
1192            stdout, stderr = process.communicate()
1193            exitcode = process.wait()
1194            if exitcode != 3:
1195                raise Exception("Child error (exit code %s): %s" %
1196                                (exitcode, stdout))
1197
1198
1199class StressTest(unittest.TestCase):
1200    """
1201    Stress signal delivery, especially when a signal arrives in
1202    the middle of recomputing the signal state or executing
1203    previously tripped signal handlers.
1204    """
1205
1206    def setsig(self, signum, handler):
1207        old_handler = signal.signal(signum, handler)
1208        self.addCleanup(signal.signal, signum, old_handler)
1209
1210    def measure_itimer_resolution(self):
1211        N = 20
1212        times = []
1213
1214        def handler(signum=None, frame=None):
1215            if len(times) < N:
1216                times.append(time.perf_counter())
1217                # 1 µs is the smallest possible timer interval,
1218                # we want to measure what the concrete duration
1219                # will be on this platform
1220                signal.setitimer(signal.ITIMER_REAL, 1e-6)
1221
1222        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
1223        self.setsig(signal.SIGALRM, handler)
1224        handler()
1225        while len(times) < N:
1226            time.sleep(1e-3)
1227
1228        durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
1229        med = statistics.median(durations)
1230        if support.verbose:
1231            print("detected median itimer() resolution: %.6f s." % (med,))
1232        return med
1233
1234    def decide_itimer_count(self):
1235        # Some systems have poor setitimer() resolution (for example
1236        # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
1237        # number of sequential timers based on that.
1238        reso = self.measure_itimer_resolution()
1239        if reso <= 1e-4:
1240            return 10000
1241        elif reso <= 1e-2:
1242            return 100
1243        else:
1244            self.skipTest("detected itimer resolution (%.3f s.) too high "
1245                          "(> 10 ms.) on this platform (or system too busy)"
1246                          % (reso,))
1247
    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_dependent(self):
        """
        This test uses dependent signal handlers.
        """
        # N is the number of SIGALRM deliveries the test expects in total.
        N = self.decide_itimer_count()
        sigs = []

        def first_handler(signum, frame):
            # 1e-6 is the minimum non-zero value for `setitimer()`.
            # Choose a random delay so as to improve chances of
            # triggering a race condition.  Ideally the signal is received
            # when inside critical signal-handling routines such as
            # Py_MakePendingCalls().
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)

        def second_handler(signum=None, frame=None):
            # Record each SIGALRM produced by the timer armed above.
            sigs.append(signum)

        # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
        # ascending and descending sequences (SIGUSR1 then SIGALRM,
        # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
        self.setsig(signal.SIGPROF, first_handler)
        self.setsig(signal.SIGUSR1, first_handler)
        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL

        expected_sigs = 0
        # Upper bound on the total time spent waiting for handlers.
        deadline = time.monotonic() + support.SHORT_TIMEOUT

        # Each signal sent below runs first_handler, which arms the timer,
        # which in turn delivers one SIGALRM to second_handler.
        while expected_sigs < N:
            os.kill(os.getpid(), signal.SIGPROF)
            expected_sigs += 1
            # Wait for handlers to run to avoid signal coalescing
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

            os.kill(os.getpid(), signal.SIGUSR1)
            expected_sigs += 1
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")
1293
    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_simultaneous(self):
        """
        This test uses simultaneous signal handlers.
        """
        # N is the total number of handler invocations the test expects.
        N = self.decide_itimer_count()
        sigs = []

        def handler(signum, frame):
            # Shared by SIGUSR1 and SIGALRM: record every delivery.
            sigs.append(signum)

        self.setsig(signal.SIGUSR1, handler)
        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL

        expected_sigs = 0
        # Upper bound on the total time spent waiting for handlers.
        deadline = time.monotonic() + support.SHORT_TIMEOUT

        while expected_sigs < N:
            # Hopefully the SIGALRM will be received somewhere during
            # initial processing of SIGUSR1.
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
            os.kill(os.getpid(), signal.SIGUSR1)

            # One SIGALRM (from the timer) plus one SIGUSR1 per iteration.
            expected_sigs += 2
            # Wait for handlers to run to avoid signal coalescing
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")
1326
    @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
                         "test needs SIGUSR1")
    @threading_helper.requires_working_threading()
    def test_stress_modifying_handlers(self):
        # bpo-43406: race condition between trip_signal() and signal.signal
        signum = signal.SIGUSR1
        # Counters shared between this thread and the sender thread through
        # the closures below.
        num_sent_signals = 0
        num_received_signals = 0
        do_stop = False

        def custom_handler(signum, frame):
            # Python-level handler: count each delivery.
            nonlocal num_received_signals
            num_received_signals += 1

        def set_interrupts():
            # Sender thread body: keep raising the signal until told to stop.
            nonlocal num_sent_signals
            while not do_stop:
                signal.raise_signal(signum)
                num_sent_signals += 1

        def cycle_handlers():
            # Keep swapping handlers until the sender has raised at least
            # 100 signals (checked once per 20000 swap rounds).
            while num_sent_signals < 100:
                for i in range(20000):
                    # Cycle between a Python-defined and a non-Python handler
                    for handler in [custom_handler, signal.SIG_IGN]:
                        signal.signal(signum, handler)

        old_handler = signal.signal(signum, custom_handler)
        self.addCleanup(signal.signal, signum, old_handler)

        t = threading.Thread(target=set_interrupts)
        try:
            ignored = False
            with support.catch_unraisable_exception() as cm:
                t.start()
                cycle_handlers()
                do_stop = True
                t.join()

                if cm.unraisable is not None:
                    # An unraisable exception may be printed out when
                    # a signal is ignored due to the aforementioned
                    # race condition, check it.
                    self.assertIsInstance(cm.unraisable.exc_value, OSError)
                    self.assertIn(
                        f"Signal {signum:d} ignored due to race condition",
                        str(cm.unraisable.exc_value))
                    ignored = True

            # bpo-43406: Even if it is unlikely, it's technically possible that
            # all signals were ignored because of race conditions.
            if not ignored:
                # Sanity check that some signals were received, but not all
                self.assertGreater(num_received_signals, 0)
            self.assertLess(num_received_signals, num_sent_signals)
        finally:
            # Make sure the sender thread is stopped even if a check above
            # failed.
            do_stop = True
            t.join()
1385
1386
class RaiseSignalTest(unittest.TestCase):
    # Tests for signal.raise_signal() and for an interrupt requested
    # through _thread.interrupt_main().

    def test_sigint(self):
        # Raising SIGINT is expected to surface as KeyboardInterrupt.
        self.assertRaises(KeyboardInterrupt,
                          signal.raise_signal, signal.SIGINT)

    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
    def test_invalid_argument(self):
        SIGHUP = 1 # not supported on win32
        try:
            signal.raise_signal(SIGHUP)
        except OSError as exc:
            # Only EINVAL is the expected outcome; anything else is a
            # genuine error and is propagated.
            if exc.errno != errno.EINVAL:
                raise
        else:
            self.fail("OSError (Invalid argument) expected")

    def test_handler(self):
        handler_ran = False

        def on_sigint(signum, frame):
            nonlocal handler_ran
            handler_ran = True

        # Install the handler and make sure the previous one is restored.
        previous = signal.signal(signal.SIGINT, on_sigint)
        self.addCleanup(signal.signal, signal.SIGINT, previous)

        signal.raise_signal(signal.SIGINT)
        self.assertTrue(handler_ran)

    def test__thread_interrupt_main(self):
        # See https://github.com/python/cpython/issues/102397
        # The child calls _thread.interrupt_main() from a __del__ method;
        # it must still exit successfully and report the ignored signal
        # on stderr.
        script = """if 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        """

        _rc, _out, stderr_data = assert_python_ok('-c', script)
        self.assertIn(b'OSError: Signal 2 ignored due to race condition',
                      stderr_data)
1429
1430
1431
class PidfdSignalTest(unittest.TestCase):
    """
    Test signal.pidfd_send_signal().
    """

    @unittest.skipUnless(
        hasattr(signal, "pidfd_send_signal"),
        "pidfd support not built in",
    )
    def test_pidfd_send_signal(self):
        # File descriptor 0 is not a pidfd, so the call must fail with
        # OSError.  The errno also tells whether the feature is usable
        # here at all: ENOSYS and EPERM lead to a skip, anything other
        # than EBADF is a failure.
        with self.assertRaises(OSError) as cm:
            signal.pidfd_send_signal(0, signal.SIGINT)
        if cm.exception.errno == errno.ENOSYS:
            self.skipTest("kernel does not support pidfds")
        elif cm.exception.errno == errno.EPERM:
            self.skipTest("Not enough privileges to use pidfds")
        self.assertEqual(cm.exception.errno, errno.EBADF)
        # Open this process's /proc/<pid> directory and use the resulting
        # descriptor as the pidfd; it is closed again on cleanup.
        my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
        self.addCleanup(os.close, my_pidfd)
        # Only None is accepted for the siginfo argument.
        with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
        # Sending SIGINT to ourselves must surface as KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
1452
def tearDownModule():
    # unittest module-level fixture, run once after the tests in this
    # module; delegates to test.support to reap leftover child processes.
    support.reap_children()
1455
# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()
1458