1"""PyUnit testing that threads honor our signal semantics""" 2 3import unittest 4import signal 5import os 6import sys 7from test.support import threading_helper 8import _thread as thread 9import time 10 11if (sys.platform[:3] == 'win'): 12 raise unittest.SkipTest("Can't test signal on %s" % sys.platform) 13 14process_pid = os.getpid() 15signalled_all=thread.allocate_lock() 16 17USING_PTHREAD_COND = (sys.thread_info.name == 'pthread' 18 and sys.thread_info.lock == 'mutex+cond') 19 20def registerSignals(for_usr1, for_usr2, for_alrm): 21 usr1 = signal.signal(signal.SIGUSR1, for_usr1) 22 usr2 = signal.signal(signal.SIGUSR2, for_usr2) 23 alrm = signal.signal(signal.SIGALRM, for_alrm) 24 return usr1, usr2, alrm 25 26 27# The signal handler. Just note that the signal occurred and 28# from who. 29def handle_signals(sig,frame): 30 signal_blackboard[sig]['tripped'] += 1 31 signal_blackboard[sig]['tripped_by'] = thread.get_ident() 32 33# a function that will be spawned as a separate thread. 34def send_signals(): 35 os.kill(process_pid, signal.SIGUSR1) 36 os.kill(process_pid, signal.SIGUSR2) 37 signalled_all.release() 38 39 40@threading_helper.requires_working_threading() 41@unittest.skipUnless(hasattr(signal, "alarm"), "test requires signal.alarm") 42class ThreadSignals(unittest.TestCase): 43 44 def test_signals(self): 45 with threading_helper.wait_threads_exit(): 46 # Test signal handling semantics of threads. 47 # We spawn a thread, have the thread send two signals, and 48 # wait for it to finish. Check that we got both signals 49 # and that they were run by the main thread. 50 signalled_all.acquire() 51 self.spawnSignallingThread() 52 signalled_all.acquire() 53 54 # the signals that we asked the kernel to send 55 # will come back, but we don't know when. 56 # (it might even be after the thread exits 57 # and might be out of order.) If we haven't seen 58 # the signals yet, send yet another signal and 59 # wait for it return. 60 if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \ 61 or signal_blackboard[signal.SIGUSR2]['tripped'] == 0: 62 try: 63 signal.alarm(1) 64 signal.pause() 65 finally: 66 signal.alarm(0) 67 68 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1) 69 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'], 70 thread.get_ident()) 71 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1) 72 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'], 73 thread.get_ident()) 74 signalled_all.release() 75 76 def spawnSignallingThread(self): 77 thread.start_new_thread(send_signals, ()) 78 79 def alarm_interrupt(self, sig, frame): 80 raise KeyboardInterrupt 81 82 @unittest.skipIf(USING_PTHREAD_COND, 83 'POSIX condition variables cannot be interrupted') 84 @unittest.skipIf(sys.platform.startswith('linux') and 85 not sys.thread_info.version, 86 'Issue 34004: musl does not allow interruption of locks ' 87 'by signals.') 88 # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD 89 @unittest.skipIf(sys.platform.startswith('openbsd'), 90 'lock cannot be interrupted on OpenBSD') 91 def test_lock_acquire_interruption(self): 92 # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck 93 # in a deadlock. 94 # XXX this test can fail when the legacy (non-semaphore) implementation 95 # of locks is used in thread_pthread.h, see issue #11223. 96 oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 97 try: 98 lock = thread.allocate_lock() 99 lock.acquire() 100 signal.alarm(1) 101 t1 = time.monotonic() 102 self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5) 103 dt = time.monotonic() - t1 104 # Checking that KeyboardInterrupt was raised is not sufficient. 105 # We want to assert that lock.acquire() was interrupted because 106 # of the signal, not that the signal handler was called immediately 107 # after timeout return of lock.acquire() (which can fool assertRaises). 108 self.assertLess(dt, 3.0) 109 finally: 110 signal.alarm(0) 111 signal.signal(signal.SIGALRM, oldalrm) 112 113 @unittest.skipIf(USING_PTHREAD_COND, 114 'POSIX condition variables cannot be interrupted') 115 @unittest.skipIf(sys.platform.startswith('linux') and 116 not sys.thread_info.version, 117 'Issue 34004: musl does not allow interruption of locks ' 118 'by signals.') 119 # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD 120 @unittest.skipIf(sys.platform.startswith('openbsd'), 121 'lock cannot be interrupted on OpenBSD') 122 def test_rlock_acquire_interruption(self): 123 # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck 124 # in a deadlock. 125 # XXX this test can fail when the legacy (non-semaphore) implementation 126 # of locks is used in thread_pthread.h, see issue #11223. 127 oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 128 try: 129 rlock = thread.RLock() 130 # For reentrant locks, the initial acquisition must be in another 131 # thread. 132 def other_thread(): 133 rlock.acquire() 134 135 with threading_helper.wait_threads_exit(): 136 thread.start_new_thread(other_thread, ()) 137 # Wait until we can't acquire it without blocking... 138 while rlock.acquire(blocking=False): 139 rlock.release() 140 time.sleep(0.01) 141 signal.alarm(1) 142 t1 = time.monotonic() 143 self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5) 144 dt = time.monotonic() - t1 145 # See rationale above in test_lock_acquire_interruption 146 self.assertLess(dt, 3.0) 147 finally: 148 signal.alarm(0) 149 signal.signal(signal.SIGALRM, oldalrm) 150 151 def acquire_retries_on_intr(self, lock): 152 self.sig_recvd = False 153 def my_handler(signal, frame): 154 self.sig_recvd = True 155 156 old_handler = signal.signal(signal.SIGUSR1, my_handler) 157 try: 158 def other_thread(): 159 # Acquire the lock in a non-main thread, so this test works for 160 # RLocks. 161 lock.acquire() 162 # Wait until the main thread is blocked in the lock acquire, and 163 # then wake it up with this. 164 time.sleep(0.5) 165 os.kill(process_pid, signal.SIGUSR1) 166 # Let the main thread take the interrupt, handle it, and retry 167 # the lock acquisition. Then we'll let it run. 168 time.sleep(0.5) 169 lock.release() 170 171 with threading_helper.wait_threads_exit(): 172 thread.start_new_thread(other_thread, ()) 173 # Wait until we can't acquire it without blocking... 174 while lock.acquire(blocking=False): 175 lock.release() 176 time.sleep(0.01) 177 result = lock.acquire() # Block while we receive a signal. 178 self.assertTrue(self.sig_recvd) 179 self.assertTrue(result) 180 finally: 181 signal.signal(signal.SIGUSR1, old_handler) 182 183 def test_lock_acquire_retries_on_intr(self): 184 self.acquire_retries_on_intr(thread.allocate_lock()) 185 186 def test_rlock_acquire_retries_on_intr(self): 187 self.acquire_retries_on_intr(thread.RLock()) 188 189 def test_interrupted_timed_acquire(self): 190 # Test to make sure we recompute lock acquisition timeouts when we 191 # receive a signal. Check this by repeatedly interrupting a lock 192 # acquire in the main thread, and make sure that the lock acquire times 193 # out after the right amount of time. 194 # NOTE: this test only behaves as expected if C signals get delivered 195 # to the main thread. Otherwise lock.acquire() itself doesn't get 196 # interrupted and the test trivially succeeds. 197 self.start = None 198 self.end = None 199 self.sigs_recvd = 0 200 done = thread.allocate_lock() 201 done.acquire() 202 lock = thread.allocate_lock() 203 lock.acquire() 204 def my_handler(signum, frame): 205 self.sigs_recvd += 1 206 old_handler = signal.signal(signal.SIGUSR1, my_handler) 207 try: 208 def timed_acquire(): 209 self.start = time.monotonic() 210 lock.acquire(timeout=0.5) 211 self.end = time.monotonic() 212 def send_signals(): 213 for _ in range(40): 214 time.sleep(0.02) 215 os.kill(process_pid, signal.SIGUSR1) 216 done.release() 217 218 with threading_helper.wait_threads_exit(): 219 # Send the signals from the non-main thread, since the main thread 220 # is the only one that can process signals. 221 thread.start_new_thread(send_signals, ()) 222 timed_acquire() 223 # Wait for thread to finish 224 done.acquire() 225 # This allows for some timing and scheduling imprecision 226 self.assertLess(self.end - self.start, 2.0) 227 self.assertGreater(self.end - self.start, 0.3) 228 # If the signal is received several times before PyErr_CheckSignals() 229 # is called, the handler will get called less than 40 times. Just 230 # check it's been called at least once. 231 self.assertGreater(self.sigs_recvd, 0) 232 finally: 233 signal.signal(signal.SIGUSR1, old_handler) 234 235 236def setUpModule(): 237 global signal_blackboard 238 239 signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 }, 240 signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 }, 241 signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } } 242 243 oldsigs = registerSignals(handle_signals, handle_signals, handle_signals) 244 unittest.addModuleCleanup(registerSignals, *oldsigs) 245 246 247if __name__ == '__main__': 248 unittest.main() 249