1import os 2import unittest 3import random 4from test import support 5from test.support import threading_helper 6import _thread as thread 7import time 8import weakref 9 10from test import lock_tests 11 12threading_helper.requires_working_threading(module=True) 13 14NUMTASKS = 10 15NUMTRIPS = 3 16POLL_SLEEP = 0.010 # seconds = 10 ms 17 18_print_mutex = thread.allocate_lock() 19 20def verbose_print(arg): 21 """Helper function for printing out debugging output.""" 22 if support.verbose: 23 with _print_mutex: 24 print(arg) 25 26 27class BasicThreadTest(unittest.TestCase): 28 29 def setUp(self): 30 self.done_mutex = thread.allocate_lock() 31 self.done_mutex.acquire() 32 self.running_mutex = thread.allocate_lock() 33 self.random_mutex = thread.allocate_lock() 34 self.created = 0 35 self.running = 0 36 self.next_ident = 0 37 38 key = threading_helper.threading_setup() 39 self.addCleanup(threading_helper.threading_cleanup, *key) 40 41 42class ThreadRunningTests(BasicThreadTest): 43 44 def newtask(self): 45 with self.running_mutex: 46 self.next_ident += 1 47 verbose_print("creating task %s" % self.next_ident) 48 thread.start_new_thread(self.task, (self.next_ident,)) 49 self.created += 1 50 self.running += 1 51 52 def task(self, ident): 53 with self.random_mutex: 54 delay = random.random() / 10000.0 55 verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) 56 time.sleep(delay) 57 verbose_print("task %s done" % ident) 58 with self.running_mutex: 59 self.running -= 1 60 if self.created == NUMTASKS and self.running == 0: 61 self.done_mutex.release() 62 63 def test_starting_threads(self): 64 with threading_helper.wait_threads_exit(): 65 # Basic test for thread creation. 66 for i in range(NUMTASKS): 67 self.newtask() 68 verbose_print("waiting for tasks to complete...") 69 self.done_mutex.acquire() 70 verbose_print("all tasks done") 71 72 def test_stack_size(self): 73 # Various stack size tests. 74 self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") 75 76 thread.stack_size(0) 77 self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") 78 79 @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix') 80 def test_nt_and_posix_stack_size(self): 81 try: 82 thread.stack_size(4096) 83 except ValueError: 84 verbose_print("caught expected ValueError setting " 85 "stack_size(4096)") 86 except thread.error: 87 self.skipTest("platform does not support changing thread stack " 88 "size") 89 90 fail_msg = "stack_size(%d) failed - should succeed" 91 for tss in (262144, 0x100000, 0): 92 thread.stack_size(tss) 93 self.assertEqual(thread.stack_size(), tss, fail_msg % tss) 94 verbose_print("successfully set stack_size(%d)" % tss) 95 96 for tss in (262144, 0x100000): 97 verbose_print("trying stack_size = (%d)" % tss) 98 self.next_ident = 0 99 self.created = 0 100 with threading_helper.wait_threads_exit(): 101 for i in range(NUMTASKS): 102 self.newtask() 103 104 verbose_print("waiting for all tasks to complete") 105 self.done_mutex.acquire() 106 verbose_print("all tasks done") 107 108 thread.stack_size(0) 109 110 def test__count(self): 111 # Test the _count() function. 112 orig = thread._count() 113 mut = thread.allocate_lock() 114 mut.acquire() 115 started = [] 116 117 def task(): 118 started.append(None) 119 mut.acquire() 120 mut.release() 121 122 with threading_helper.wait_threads_exit(): 123 thread.start_new_thread(task, ()) 124 while not started: 125 time.sleep(POLL_SLEEP) 126 self.assertEqual(thread._count(), orig + 1) 127 # Allow the task to finish. 128 mut.release() 129 # The only reliable way to be sure that the thread ended from the 130 # interpreter's point of view is to wait for the function object to be 131 # destroyed. 132 done = [] 133 wr = weakref.ref(task, lambda _: done.append(None)) 134 del task 135 while not done: 136 time.sleep(POLL_SLEEP) 137 support.gc_collect() # For PyPy or other GCs. 138 self.assertEqual(thread._count(), orig) 139 140 def test_unraisable_exception(self): 141 def task(): 142 started.release() 143 raise ValueError("task failed") 144 145 started = thread.allocate_lock() 146 with support.catch_unraisable_exception() as cm: 147 with threading_helper.wait_threads_exit(): 148 started.acquire() 149 thread.start_new_thread(task, ()) 150 started.acquire() 151 152 self.assertEqual(str(cm.unraisable.exc_value), "task failed") 153 self.assertIs(cm.unraisable.object, task) 154 self.assertEqual(cm.unraisable.err_msg, 155 "Exception ignored in thread started by") 156 self.assertIsNotNone(cm.unraisable.exc_traceback) 157 158 159class Barrier: 160 def __init__(self, num_threads): 161 self.num_threads = num_threads 162 self.waiting = 0 163 self.checkin_mutex = thread.allocate_lock() 164 self.checkout_mutex = thread.allocate_lock() 165 self.checkout_mutex.acquire() 166 167 def enter(self): 168 self.checkin_mutex.acquire() 169 self.waiting = self.waiting + 1 170 if self.waiting == self.num_threads: 171 self.waiting = self.num_threads - 1 172 self.checkout_mutex.release() 173 return 174 self.checkin_mutex.release() 175 176 self.checkout_mutex.acquire() 177 self.waiting = self.waiting - 1 178 if self.waiting == 0: 179 self.checkin_mutex.release() 180 return 181 self.checkout_mutex.release() 182 183 184class BarrierTest(BasicThreadTest): 185 186 def test_barrier(self): 187 with threading_helper.wait_threads_exit(): 188 self.bar = Barrier(NUMTASKS) 189 self.running = NUMTASKS 190 for i in range(NUMTASKS): 191 thread.start_new_thread(self.task2, (i,)) 192 verbose_print("waiting for tasks to end") 193 self.done_mutex.acquire() 194 verbose_print("tasks done") 195 196 def task2(self, ident): 197 for i in range(NUMTRIPS): 198 if ident == 0: 199 # give it a good chance to enter the next 200 # barrier before the others are all out 201 # of the current one 202 delay = 0 203 else: 204 with self.random_mutex: 205 delay = random.random() / 10000.0 206 verbose_print("task %s will run for %sus" % 207 (ident, round(delay * 1e6))) 208 time.sleep(delay) 209 verbose_print("task %s entering %s" % (ident, i)) 210 self.bar.enter() 211 verbose_print("task %s leaving barrier" % ident) 212 with self.running_mutex: 213 self.running -= 1 214 # Must release mutex before releasing done, else the main thread can 215 # exit and set mutex to None as part of global teardown; then 216 # mutex.release() raises AttributeError. 217 finished = self.running == 0 218 if finished: 219 self.done_mutex.release() 220 221class LockTests(lock_tests.LockTests): 222 locktype = thread.allocate_lock 223 224 225class TestForkInThread(unittest.TestCase): 226 def setUp(self): 227 self.read_fd, self.write_fd = os.pipe() 228 229 @support.requires_fork() 230 @threading_helper.reap_threads 231 def test_forkinthread(self): 232 pid = None 233 234 def fork_thread(read_fd, write_fd): 235 nonlocal pid 236 237 # fork in a thread 238 pid = os.fork() 239 if pid: 240 # parent process 241 return 242 243 # child process 244 try: 245 os.close(read_fd) 246 os.write(write_fd, b"OK") 247 finally: 248 os._exit(0) 249 250 with threading_helper.wait_threads_exit(): 251 thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd)) 252 self.assertEqual(os.read(self.read_fd, 2), b"OK") 253 os.close(self.write_fd) 254 255 self.assertIsNotNone(pid) 256 support.wait_process(pid, exitcode=0) 257 258 def tearDown(self): 259 try: 260 os.close(self.read_fd) 261 except OSError: 262 pass 263 264 try: 265 os.close(self.write_fd) 266 except OSError: 267 pass 268 269 270if __name__ == "__main__": 271 unittest.main() 272