1import enum 2import errno 3import inspect 4import os 5import random 6import signal 7import socket 8import statistics 9import subprocess 10import sys 11import threading 12import time 13import unittest 14from test import support 15from test.support import os_helper 16from test.support.script_helper import assert_python_ok, spawn_python 17from test.support import threading_helper 18try: 19 import _testcapi 20except ImportError: 21 _testcapi = None 22 23 24class GenericTests(unittest.TestCase): 25 26 def test_enums(self): 27 for name in dir(signal): 28 sig = getattr(signal, name) 29 if name in {'SIG_DFL', 'SIG_IGN'}: 30 self.assertIsInstance(sig, signal.Handlers) 31 elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: 32 self.assertIsInstance(sig, signal.Sigmasks) 33 elif name.startswith('SIG') and not name.startswith('SIG_'): 34 self.assertIsInstance(sig, signal.Signals) 35 elif name.startswith('CTRL_'): 36 self.assertIsInstance(sig, signal.Signals) 37 self.assertEqual(sys.platform, "win32") 38 39 CheckedSignals = enum._old_convert_( 40 enum.IntEnum, 'Signals', 'signal', 41 lambda name: 42 name.isupper() 43 and (name.startswith('SIG') and not name.startswith('SIG_')) 44 or name.startswith('CTRL_'), 45 source=signal, 46 ) 47 enum._test_simple_enum(CheckedSignals, signal.Signals) 48 49 CheckedHandlers = enum._old_convert_( 50 enum.IntEnum, 'Handlers', 'signal', 51 lambda name: name in ('SIG_DFL', 'SIG_IGN'), 52 source=signal, 53 ) 54 enum._test_simple_enum(CheckedHandlers, signal.Handlers) 55 56 Sigmasks = getattr(signal, 'Sigmasks', None) 57 if Sigmasks is not None: 58 CheckedSigmasks = enum._old_convert_( 59 enum.IntEnum, 'Sigmasks', 'signal', 60 lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'), 61 source=signal, 62 ) 63 enum._test_simple_enum(CheckedSigmasks, Sigmasks) 64 65 def test_functions_module_attr(self): 66 # Issue #27718: If __all__ is not defined all non-builtin functions 67 # should have correct __module__ to be displayed by pydoc. 
68 for name in dir(signal): 69 value = getattr(signal, name) 70 if inspect.isroutine(value) and not inspect.isbuiltin(value): 71 self.assertEqual(value.__module__, 'signal') 72 73 74@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 75class PosixTests(unittest.TestCase): 76 def trivial_signal_handler(self, *args): 77 pass 78 79 def test_out_of_range_signal_number_raises_error(self): 80 self.assertRaises(ValueError, signal.getsignal, 4242) 81 82 self.assertRaises(ValueError, signal.signal, 4242, 83 self.trivial_signal_handler) 84 85 self.assertRaises(ValueError, signal.strsignal, 4242) 86 87 def test_setting_signal_handler_to_none_raises_error(self): 88 self.assertRaises(TypeError, signal.signal, 89 signal.SIGUSR1, None) 90 91 def test_getsignal(self): 92 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 93 self.assertIsInstance(hup, signal.Handlers) 94 self.assertEqual(signal.getsignal(signal.SIGHUP), 95 self.trivial_signal_handler) 96 signal.signal(signal.SIGHUP, hup) 97 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 98 99 def test_strsignal(self): 100 self.assertIn("Interrupt", signal.strsignal(signal.SIGINT)) 101 self.assertIn("Terminated", signal.strsignal(signal.SIGTERM)) 102 self.assertIn("Hangup", signal.strsignal(signal.SIGHUP)) 103 104 # Issue 3864, unknown if this affects earlier versions of freebsd also 105 def test_interprocess_signal(self): 106 dirname = os.path.dirname(__file__) 107 script = os.path.join(dirname, 'signalinterproctester.py') 108 assert_python_ok(script) 109 110 @unittest.skipUnless( 111 hasattr(signal, "valid_signals"), 112 "requires signal.valid_signals" 113 ) 114 def test_valid_signals(self): 115 s = signal.valid_signals() 116 self.assertIsInstance(s, set) 117 self.assertIn(signal.Signals.SIGINT, s) 118 self.assertIn(signal.Signals.SIGALRM, s) 119 self.assertNotIn(0, s) 120 self.assertNotIn(signal.NSIG, s) 121 self.assertLess(len(s), signal.NSIG) 122 123 # gh-91145: Make sure that all SIGxxx 
constants exposed by the Python 124 # signal module have a number in the [0; signal.NSIG-1] range. 125 for name in dir(signal): 126 if not name.startswith("SIG"): 127 continue 128 if name in {"SIG_IGN", "SIG_DFL"}: 129 # SIG_IGN and SIG_DFL are pointers 130 continue 131 with self.subTest(name=name): 132 signum = getattr(signal, name) 133 self.assertGreaterEqual(signum, 0) 134 self.assertLess(signum, signal.NSIG) 135 136 @unittest.skipUnless(sys.executable, "sys.executable required.") 137 @support.requires_subprocess() 138 def test_keyboard_interrupt_exit_code(self): 139 """KeyboardInterrupt triggers exit via SIGINT.""" 140 process = subprocess.run( 141 [sys.executable, "-c", 142 "import os, signal, time\n" 143 "os.kill(os.getpid(), signal.SIGINT)\n" 144 "for _ in range(999): time.sleep(0.01)"], 145 stderr=subprocess.PIPE) 146 self.assertIn(b"KeyboardInterrupt", process.stderr) 147 self.assertEqual(process.returncode, -signal.SIGINT) 148 # Caveat: The exit code is insufficient to guarantee we actually died 149 # via a signal. POSIX shells do more than look at the 8 bit value. 150 # Writing an automation friendly test of an interactive shell 151 # to confirm that our process died via a SIGINT proved too complex. 152 153 154@unittest.skipUnless(sys.platform == "win32", "Windows specific") 155class WindowsSignalTests(unittest.TestCase): 156 157 def test_valid_signals(self): 158 s = signal.valid_signals() 159 self.assertIsInstance(s, set) 160 self.assertGreaterEqual(len(s), 6) 161 self.assertIn(signal.Signals.SIGINT, s) 162 self.assertNotIn(0, s) 163 self.assertNotIn(signal.NSIG, s) 164 self.assertLess(len(s), signal.NSIG) 165 166 def test_issue9324(self): 167 # Updated for issue #10003, adding SIGBREAK 168 handler = lambda x, y: None 169 checked = set() 170 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 171 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 172 signal.SIGTERM): 173 # Set and then reset a handler for signals that work on windows. 
174 # Issue #18396, only for signals without a C-level handler. 175 if signal.getsignal(sig) is not None: 176 signal.signal(sig, signal.signal(sig, handler)) 177 checked.add(sig) 178 # Issue #18396: Ensure the above loop at least tested *something* 179 self.assertTrue(checked) 180 181 with self.assertRaises(ValueError): 182 signal.signal(-1, handler) 183 184 with self.assertRaises(ValueError): 185 signal.signal(7, handler) 186 187 @unittest.skipUnless(sys.executable, "sys.executable required.") 188 @support.requires_subprocess() 189 def test_keyboard_interrupt_exit_code(self): 190 """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.""" 191 # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here 192 # as that requires setting up a console control handler in a child 193 # in its own process group. Doable, but quite complicated. (see 194 # @eryksun on https://github.com/python/cpython/pull/11862) 195 process = subprocess.run( 196 [sys.executable, "-c", "raise KeyboardInterrupt"], 197 stderr=subprocess.PIPE) 198 self.assertIn(b"KeyboardInterrupt", process.stderr) 199 STATUS_CONTROL_C_EXIT = 0xC000013A 200 self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT) 201 202 203class WakeupFDTests(unittest.TestCase): 204 205 def test_invalid_call(self): 206 # First parameter is positional-only 207 with self.assertRaises(TypeError): 208 signal.set_wakeup_fd(signum=signal.SIGINT) 209 210 # warn_on_full_buffer is a keyword-only parameter 211 with self.assertRaises(TypeError): 212 signal.set_wakeup_fd(signal.SIGINT, False) 213 214 def test_invalid_fd(self): 215 fd = os_helper.make_bad_fd() 216 self.assertRaises((ValueError, OSError), 217 signal.set_wakeup_fd, fd) 218 219 @unittest.skipUnless(support.has_socket_support, "needs working sockets.") 220 def test_invalid_socket(self): 221 sock = socket.socket() 222 fd = sock.fileno() 223 sock.close() 224 self.assertRaises((ValueError, OSError), 225 signal.set_wakeup_fd, fd) 226 227 # Emscripten does not 
support fstat on pipes yet. 228 # https://github.com/emscripten-core/emscripten/issues/16414 229 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 230 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 231 def test_set_wakeup_fd_result(self): 232 r1, w1 = os.pipe() 233 self.addCleanup(os.close, r1) 234 self.addCleanup(os.close, w1) 235 r2, w2 = os.pipe() 236 self.addCleanup(os.close, r2) 237 self.addCleanup(os.close, w2) 238 239 if hasattr(os, 'set_blocking'): 240 os.set_blocking(w1, False) 241 os.set_blocking(w2, False) 242 243 signal.set_wakeup_fd(w1) 244 self.assertEqual(signal.set_wakeup_fd(w2), w1) 245 self.assertEqual(signal.set_wakeup_fd(-1), w2) 246 self.assertEqual(signal.set_wakeup_fd(-1), -1) 247 248 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 249 @unittest.skipUnless(support.has_socket_support, "needs working sockets.") 250 def test_set_wakeup_fd_socket_result(self): 251 sock1 = socket.socket() 252 self.addCleanup(sock1.close) 253 sock1.setblocking(False) 254 fd1 = sock1.fileno() 255 256 sock2 = socket.socket() 257 self.addCleanup(sock2.close) 258 sock2.setblocking(False) 259 fd2 = sock2.fileno() 260 261 signal.set_wakeup_fd(fd1) 262 self.assertEqual(signal.set_wakeup_fd(fd2), fd1) 263 self.assertEqual(signal.set_wakeup_fd(-1), fd2) 264 self.assertEqual(signal.set_wakeup_fd(-1), -1) 265 266 # On Windows, files are always blocking and Windows does not provide a 267 # function to test if a socket is in non-blocking mode. 
268 @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX") 269 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 270 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 271 def test_set_wakeup_fd_blocking(self): 272 rfd, wfd = os.pipe() 273 self.addCleanup(os.close, rfd) 274 self.addCleanup(os.close, wfd) 275 276 # fd must be non-blocking 277 os.set_blocking(wfd, True) 278 with self.assertRaises(ValueError) as cm: 279 signal.set_wakeup_fd(wfd) 280 self.assertEqual(str(cm.exception), 281 "the fd %s must be in non-blocking mode" % wfd) 282 283 # non-blocking is ok 284 os.set_blocking(wfd, False) 285 signal.set_wakeup_fd(wfd) 286 signal.set_wakeup_fd(-1) 287 288 289@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 290class WakeupSignalTests(unittest.TestCase): 291 @unittest.skipIf(_testcapi is None, 'need _testcapi') 292 def check_wakeup(self, test_body, *signals, ordered=True): 293 # use a subprocess to have only one thread 294 code = """if 1: 295 import _testcapi 296 import os 297 import signal 298 import struct 299 300 signals = {!r} 301 302 def handler(signum, frame): 303 pass 304 305 def check_signum(signals): 306 data = os.read(read, len(signals)+1) 307 raised = struct.unpack('%uB' % len(data), data) 308 if not {!r}: 309 raised = set(raised) 310 signals = set(signals) 311 if raised != signals: 312 raise Exception("%r != %r" % (raised, signals)) 313 314 {} 315 316 signal.signal(signal.SIGALRM, handler) 317 read, write = os.pipe() 318 os.set_blocking(write, False) 319 signal.set_wakeup_fd(write) 320 321 test() 322 check_signum(signals) 323 324 os.close(read) 325 os.close(write) 326 """.format(tuple(map(int, signals)), ordered, test_body) 327 328 assert_python_ok('-c', code) 329 330 @unittest.skipIf(_testcapi is None, 'need _testcapi') 331 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 332 def test_wakeup_write_error(self): 333 # Issue #16105: write() errors in the C signal handler 
should not 334 # pass silently. 335 # Use a subprocess to have only one thread. 336 code = """if 1: 337 import _testcapi 338 import errno 339 import os 340 import signal 341 import sys 342 from test.support import captured_stderr 343 344 def handler(signum, frame): 345 1/0 346 347 signal.signal(signal.SIGALRM, handler) 348 r, w = os.pipe() 349 os.set_blocking(r, False) 350 351 # Set wakeup_fd a read-only file descriptor to trigger the error 352 signal.set_wakeup_fd(r) 353 try: 354 with captured_stderr() as err: 355 signal.raise_signal(signal.SIGALRM) 356 except ZeroDivisionError: 357 # An ignored exception should have been printed out on stderr 358 err = err.getvalue() 359 if ('Exception ignored when trying to write to the signal wakeup fd' 360 not in err): 361 raise AssertionError(err) 362 if ('OSError: [Errno %d]' % errno.EBADF) not in err: 363 raise AssertionError(err) 364 else: 365 raise AssertionError("ZeroDivisionError not raised") 366 367 os.close(r) 368 os.close(w) 369 """ 370 r, w = os.pipe() 371 try: 372 os.write(r, b'x') 373 except OSError: 374 pass 375 else: 376 self.skipTest("OS doesn't report write() error on the read end of a pipe") 377 finally: 378 os.close(r) 379 os.close(w) 380 381 assert_python_ok('-c', code) 382 383 def test_wakeup_fd_early(self): 384 self.check_wakeup("""def test(): 385 import select 386 import time 387 388 TIMEOUT_FULL = 10 389 TIMEOUT_HALF = 5 390 391 class InterruptSelect(Exception): 392 pass 393 394 def handler(signum, frame): 395 raise InterruptSelect 396 signal.signal(signal.SIGALRM, handler) 397 398 signal.alarm(1) 399 400 # We attempt to get a signal during the sleep, 401 # before select is called 402 try: 403 select.select([], [], [], TIMEOUT_FULL) 404 except InterruptSelect: 405 pass 406 else: 407 raise Exception("select() was not interrupted") 408 409 before_time = time.monotonic() 410 select.select([read], [], [], TIMEOUT_FULL) 411 after_time = time.monotonic() 412 dt = after_time - before_time 413 if dt >= 
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    """Tests of signal.set_wakeup_fd() with a socket.socketpair() socket.

    Each test runs its scenario in a child interpreter started with
    assert_python_ok(); the child script is held in a string literal.
    """

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # The child installs a SIGINT handler, registers the write end of a
        # socketpair as wakeup fd, raises SIGINT and then checks that the
        # single byte received on the read end unpacks to the signal number.
        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        # The child closes both sockets after registering the wakeup fd, so
        # writing the signal number fails; it then checks that the
        # "Exception ignored ..." message was printed on stderr.
        # Use a subprocess to have only one thread.
        # The expected message names the failing operation: 'send' when
        # os.name is 'nt', 'write' otherwise.
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_warn_on_full_buffer(self):
        # The child fills the socketpair buffer, then raises SIGINT four
        # times with different set_wakeup_fd() settings: the warning must
        # appear on stderr by default and with warn_on_full_buffer=True, must
        # be absent with warn_on_full_buffer=False, and must appear again
        # once the default is restored.
        # Use a subprocess to have only one thread.
        # The expected message names the failing operation: 'send' when
        # os.name is 'nt', 'write' otherwise.
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        # NOTE(review): the child's print() call below combines an f-string
        # prefix with %-formatting; the prefix has no placeholders and looks
        # redundant, but it is harmless.
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        """.format(action=action)
        assert_python_ok('-c', code)
678 """ 679 # use a subprocess to have only one thread, to have a timeout on the 680 # blocking read and to not touch signal handling in this process 681 code = """if 1: 682 import errno 683 import os 684 import signal 685 import sys 686 687 interrupt = %r 688 r, w = os.pipe() 689 690 def handler(signum, frame): 691 1 / 0 692 693 signal.signal(signal.SIGALRM, handler) 694 if interrupt is not None: 695 signal.siginterrupt(signal.SIGALRM, interrupt) 696 697 print("ready") 698 sys.stdout.flush() 699 700 # run the test twice 701 try: 702 for loop in range(2): 703 # send a SIGALRM in a second (during the read) 704 signal.alarm(1) 705 try: 706 # blocking call: read from a pipe without data 707 os.read(r, 1) 708 except ZeroDivisionError: 709 pass 710 else: 711 sys.exit(2) 712 sys.exit(3) 713 finally: 714 os.close(r) 715 os.close(w) 716 """ % (interrupt,) 717 with spawn_python('-c', code) as process: 718 try: 719 # wait until the child process is loaded and has started 720 first_line = process.stdout.readline() 721 722 stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT) 723 except subprocess.TimeoutExpired: 724 process.kill() 725 return False 726 else: 727 stdout = first_line + stdout 728 exitcode = process.wait() 729 if exitcode not in (2, 3): 730 raise Exception("Child error (exit code %s): %r" 731 % (exitcode, stdout)) 732 return (exitcode == 3) 733 734 def test_without_siginterrupt(self): 735 # If a signal handler is installed and siginterrupt is not called 736 # at all, when that signal arrives, it interrupts a syscall that's in 737 # progress. 738 interrupted = self.readpipe_interrupted(None) 739 self.assertTrue(interrupted) 740 741 def test_siginterrupt_on(self): 742 # If a signal handler is installed and siginterrupt is called with 743 # a true value for the second argument, when that signal arrives, it 744 # interrupts a syscall that's in progress. 
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
                     "needs signal.getitimer() and signal.setitimer()")
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() and signal.getitimer()."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Disarm the timer first, so that it cannot fire once the previous
        # SIGALRM handler is back in place.  The handler is restored even if
        # disarming fails.
        try:
            if self.itimer is not None:  # test_itimer_exc doesn't change this attr
                signal.setitimer(self.itimer, 0)
        finally:
            signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        # Install *handler* for *signum* and put the previous handler back
        # when the test is over.  Cleanups run after tearDown(), i.e. after
        # the timer has been disarmed.  signal.signal() does not accept None
        # as a handler, so a None previous handler is left alone.
        previous = signal.signal(signum, handler)
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)

    def _work_until_timer_stops(self):
        # Keep the CPU busy until the signal handler has disarmed
        # self.itimer.  Skip the test when that does not happen within
        # 60 seconds.
        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # use up some CPU time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                return  # the handler stopped this itimer
        # Issue 8424
        self.skipTest("timeout: likely cause: machine too slow or load too "
                      "high")

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        # Disarm the profiling timer on the first delivery.
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms, so
        # setitimer(ITIMER_REAL, -1) is deliberately not required to raise
        # ItimerError here.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertTrue(self.hndl_called)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        self._work_until_timer_stops()

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        self._work_until_timer_stops()

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_setitimer_tiny(self):
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertTrue(self.hndl_called)
signal.pthread_kill(tid, signum) 920 except ZeroDivisionError: 921 pass 922 else: 923 raise Exception("ZeroDivisionError not raised") 924 """ 925 assert_python_ok('-c', code) 926 927 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 928 'need signal.pthread_sigmask()') 929 def wait_helper(self, blocked, test): 930 """ 931 test: body of the "def test(signum):" function. 932 blocked: number of the blocked signal 933 """ 934 code = '''if 1: 935 import signal 936 import sys 937 from signal import Signals 938 939 def handler(signum, frame): 940 1/0 941 942 %s 943 944 blocked = %s 945 signum = signal.SIGALRM 946 947 # child: block and wait the signal 948 try: 949 signal.signal(signum, handler) 950 signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) 951 952 # Do the tests 953 test(signum) 954 955 # The handler must not be called on unblock 956 try: 957 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) 958 except ZeroDivisionError: 959 print("the signal handler has been called", 960 file=sys.stderr) 961 sys.exit(1) 962 except BaseException as err: 963 print("error: {}".format(err), file=sys.stderr) 964 sys.stderr.flush() 965 sys.exit(1) 966 ''' % (test.strip(), blocked) 967 968 # sig*wait* must be called with the signal blocked: since the current 969 # process might have several threads running, use a subprocess to have 970 # a single thread. 
971 assert_python_ok('-c', code) 972 973 @unittest.skipUnless(hasattr(signal, 'sigwait'), 974 'need signal.sigwait()') 975 def test_sigwait(self): 976 self.wait_helper(signal.SIGALRM, ''' 977 def test(signum): 978 signal.alarm(1) 979 received = signal.sigwait([signum]) 980 assert isinstance(received, signal.Signals), received 981 if received != signum: 982 raise Exception('received %s, not %s' % (received, signum)) 983 ''') 984 985 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 986 'need signal.sigwaitinfo()') 987 def test_sigwaitinfo(self): 988 self.wait_helper(signal.SIGALRM, ''' 989 def test(signum): 990 signal.alarm(1) 991 info = signal.sigwaitinfo([signum]) 992 if info.si_signo != signum: 993 raise Exception("info.si_signo != %s" % signum) 994 ''') 995 996 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 997 'need signal.sigtimedwait()') 998 def test_sigtimedwait(self): 999 self.wait_helper(signal.SIGALRM, ''' 1000 def test(signum): 1001 signal.alarm(1) 1002 info = signal.sigtimedwait([signum], 10.1000) 1003 if info.si_signo != signum: 1004 raise Exception('info.si_signo != %s' % signum) 1005 ''') 1006 1007 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 1008 'need signal.sigtimedwait()') 1009 def test_sigtimedwait_poll(self): 1010 # check that polling with sigtimedwait works 1011 self.wait_helper(signal.SIGALRM, ''' 1012 def test(signum): 1013 import os 1014 os.kill(os.getpid(), signum) 1015 info = signal.sigtimedwait([signum], 0) 1016 if info.si_signo != signum: 1017 raise Exception('info.si_signo != %s' % signum) 1018 ''') 1019 1020 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 1021 'need signal.sigtimedwait()') 1022 def test_sigtimedwait_timeout(self): 1023 self.wait_helper(signal.SIGALRM, ''' 1024 def test(signum): 1025 received = signal.sigtimedwait([signum], 1.0) 1026 if received is not None: 1027 raise Exception("received=%r" % (received,)) 1028 ''') 1029 1030 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 1031 'need 
signal.sigtimedwait()') 1032 def test_sigtimedwait_negative_timeout(self): 1033 signum = signal.SIGALRM 1034 self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) 1035 1036 @unittest.skipUnless(hasattr(signal, 'sigwait'), 1037 'need signal.sigwait()') 1038 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1039 'need signal.pthread_sigmask()') 1040 @threading_helper.requires_working_threading() 1041 def test_sigwait_thread(self): 1042 # Check that calling sigwait() from a thread doesn't suspend the whole 1043 # process. A new interpreter is spawned to avoid problems when mixing 1044 # threads and fork(): only async-safe functions are allowed between 1045 # fork() and exec(). 1046 assert_python_ok("-c", """if True: 1047 import os, threading, sys, time, signal 1048 1049 # the default handler terminates the process 1050 signum = signal.SIGUSR1 1051 1052 def kill_later(): 1053 # wait until the main thread is waiting in sigwait() 1054 time.sleep(1) 1055 os.kill(os.getpid(), signum) 1056 1057 # the signal must be blocked by all the threads 1058 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1059 killer = threading.Thread(target=kill_later) 1060 killer.start() 1061 received = signal.sigwait([signum]) 1062 if received != signum: 1063 print("sigwait() received %s, not %s" % (received, signum), 1064 file=sys.stderr) 1065 sys.exit(1) 1066 killer.join() 1067 # unblock the signal, which should have been cleared by sigwait() 1068 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1069 """) 1070 1071 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1072 'need signal.pthread_sigmask()') 1073 def test_pthread_sigmask_arguments(self): 1074 self.assertRaises(TypeError, signal.pthread_sigmask) 1075 self.assertRaises(TypeError, signal.pthread_sigmask, 1) 1076 self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) 1077 self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) 1078 with self.assertRaises(ValueError): 1079 
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG]) 1080 with self.assertRaises(ValueError): 1081 signal.pthread_sigmask(signal.SIG_BLOCK, [0]) 1082 with self.assertRaises(ValueError): 1083 signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000]) 1084 1085 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1086 'need signal.pthread_sigmask()') 1087 def test_pthread_sigmask_valid_signals(self): 1088 s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) 1089 self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s) 1090 # Get current blocked set 1091 s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) 1092 self.assertLessEqual(s, signal.valid_signals()) 1093 1094 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1095 'need signal.pthread_sigmask()') 1096 @threading_helper.requires_working_threading() 1097 def test_pthread_sigmask(self): 1098 code = """if 1: 1099 import signal 1100 import os; import threading 1101 1102 def handler(signum, frame): 1103 1/0 1104 1105 def kill(signum): 1106 os.kill(os.getpid(), signum) 1107 1108 def check_mask(mask): 1109 for sig in mask: 1110 assert isinstance(sig, signal.Signals), repr(sig) 1111 1112 def read_sigmask(): 1113 sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) 1114 check_mask(sigmask) 1115 return sigmask 1116 1117 signum = signal.SIGUSR1 1118 1119 # Install our signal handler 1120 old_handler = signal.signal(signum, handler) 1121 1122 # Unblock SIGUSR1 (and copy the old mask) to test our signal handler 1123 old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1124 check_mask(old_mask) 1125 try: 1126 kill(signum) 1127 except ZeroDivisionError: 1128 pass 1129 else: 1130 raise Exception("ZeroDivisionError not raised") 1131 1132 # Block and then raise SIGUSR1. 
The signal is blocked: the signal 1133 # handler is not called, and the signal is now pending 1134 mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1135 check_mask(mask) 1136 kill(signum) 1137 1138 # Check the new mask 1139 blocked = read_sigmask() 1140 check_mask(blocked) 1141 if signum not in blocked: 1142 raise Exception("%s not in %s" % (signum, blocked)) 1143 if old_mask ^ blocked != {signum}: 1144 raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) 1145 1146 # Unblock SIGUSR1 1147 try: 1148 # unblock the pending signal calls immediately the signal handler 1149 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1150 except ZeroDivisionError: 1151 pass 1152 else: 1153 raise Exception("ZeroDivisionError not raised") 1154 try: 1155 kill(signum) 1156 except ZeroDivisionError: 1157 pass 1158 else: 1159 raise Exception("ZeroDivisionError not raised") 1160 1161 # Check the new mask 1162 unblocked = read_sigmask() 1163 if signum in unblocked: 1164 raise Exception("%s in %s" % (signum, unblocked)) 1165 if blocked ^ unblocked != {signum}: 1166 raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) 1167 if old_mask != unblocked: 1168 raise Exception("%s != %s" % (old_mask, unblocked)) 1169 """ 1170 assert_python_ok('-c', code) 1171 1172 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 1173 'need signal.pthread_kill()') 1174 @threading_helper.requires_working_threading() 1175 def test_pthread_kill_main_thread(self): 1176 # Test that a signal can be sent to the main thread with pthread_kill() 1177 # before any other thread has been created (see issue #12392). 
1178 code = """if True: 1179 import threading 1180 import signal 1181 import sys 1182 1183 def handler(signum, frame): 1184 sys.exit(3) 1185 1186 signal.signal(signal.SIGUSR1, handler) 1187 signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) 1188 sys.exit(2) 1189 """ 1190 1191 with spawn_python('-c', code) as process: 1192 stdout, stderr = process.communicate() 1193 exitcode = process.wait() 1194 if exitcode != 3: 1195 raise Exception("Child error (exit code %s): %s" % 1196 (exitcode, stdout)) 1197 1198 1199class StressTest(unittest.TestCase): 1200 """ 1201 Stress signal delivery, especially when a signal arrives in 1202 the middle of recomputing the signal state or executing 1203 previously tripped signal handlers. 1204 """ 1205 1206 def setsig(self, signum, handler): 1207 old_handler = signal.signal(signum, handler) 1208 self.addCleanup(signal.signal, signum, old_handler) 1209 1210 def measure_itimer_resolution(self): 1211 N = 20 1212 times = [] 1213 1214 def handler(signum=None, frame=None): 1215 if len(times) < N: 1216 times.append(time.perf_counter()) 1217 # 1 µs is the smallest possible timer interval, 1218 # we want to measure what the concrete duration 1219 # will be on this platform 1220 signal.setitimer(signal.ITIMER_REAL, 1e-6) 1221 1222 self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0) 1223 self.setsig(signal.SIGALRM, handler) 1224 handler() 1225 while len(times) < N: 1226 time.sleep(1e-3) 1227 1228 durations = [times[i+1] - times[i] for i in range(len(times) - 1)] 1229 med = statistics.median(durations) 1230 if support.verbose: 1231 print("detected median itimer() resolution: %.6f s." % (med,)) 1232 return med 1233 1234 def decide_itimer_count(self): 1235 # Some systems have poor setitimer() resolution (for example 1236 # measured around 20 ms. on FreeBSD 9), so decide on a reasonable 1237 # number of sequential timers based on that. 
1238 reso = self.measure_itimer_resolution() 1239 if reso <= 1e-4: 1240 return 10000 1241 elif reso <= 1e-2: 1242 return 100 1243 else: 1244 self.skipTest("detected itimer resolution (%.3f s.) too high " 1245 "(> 10 ms.) on this platform (or system too busy)" 1246 % (reso,)) 1247 1248 @unittest.skipUnless(hasattr(signal, "setitimer"), 1249 "test needs setitimer()") 1250 def test_stress_delivery_dependent(self): 1251 """ 1252 This test uses dependent signal handlers. 1253 """ 1254 N = self.decide_itimer_count() 1255 sigs = [] 1256 1257 def first_handler(signum, frame): 1258 # 1e-6 is the minimum non-zero value for `setitimer()`. 1259 # Choose a random delay so as to improve chances of 1260 # triggering a race condition. Ideally the signal is received 1261 # when inside critical signal-handling routines such as 1262 # Py_MakePendingCalls(). 1263 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1264 1265 def second_handler(signum=None, frame=None): 1266 sigs.append(signum) 1267 1268 # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both 1269 # ascending and descending sequences (SIGUSR1 then SIGALRM, 1270 # SIGPROF then SIGALRM), we maximize chances of hitting a bug. 
1271 self.setsig(signal.SIGPROF, first_handler) 1272 self.setsig(signal.SIGUSR1, first_handler) 1273 self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL 1274 1275 expected_sigs = 0 1276 deadline = time.monotonic() + support.SHORT_TIMEOUT 1277 1278 while expected_sigs < N: 1279 os.kill(os.getpid(), signal.SIGPROF) 1280 expected_sigs += 1 1281 # Wait for handlers to run to avoid signal coalescing 1282 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1283 time.sleep(1e-5) 1284 1285 os.kill(os.getpid(), signal.SIGUSR1) 1286 expected_sigs += 1 1287 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1288 time.sleep(1e-5) 1289 1290 # All ITIMER_REAL signals should have been delivered to the 1291 # Python handler 1292 self.assertEqual(len(sigs), N, "Some signals were lost") 1293 1294 @unittest.skipUnless(hasattr(signal, "setitimer"), 1295 "test needs setitimer()") 1296 def test_stress_delivery_simultaneous(self): 1297 """ 1298 This test uses simultaneous signal handlers. 1299 """ 1300 N = self.decide_itimer_count() 1301 sigs = [] 1302 1303 def handler(signum, frame): 1304 sigs.append(signum) 1305 1306 self.setsig(signal.SIGUSR1, handler) 1307 self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL 1308 1309 expected_sigs = 0 1310 deadline = time.monotonic() + support.SHORT_TIMEOUT 1311 1312 while expected_sigs < N: 1313 # Hopefully the SIGALRM will be received somewhere during 1314 # initial processing of SIGUSR1. 
1315 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1316 os.kill(os.getpid(), signal.SIGUSR1) 1317 1318 expected_sigs += 2 1319 # Wait for handlers to run to avoid signal coalescing 1320 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1321 time.sleep(1e-5) 1322 1323 # All ITIMER_REAL signals should have been delivered to the 1324 # Python handler 1325 self.assertEqual(len(sigs), N, "Some signals were lost") 1326 1327 @unittest.skipUnless(hasattr(signal, "SIGUSR1"), 1328 "test needs SIGUSR1") 1329 @threading_helper.requires_working_threading() 1330 def test_stress_modifying_handlers(self): 1331 # bpo-43406: race condition between trip_signal() and signal.signal 1332 signum = signal.SIGUSR1 1333 num_sent_signals = 0 1334 num_received_signals = 0 1335 do_stop = False 1336 1337 def custom_handler(signum, frame): 1338 nonlocal num_received_signals 1339 num_received_signals += 1 1340 1341 def set_interrupts(): 1342 nonlocal num_sent_signals 1343 while not do_stop: 1344 signal.raise_signal(signum) 1345 num_sent_signals += 1 1346 1347 def cycle_handlers(): 1348 while num_sent_signals < 100: 1349 for i in range(20000): 1350 # Cycle between a Python-defined and a non-Python handler 1351 for handler in [custom_handler, signal.SIG_IGN]: 1352 signal.signal(signum, handler) 1353 1354 old_handler = signal.signal(signum, custom_handler) 1355 self.addCleanup(signal.signal, signum, old_handler) 1356 1357 t = threading.Thread(target=set_interrupts) 1358 try: 1359 ignored = False 1360 with support.catch_unraisable_exception() as cm: 1361 t.start() 1362 cycle_handlers() 1363 do_stop = True 1364 t.join() 1365 1366 if cm.unraisable is not None: 1367 # An unraisable exception may be printed out when 1368 # a signal is ignored due to the aforementioned 1369 # race condition, check it. 
1370 self.assertIsInstance(cm.unraisable.exc_value, OSError) 1371 self.assertIn( 1372 f"Signal {signum:d} ignored due to race condition", 1373 str(cm.unraisable.exc_value)) 1374 ignored = True 1375 1376 # bpo-43406: Even if it is unlikely, it's technically possible that 1377 # all signals were ignored because of race conditions. 1378 if not ignored: 1379 # Sanity check that some signals were received, but not all 1380 self.assertGreater(num_received_signals, 0) 1381 self.assertLess(num_received_signals, num_sent_signals) 1382 finally: 1383 do_stop = True 1384 t.join() 1385 1386 1387class RaiseSignalTest(unittest.TestCase): 1388 1389 def test_sigint(self): 1390 with self.assertRaises(KeyboardInterrupt): 1391 signal.raise_signal(signal.SIGINT) 1392 1393 @unittest.skipIf(sys.platform != "win32", "Windows specific test") 1394 def test_invalid_argument(self): 1395 try: 1396 SIGHUP = 1 # not supported on win32 1397 signal.raise_signal(SIGHUP) 1398 self.fail("OSError (Invalid argument) expected") 1399 except OSError as e: 1400 if e.errno == errno.EINVAL: 1401 pass 1402 else: 1403 raise 1404 1405 def test_handler(self): 1406 is_ok = False 1407 def handler(a, b): 1408 nonlocal is_ok 1409 is_ok = True 1410 old_signal = signal.signal(signal.SIGINT, handler) 1411 self.addCleanup(signal.signal, signal.SIGINT, old_signal) 1412 1413 signal.raise_signal(signal.SIGINT) 1414 self.assertTrue(is_ok) 1415 1416 def test__thread_interrupt_main(self): 1417 # See https://github.com/python/cpython/issues/102397 1418 code = """if 1: 1419 import _thread 1420 class Foo(): 1421 def __del__(self): 1422 _thread.interrupt_main() 1423 1424 x = Foo() 1425 """ 1426 1427 rc, out, err = assert_python_ok('-c', code) 1428 self.assertIn(b'OSError: Signal 2 ignored due to race condition', err) 1429 1430 1431 1432class PidfdSignalTest(unittest.TestCase): 1433 1434 @unittest.skipUnless( 1435 hasattr(signal, "pidfd_send_signal"), 1436 "pidfd support not built in", 1437 ) 1438 def test_pidfd_send_signal(self): 
1439 with self.assertRaises(OSError) as cm: 1440 signal.pidfd_send_signal(0, signal.SIGINT) 1441 if cm.exception.errno == errno.ENOSYS: 1442 self.skipTest("kernel does not support pidfds") 1443 elif cm.exception.errno == errno.EPERM: 1444 self.skipTest("Not enough privileges to use pidfs") 1445 self.assertEqual(cm.exception.errno, errno.EBADF) 1446 my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY) 1447 self.addCleanup(os.close, my_pidfd) 1448 with self.assertRaisesRegex(TypeError, "^siginfo must be None$"): 1449 signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0) 1450 with self.assertRaises(KeyboardInterrupt): 1451 signal.pidfd_send_signal(my_pidfd, signal.SIGINT) 1452 1453def tearDownModule(): 1454 support.reap_children() 1455 1456if __name__ == "__main__": 1457 unittest.main() 1458