1import os
2import unittest
3import random
4from test import support
5from test.support import threading_helper
6import _thread as thread
7import time
8import weakref
9
10from test import lock_tests
11
12threading_helper.requires_working_threading(module=True)
13
14NUMTASKS = 10
15NUMTRIPS = 3
16POLL_SLEEP = 0.010 # seconds = 10 ms
17
18_print_mutex = thread.allocate_lock()
19
20def verbose_print(arg):
21    """Helper function for printing out debugging output."""
22    if support.verbose:
23        with _print_mutex:
24            print(arg)
25
26
27class BasicThreadTest(unittest.TestCase):
28
29    def setUp(self):
30        self.done_mutex = thread.allocate_lock()
31        self.done_mutex.acquire()
32        self.running_mutex = thread.allocate_lock()
33        self.random_mutex = thread.allocate_lock()
34        self.created = 0
35        self.running = 0
36        self.next_ident = 0
37
38        key = threading_helper.threading_setup()
39        self.addCleanup(threading_helper.threading_cleanup, *key)
40
41
42class ThreadRunningTests(BasicThreadTest):
43
44    def newtask(self):
45        with self.running_mutex:
46            self.next_ident += 1
47            verbose_print("creating task %s" % self.next_ident)
48            thread.start_new_thread(self.task, (self.next_ident,))
49            self.created += 1
50            self.running += 1
51
52    def task(self, ident):
53        with self.random_mutex:
54            delay = random.random() / 10000.0
55        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
56        time.sleep(delay)
57        verbose_print("task %s done" % ident)
58        with self.running_mutex:
59            self.running -= 1
60            if self.created == NUMTASKS and self.running == 0:
61                self.done_mutex.release()
62
63    def test_starting_threads(self):
64        with threading_helper.wait_threads_exit():
65            # Basic test for thread creation.
66            for i in range(NUMTASKS):
67                self.newtask()
68            verbose_print("waiting for tasks to complete...")
69            self.done_mutex.acquire()
70            verbose_print("all tasks done")
71
72    def test_stack_size(self):
73        # Various stack size tests.
74        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
75
76        thread.stack_size(0)
77        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
78
79    @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
80    def test_nt_and_posix_stack_size(self):
81        try:
82            thread.stack_size(4096)
83        except ValueError:
84            verbose_print("caught expected ValueError setting "
85                            "stack_size(4096)")
86        except thread.error:
87            self.skipTest("platform does not support changing thread stack "
88                          "size")
89
90        fail_msg = "stack_size(%d) failed - should succeed"
91        for tss in (262144, 0x100000, 0):
92            thread.stack_size(tss)
93            self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
94            verbose_print("successfully set stack_size(%d)" % tss)
95
96        for tss in (262144, 0x100000):
97            verbose_print("trying stack_size = (%d)" % tss)
98            self.next_ident = 0
99            self.created = 0
100            with threading_helper.wait_threads_exit():
101                for i in range(NUMTASKS):
102                    self.newtask()
103
104                verbose_print("waiting for all tasks to complete")
105                self.done_mutex.acquire()
106                verbose_print("all tasks done")
107
108        thread.stack_size(0)
109
110    def test__count(self):
111        # Test the _count() function.
112        orig = thread._count()
113        mut = thread.allocate_lock()
114        mut.acquire()
115        started = []
116
117        def task():
118            started.append(None)
119            mut.acquire()
120            mut.release()
121
122        with threading_helper.wait_threads_exit():
123            thread.start_new_thread(task, ())
124            while not started:
125                time.sleep(POLL_SLEEP)
126            self.assertEqual(thread._count(), orig + 1)
127            # Allow the task to finish.
128            mut.release()
129            # The only reliable way to be sure that the thread ended from the
130            # interpreter's point of view is to wait for the function object to be
131            # destroyed.
132            done = []
133            wr = weakref.ref(task, lambda _: done.append(None))
134            del task
135            while not done:
136                time.sleep(POLL_SLEEP)
137                support.gc_collect()  # For PyPy or other GCs.
138            self.assertEqual(thread._count(), orig)
139
140    def test_unraisable_exception(self):
141        def task():
142            started.release()
143            raise ValueError("task failed")
144
145        started = thread.allocate_lock()
146        with support.catch_unraisable_exception() as cm:
147            with threading_helper.wait_threads_exit():
148                started.acquire()
149                thread.start_new_thread(task, ())
150                started.acquire()
151
152            self.assertEqual(str(cm.unraisable.exc_value), "task failed")
153            self.assertIs(cm.unraisable.object, task)
154            self.assertEqual(cm.unraisable.err_msg,
155                             "Exception ignored in thread started by")
156            self.assertIsNotNone(cm.unraisable.exc_traceback)
157
158
159class Barrier:
160    def __init__(self, num_threads):
161        self.num_threads = num_threads
162        self.waiting = 0
163        self.checkin_mutex  = thread.allocate_lock()
164        self.checkout_mutex = thread.allocate_lock()
165        self.checkout_mutex.acquire()
166
167    def enter(self):
168        self.checkin_mutex.acquire()
169        self.waiting = self.waiting + 1
170        if self.waiting == self.num_threads:
171            self.waiting = self.num_threads - 1
172            self.checkout_mutex.release()
173            return
174        self.checkin_mutex.release()
175
176        self.checkout_mutex.acquire()
177        self.waiting = self.waiting - 1
178        if self.waiting == 0:
179            self.checkin_mutex.release()
180            return
181        self.checkout_mutex.release()
182
183
184class BarrierTest(BasicThreadTest):
185
186    def test_barrier(self):
187        with threading_helper.wait_threads_exit():
188            self.bar = Barrier(NUMTASKS)
189            self.running = NUMTASKS
190            for i in range(NUMTASKS):
191                thread.start_new_thread(self.task2, (i,))
192            verbose_print("waiting for tasks to end")
193            self.done_mutex.acquire()
194            verbose_print("tasks done")
195
196    def task2(self, ident):
197        for i in range(NUMTRIPS):
198            if ident == 0:
199                # give it a good chance to enter the next
200                # barrier before the others are all out
201                # of the current one
202                delay = 0
203            else:
204                with self.random_mutex:
205                    delay = random.random() / 10000.0
206            verbose_print("task %s will run for %sus" %
207                          (ident, round(delay * 1e6)))
208            time.sleep(delay)
209            verbose_print("task %s entering %s" % (ident, i))
210            self.bar.enter()
211            verbose_print("task %s leaving barrier" % ident)
212        with self.running_mutex:
213            self.running -= 1
214            # Must release mutex before releasing done, else the main thread can
215            # exit and set mutex to None as part of global teardown; then
216            # mutex.release() raises AttributeError.
217            finished = self.running == 0
218        if finished:
219            self.done_mutex.release()
220
221class LockTests(lock_tests.LockTests):
222    locktype = thread.allocate_lock
223
224
225class TestForkInThread(unittest.TestCase):
226    def setUp(self):
227        self.read_fd, self.write_fd = os.pipe()
228
229    @support.requires_fork()
230    @threading_helper.reap_threads
231    def test_forkinthread(self):
232        pid = None
233
234        def fork_thread(read_fd, write_fd):
235            nonlocal pid
236
237            # fork in a thread
238            pid = os.fork()
239            if pid:
240                # parent process
241                return
242
243            # child process
244            try:
245                os.close(read_fd)
246                os.write(write_fd, b"OK")
247            finally:
248                os._exit(0)
249
250        with threading_helper.wait_threads_exit():
251            thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
252            self.assertEqual(os.read(self.read_fd, 2), b"OK")
253            os.close(self.write_fd)
254
255        self.assertIsNotNone(pid)
256        support.wait_process(pid, exitcode=0)
257
258    def tearDown(self):
259        try:
260            os.close(self.read_fd)
261        except OSError:
262            pass
263
264        try:
265            os.close(self.write_fd)
266        except OSError:
267            pass
268
269
270if __name__ == "__main__":
271    unittest.main()
272