"""PyUnit tests checking that threads honor our signal semantics."""
2
import os
import signal
import sys
import time
import unittest
import _thread as thread
from test.support import threading_helper
10
# These tests rely on POSIX signals; skip the whole module on Windows.
if sys.platform.startswith('win'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)

# PID of this process: the target of every os.kill() call in this module.
process_pid = os.getpid()
# Handshake lock between the main thread and the signalling thread: the main
# thread holds it and the signalling thread releases it once it is done.
signalled_all = thread.allocate_lock()

# True when sys.thread_info reports pthread threads whose locks are built on
# a mutex plus condition variable; the lock-interruption tests are skipped
# in that configuration.
USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
                      and sys.thread_info.lock == 'mutex+cond')
19
def registerSignals(for_usr1, for_usr2, for_alrm):
    """Install handlers for SIGUSR1, SIGUSR2 and SIGALRM.

    Each argument is passed to signal.signal() as the new handler for the
    corresponding signal.

    Returns a 3-tuple ``(usr1, usr2, alrm)`` holding whatever signal.signal()
    returned for each signal, in the same order as the parameters, so the
    tuple can be passed straight back in to restore the previous handlers.
    """
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm
25
26
# The signal handler. Just note that the signal occurred and
# from whom.
def handle_signals(sig, frame):
    """Record one delivery of *sig* in the module-level signal_blackboard.

    Increments the signal's 'tripped' counter and stores the identifier of
    the thread running this handler under 'tripped_by'.  *frame* is unused.
    """
    signal_blackboard[sig]['tripped'] += 1
    signal_blackboard[sig]['tripped_by'] = thread.get_ident()
32
# A function that will be spawned as a separate thread.
def send_signals():
    """Send SIGUSR1 then SIGUSR2 to this process and release signalled_all.

    The release tells the thread waiting on signalled_all that both kill()
    calls have been issued.
    """
    try:
        os.kill(process_pid, signal.SIGUSR1)
        os.kill(process_pid, signal.SIGUSR2)
    finally:
        # Release even if os.kill() raised; otherwise the thread blocked on
        # signalled_all.acquire() would wait forever.
        signalled_all.release()
38
39
@threading_helper.requires_working_threading()
@unittest.skipUnless(hasattr(signal, "alarm"), "test requires signal.alarm")
class ThreadSignals(unittest.TestCase):
    """Tests for signal delivery in a threaded process.

    Covers which thread runs the Python-level handlers, whether a signal can
    interrupt a blocked lock acquire, and whether an interrupted acquire is
    retried with a correctly recomputed timeout.
    """

    def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        try:
            with threading_helper.wait_threads_exit():
                self.spawnSignallingThread()
                # Blocks until send_signals() releases the lock.
                signalled_all.acquire()

            # The signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (It might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it to return.
            if (signal_blackboard[signal.SIGUSR1]['tripped'] == 0
                    or signal_blackboard[signal.SIGUSR2]['tripped'] == 0):
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    signal.alarm(0)

            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped_by'],
                             thread.get_ident())
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped_by'],
                             thread.get_ident())
        finally:
            # Leave the module-level lock unlocked even when an assertion
            # fails, so a later run of this test does not block on it.
            signalled_all.release()

    def spawnSignallingThread(self):
        """Start send_signals() in a new thread."""
        thread.start_new_thread(send_signals, ())

    def alarm_interrupt(self, sig, frame):
        """Signal handler that raises KeyboardInterrupt."""
        raise KeyboardInterrupt

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            lock = thread.allocate_lock()
            lock.acquire()
            signal.alarm(1)
            t1 = time.monotonic()
            self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
            dt = time.monotonic() - t1
            # Checking that KeyboardInterrupt was raised is not sufficient.
            # We want to assert that lock.acquire() was interrupted because
            # of the signal, not that the signal handler was called immediately
            # after timeout return of lock.acquire() (which can fool
            # assertRaises).
            self.assertLess(dt, 3.0)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            rlock = thread.RLock()
            # For reentrant locks, the initial acquisition must be in another
            # thread.
            def other_thread():
                rlock.acquire()

            with threading_helper.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                while rlock.acquire(blocking=False):
                    rlock.release()
                    time.sleep(0.01)
                signal.alarm(1)
                t1 = time.monotonic()
                self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
                dt = time.monotonic() - t1
                # Same rationale as in test_lock_acquire_interruption: the
                # acquire must have been cut short by the signal.
                self.assertLess(dt, 3.0)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    def acquire_retries_on_intr(self, lock):
        """Check that a blocking acquire of *lock* survives a signal.

        Another thread holds *lock*, sends SIGUSR1 to this process while the
        calling thread is blocked in lock.acquire(), and then releases the
        lock.  The handler must have run and acquire() must return true.
        """
        self.sig_recvd = False

        # The first parameter is named signum so that it does not shadow the
        # signal module inside the handler.
        def my_handler(signum, frame):
            self.sig_recvd = True

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def other_thread():
                # Acquire the lock in a non-main thread, so this test works for
                # RLocks.
                lock.acquire()
                # Wait until the main thread is blocked in the lock acquire, and
                # then wake it up with this.
                time.sleep(0.5)
                os.kill(process_pid, signal.SIGUSR1)
                # Let the main thread take the interrupt, handle it, and retry
                # the lock acquisition.  Then we'll let it run.
                time.sleep(0.5)
                lock.release()

            with threading_helper.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                while lock.acquire(blocking=False):
                    lock.release()
                    time.sleep(0.01)
                result = lock.acquire()  # Block while we receive a signal.
                self.assertTrue(self.sig_recvd)
                self.assertTrue(result)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)

    def test_lock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.allocate_lock())

    def test_rlock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.RLock())

    def test_interrupted_timed_acquire(self):
        # Test to make sure we recompute lock acquisition timeouts when we
        # receive a signal.  Check this by repeatedly interrupting a lock
        # acquire in the main thread, and make sure that the lock acquire times
        # out after the right amount of time.
        # NOTE: this test only behaves as expected if C signals get delivered
        # to the main thread.  Otherwise lock.acquire() itself doesn't get
        # interrupted and the test trivially succeeds.
        self.start = None
        self.end = None
        self.sigs_recvd = 0
        done = thread.allocate_lock()
        done.acquire()
        lock = thread.allocate_lock()
        lock.acquire()

        def my_handler(signum, frame):
            self.sigs_recvd += 1

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def timed_acquire():
                self.start = time.monotonic()
                lock.acquire(timeout=0.5)
                self.end = time.monotonic()

            # Named differently from the module-level send_signals() so the
            # local helper does not shadow it.
            def signal_repeatedly():
                for _ in range(40):
                    time.sleep(0.02)
                    os.kill(process_pid, signal.SIGUSR1)
                done.release()

            with threading_helper.wait_threads_exit():
                # Send the signals from the non-main thread, since the main
                # thread is the only one that can process signals.
                thread.start_new_thread(signal_repeatedly, ())
                timed_acquire()
                # Wait for thread to finish
                done.acquire()
                # This allows for some timing and scheduling imprecision
                self.assertLess(self.end - self.start, 2.0)
                self.assertGreater(self.end - self.start, 0.3)
                # If the signal is received several times before
                # PyErr_CheckSignals() is called, the handler will get called
                # less than 40 times. Just check it's been called at least
                # once.
                self.assertGreater(self.sigs_recvd, 0)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)
234
235
def setUpModule():
    """Create a fresh signal_blackboard and install handle_signals.

    The blackboard maps SIGUSR1, SIGUSR2 and SIGALRM to a dict with a
    'tripped' counter and a 'tripped_by' thread identifier, both starting at
    0.  The values returned by registerSignals() are registered as a module
    cleanup so the previous handlers are put back afterwards.
    """
    global signal_blackboard

    # Build one independent record per signal.
    signal_blackboard = {
        signum: {'tripped': 0, 'tripped_by': 0}
        for signum in (signal.SIGUSR1, signal.SIGUSR2, signal.SIGALRM)
    }

    oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    unittest.addModuleCleanup(registerSignals, *oldsigs)
245
246
# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
249