"""Thread module emulating a subset of Java's threading model."""

import os as _os
import sys as _sys
import _thread
import functools

from time import monotonic as _time
from _weakrefset import WeakSet
from itertools import islice as _islice, count as _count
try:
    from _collections import deque as _deque
except ImportError:
    from collections import deque as _deque

# Note regarding PEP 8 compliant names
#  This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. Those original names are not in any imminent danger of
# being deprecated (even for Py3k), so this module provides them as an
# alias for the PEP 8 compliant names
# Note that using the new PEP 8 compliant names facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.

__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
           'enumerate', 'main_thread', 'TIMEOUT_MAX',
           'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
           'setprofile', 'settrace', 'local', 'stack_size',
           'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile']

# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
try:
    get_native_id = _thread.get_native_id
    _HAVE_THREAD_NATIVE_ID = True
    __all__.append('get_native_id')
except AttributeError:
    _HAVE_THREAD_NATIVE_ID = False
ThreadError = _thread.error
try:
    _CRLock = _thread.RLock
except AttributeError:
    _CRLock = None
TIMEOUT_MAX = _thread.TIMEOUT_MAX
del _thread


# Support for profile and trace hooks

_profile_hook = None
_trace_hook = None

def setprofile(func):
    """Set a profile function for all threads started from the threading module.

    The func will be passed to sys.setprofile() for each thread, before its
    run() method is called.

    """
    global _profile_hook
    _profile_hook = func

def getprofile():
    """Get the profiler function as set by threading.setprofile()."""
    return _profile_hook

def settrace(func):
    """Set a trace function for all threads started from the threading module.

    The func will be passed to sys.settrace() for each thread, before its run()
    method is called.

    """
    global _trace_hook
    _trace_hook = func

def gettrace():
    """Get the trace function as set by threading.settrace()."""
    return _trace_hook

# Synchronization classes

Lock = _allocate_lock

def RLock(*args, **kwargs):
    """Factory function that returns a new reentrant lock.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it again
    without blocking; the thread must release it once for each time it has
    acquired it.

    """
    # Fall back to the pure-Python implementation only when the C one
    # could not be looked up on _thread at import time.
    if _CRLock is None:
        return _PyRLock(*args, **kwargs)
    return _CRLock(*args, **kwargs)

class _RLock:
    """This class implements reentrant lock objects.

    A reentrant lock must be released by the thread that acquired it. Once a
    thread has acquired a reentrant lock, the same thread may acquire it
    again without blocking; the thread must release it once for each time it
    has acquired it.

    """

    def __init__(self):
        self._block = _allocate_lock()  # the underlying non-reentrant lock
        self._owner = None              # get_ident() of the owning thread
        self._count = 0                 # recursion level of the owner

    def __repr__(self):
        owner = self._owner
        try:
            # Show the owning thread's name when it is a known active thread.
            owner = _active[owner].name
        except KeyError:
            pass
        return "<%s %s.%s object owner=%r count=%d at %s>" % (
            "locked" if self._block.locked() else "unlocked",
            self.__class__.__module__,
            self.__class__.__qualname__,
            owner,
            self._count,
            hex(id(self))
        )

    def _at_fork_reinit(self):
        # Reinitialize the underlying lock and forget any owner/recursion
        # state.
        self._block._at_fork_reinit()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        """Acquire a lock, blocking or non-blocking.

        When invoked without arguments: if this thread already owns the lock,
        increment the recursion level by one, and return immediately. Otherwise,
        if another thread owns the lock, block until the lock is unlocked. Once
        the lock is unlocked (not owned by any thread), then grab ownership, set
        the recursion level to one, and return. If more than one thread is
        blocked waiting until the lock is unlocked, only one at a time will be
        able to grab ownership of the lock. There is no return value in this
        case.

        When invoked with the blocking argument set to true, do the same thing
        as when called without arguments, and return true.

        When invoked with the blocking argument set to false, do not block. If a
        call without an argument would block, return false immediately;
        otherwise, do the same thing as when called without arguments, and
        return true.

        When invoked with the floating-point timeout argument set to a positive
        value, block for at most the number of seconds specified by timeout
        and as long as the lock cannot be acquired.  Return true if the lock has
        been acquired, false if the timeout has elapsed.

        """
        me = get_ident()
        if self._owner == me:
            # Re-entrant acquisition by the owner never blocks.
            self._count += 1
            return 1
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = me
            self._count = 1
        return rc

    __enter__ = acquire

    def release(self):
        """Release a lock, decrementing the recursion level.

        If after the decrement it is zero, reset the lock to unlocked (not owned
        by any thread), and if any other threads are blocked waiting for the
        lock to become unlocked, allow exactly one of them to proceed. If after
        the decrement the recursion level is still nonzero, the lock remains
        locked and owned by the calling thread.

        Only call this method when the calling thread owns the lock. A
        RuntimeError is raised if this method is called when the lock is
        unlocked.

        There is no return value.

        """
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, state):
        # Re-acquire the underlying lock and reinstate the (count, owner)
        # pair previously returned by _release_save().
        self._block.acquire()
        self._count, self._owner = state

    def _release_save(self):
        # Fully release the lock regardless of recursion level and return
        # the (count, owner) pair needed to restore it later.
        if self._count == 0:
            raise RuntimeError("cannot release un-acquired lock")
        count = self._count
        self._count = 0
        owner = self._owner
        self._owner = None
        self._block.release()
        return (count, owner)

    def _is_owned(self):
        return self._owner == get_ident()

_PyRLock = _RLock


class Condition:
    """Class that implements a condition variable.

    A condition variable allows one or more threads to wait until they are
    notified by another thread.

    If the lock argument is given and not None, it must be a Lock or RLock
    object, and it is used as the underlying lock. Otherwise, a new RLock object
    is created and used as the underlying lock.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One freshly allocated lock per waiting thread, in arrival order.
        self._waiters = _deque()

    def _at_fork_reinit(self):
        # Reinitialize the underlying lock and drop all recorded waiters.
        self._lock._at_fork_reinit()
        self._waiters.clear()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if _lock doesn't have _is_owned().
        if self._lock.acquire(False):
            self._lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # The waiter lock starts out held; notify() releases it to wake us.
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                # Timed out or interrupted: withdraw our waiter unless a
                # notify() already removed it.
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        """Wait until a condition evaluates to True.

        predicate should be a callable which result will be interpreted as a
        boolean value.  A timeout may be provided giving the maximum time to
        wait.

        """
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    # First pass: fix the absolute deadline.
                    endtime = _time() + waittime
                else:
                    # Later passes: wait only for the time remaining.
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the condition
        variable; it is a no-op if no threads are waiting.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        waiters = self._waiters
        while waiters and n > 0:
            waiter = waiters[0]
            try:
                waiter.release()
            except RuntimeError:
                # gh-92530: The previous call of notify() released the lock,
                # but was interrupted before removing it from the queue.
                # It can happen if a signal handler raises an exception,
                # like CTRL+C which raises KeyboardInterrupt.
                pass
            else:
                n -= 1
            try:
                waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.

        """
        self.notify(len(self._waiters))

    def notifyAll(self):
        """Wake up all threads waiting on this condition.

        This method is deprecated, use notify_all() instead.

        """
        import warnings
        warnings.warn('notifyAll() is deprecated, use notify_all() instead',
                      DeprecationWarning, stacklevel=2)
        self.notify_all()


class Semaphore:
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def __repr__(self):
        cls = self.__class__
        return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
                f" value={self._value}>")

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore, decrementing the internal counter by one.

        When invoked without arguments: if the internal counter is larger than
        zero on entry, decrement it by one and return immediately. If it is zero
        on entry, block, waiting until some other thread has called release() to
        make it larger than zero. This is done with proper interlocking so that
        if multiple acquire() calls are blocked, release() will wake exactly one
        of them up. The implementation may pick one at random, so the order in
        which blocked threads are awakened should not be relied on. There is no
        return value in this case.

        When invoked with blocking set to true, do the same thing as when called
        without arguments, and return true.

        When invoked with blocking set to false, do not block. If a call without
        an argument would block, return false immediately; otherwise, do the
        same thing as when called without arguments, and return true.

        When invoked with a timeout other than None, it will block for at
        most timeout seconds.  If acquire does not complete successfully in
        that interval, return false.  Return true otherwise.

        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Reached only when the loop ended without a break, i.e.
                # the counter is non-zero and can be taken.
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self, n=1):
        """Release a semaphore, incrementing the internal counter by one or more.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            self._value += n
            # A single notify(n) wakes at most n waiters, the same as
            # calling notify() n times.
            self._cond.notify(n)

    def __exit__(self, t, v, tb):
        self.release()


class BoundedSemaphore(Semaphore):
    """Implements a bounded semaphore.

    A bounded semaphore checks to make sure its current value doesn't exceed its
    initial value. If it does, ValueError is raised. In most situations
    semaphores are used to guard resources with limited capacity.

    If the semaphore is released too many times it's a sign of a bug. If not
    given, value defaults to 1.

    Like regular semaphores, bounded semaphores manage a counter representing
    the number of release() calls minus the number of acquire() calls, plus an
    initial value. The acquire() method blocks if necessary until it can return
    without making the counter negative. If not given, value defaults to 1.

    """

    def __init__(self, value=1):
        Semaphore.__init__(self, value)
        self._initial_value = value

    def __repr__(self):
        cls = self.__class__
        return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
                f" value={self._value}/{self._initial_value}>")

    def release(self, n=1):
        """Release a semaphore, incrementing the internal counter by one or more.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        If the number of releases exceeds the number of acquires,
        raise a ValueError.

        """
        if n < 1:
            raise ValueError('n must be one or more')
        with self._cond:
            if self._value + n > self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += n
            # A single notify(n) wakes at most n waiters, the same as
            # calling notify() n times.
            self._cond.notify(n)


class Event:
    """Class implementing event objects.

    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true.  The flag is initially false.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    def __repr__(self):
        cls = self.__class__
        status = 'set' if self._flag else 'unset'
        return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>"

    def _at_fork_reinit(self):
        # Private method called by Thread._reset_internal_locks()
        self._cond._at_fork_reinit()

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    def isSet(self):
        """Return true if and only if the internal flag is true.

        This method is deprecated, use is_set() instead.

        """
        import warnings
        warnings.warn('isSet() is deprecated, use is_set() instead',
                      DeprecationWarning, stacklevel=2)
        return self.is_set()

    def set(self):
        """Set the internal flag to true.

        All threads waiting for it to become true are awakened. Threads
        that call wait() once the flag is true will not block at all.

        """
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, threads calling wait() will block until set() is called to
        set the internal flag to true again.

        """
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return immediately. Otherwise,
        block until another thread calls set() to set the flag to true, or until
        the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        This method returns the internal flag on exit, so it will always return
        True except if a timeout is given and the operation times out.

        """
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled


# A barrier class.  Inspired in part by the pthread_barrier_* api and
# the CyclicBarrier class from Java.  See
# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
#        CyclicBarrier.html
# for information.
# We maintain two main states, 'filling' and 'draining' enabling the barrier
# to be cyclic.  Threads are not allowed into it until it has fully drained
# since the previous cycle.
# In addition, a 'resetting' state exists which is
# similar to 'draining' except that threads leave with a BrokenBarrierError,
# and a 'broken' state in which all threads get the exception.
class Barrier:
    """Implements a Barrier.

    Useful for synchronizing a fixed number of threads at known synchronization
    points.  Threads block on 'wait()' and are simultaneously awoken once they
    have all made that call.

    """

    def __init__(self, parties, action=None, timeout=None):
        """Create a barrier, initialised to 'parties' threads.

        'action' is a callable which, when supplied, will be called by one of
        the threads after they have all entered the barrier and just prior to
        releasing them all. If a 'timeout' is provided, it is used as the
        default for all subsequent 'wait()' calls.

        Raises ValueError if 'parties' is less than 1.

        """
        # With fewer than one party, "index + 1 == parties" in wait() can
        # never be true, so no caller would ever release the barrier.
        if parties < 1:
            raise ValueError("parties must be >= 1")
        self._cond = Condition(Lock())
        self._action = action
        self._timeout = timeout
        self._parties = parties
        self._state = 0  # 0 filling, 1 draining, -1 resetting, -2 broken
        self._count = 0

    def __repr__(self):
        cls = self.__class__
        if self.broken:
            return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: broken>"
        return (f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}:"
                f" waiters={self.n_waiting}/{self.parties}>")

    def wait(self, timeout=None):
        """Wait for the barrier.

        When the specified number of threads have started waiting, they are all
        simultaneously awoken. If an 'action' was provided for the barrier, one
        of the threads will have executed that callback prior to returning.
        Returns an individual index number from 0 to 'parties-1'.

        Raises BrokenBarrierError if the barrier is broken or reset while
        waiting, or if the wait times out.

        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter() # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()

    # Block until the barrier is ready for us, or raise an exception
    # if it is broken.
    def _enter(self):
        while self._state in (-1, 1):
            # It is draining or resetting, wait until done
            self._cond.wait()
        #see if the barrier is in a broken state
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 0

    # Optionally run the 'action' and release the threads waiting
    # in the barrier.
    def _release(self):
        try:
            if self._action:
                self._action()
            # enter draining state
            self._state = 1
            self._cond.notify_all()
        except:
            #an exception during the _action handler.  Break and reraise
            self._break()
            raise

    # Wait in the barrier until we are released.  Raise an exception
    # if the barrier is reset or broken.
    def _wait(self, timeout):
        if not self._cond.wait_for(lambda : self._state != 0, timeout):
            #timed out.  Break the barrier
            self._break()
            raise BrokenBarrierError
        if self._state < 0:
            raise BrokenBarrierError
        assert self._state == 1

    # If we are the last thread to exit the barrier, signal any threads
    # waiting for the barrier to drain.
    def _exit(self):
        if self._count == 0:
            if self._state in (-1, 1):
                #resetting or draining
                self._state = 0
                self._cond.notify_all()

    def reset(self):
        """Reset the barrier to the initial state.

        Any threads currently waiting will get the BrokenBarrier exception
        raised.

        """
        with self._cond:
            if self._count > 0:
                if self._state == 0:
                    #reset the barrier, waking up threads
                    self._state = -1
                elif self._state == -2:
                    #was broken, set it to reset state
                    #which clears when the last thread exits
                    self._state = -1
            else:
                self._state = 0
            self._cond.notify_all()

    def abort(self):
        """Place the barrier into a 'broken' state.

        Useful in case of error.  Any currently waiting threads and threads
        attempting to 'wait()' will have BrokenBarrierError raised.

        """
        with self._cond:
            self._break()

    def _break(self):
        # An internal error was detected.  The barrier is set to
        # a broken state and all parties are awakened.
        self._state = -2
        self._cond.notify_all()

    @property
    def parties(self):
        """Return the number of threads required to trip the barrier."""
        return self._parties

    @property
    def n_waiting(self):
        """Return the number of threads currently waiting at the barrier."""
        # We don't need synchronization here since this is an ephemeral result
        # anyway.  It returns the correct value in the steady state.
        if self._state == 0:
            return self._count
        return 0

    @property
    def broken(self):
        """Return True if the barrier is in a broken state."""
        return self._state == -2

# exception raised by the Barrier class
class BrokenBarrierError(RuntimeError):
    pass


# Helper to generate new thread names
_counter = _count(1).__next__
def _newname(name_template):
    # name_template is %-formatted with the next value of the shared counter.
    return name_template % _counter()

# Active thread administration.
#
# bpo-44422: Use a reentrant lock to allow reentrant calls to functions like
# threading.enumerate().
811_active_limbo_lock = RLock() 812_active = {} # maps thread id to Thread object 813_limbo = {} 814_dangling = WeakSet() 815 816# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() 817# to wait until all Python thread states get deleted: 818# see Thread._set_tstate_lock(). 819_shutdown_locks_lock = _allocate_lock() 820_shutdown_locks = set() 821 822def _maintain_shutdown_locks(): 823 """ 824 Drop any shutdown locks that don't correspond to running threads anymore. 825 826 Calling this from time to time avoids an ever-growing _shutdown_locks 827 set when Thread objects are not joined explicitly. See bpo-37788. 828 829 This must be called with _shutdown_locks_lock acquired. 830 """ 831 # If a lock was released, the corresponding thread has exited 832 to_remove = [lock for lock in _shutdown_locks if not lock.locked()] 833 _shutdown_locks.difference_update(to_remove) 834 835 836# Main class for threads 837 838class Thread: 839 """A class that represents a thread of control. 840 841 This class can be safely subclassed in a limited fashion. There are two ways 842 to specify the activity: by passing a callable object to the constructor, or 843 by overriding the run() method in a subclass. 844 845 """ 846 847 _initialized = False 848 849 def __init__(self, group=None, target=None, name=None, 850 args=(), kwargs=None, *, daemon=None): 851 """This constructor should always be called with keyword arguments. Arguments are: 852 853 *group* should be None; reserved for future extension when a ThreadGroup 854 class is implemented. 855 856 *target* is the callable object to be invoked by the run() 857 method. Defaults to None, meaning nothing is called. 858 859 *name* is the thread name. By default, a unique name is constructed of 860 the form "Thread-N" where N is a small decimal number. 861 862 *args* is a list or tuple of arguments for the target invocation. Defaults to (). 
863 864 *kwargs* is a dictionary of keyword arguments for the target 865 invocation. Defaults to {}. 866 867 If a subclass overrides the constructor, it must make sure to invoke 868 the base class constructor (Thread.__init__()) before doing anything 869 else to the thread. 870 871 """ 872 assert group is None, "group argument must be None for now" 873 if kwargs is None: 874 kwargs = {} 875 if name: 876 name = str(name) 877 else: 878 name = _newname("Thread-%d") 879 if target is not None: 880 try: 881 target_name = target.__name__ 882 name += f" ({target_name})" 883 except AttributeError: 884 pass 885 886 self._target = target 887 self._name = name 888 self._args = args 889 self._kwargs = kwargs 890 if daemon is not None: 891 self._daemonic = daemon 892 else: 893 self._daemonic = current_thread().daemon 894 self._ident = None 895 if _HAVE_THREAD_NATIVE_ID: 896 self._native_id = None 897 self._tstate_lock = None 898 self._started = Event() 899 self._is_stopped = False 900 self._initialized = True 901 # Copy of sys.stderr used by self._invoke_excepthook() 902 self._stderr = _sys.stderr 903 self._invoke_excepthook = _make_invoke_excepthook() 904 # For debugging and _after_fork() 905 _dangling.add(self) 906 907 def _reset_internal_locks(self, is_alive): 908 # private! Called by _after_fork() to reset our internal locks as 909 # they may be in an invalid state leading to a deadlock or crash. 910 self._started._at_fork_reinit() 911 if is_alive: 912 # bpo-42350: If the fork happens when the thread is already stopped 913 # (ex: after threading._shutdown() has been called), _tstate_lock 914 # is None. Do nothing in this case. 915 if self._tstate_lock is not None: 916 self._tstate_lock._at_fork_reinit() 917 self._tstate_lock.acquire() 918 else: 919 # The thread isn't alive after fork: it doesn't have a tstate 920 # anymore. 
# NOTE(review): the two assignments below finish a method whose header lies
# above this chunk; their nesting depth is assumed -- TODO confirm.
            self._is_stopped = True
            self._tstate_lock = None

    def __repr__(self):
        # Builds "<ClassName(name, status)>", where status is one of
        # "initial", "started" or "stopped", optionally followed by
        # " daemon" and the thread ident.
        assert self._initialized, "Thread.__init__() was not called"
        status = "initial"
        if self._started.is_set():
            status = "started"
        self.is_alive() # easy way to get ._is_stopped set when appropriate
        if self._is_stopped:
            status = "stopped"
        if self._daemonic:
            status += " daemon"
        if self._ident is not None:
            status += " %s" % self._ident
        return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)

    def start(self):
        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object.

        """
        if not self._initialized:
            raise RuntimeError("thread.__init__() not called")

        if self._started.is_set():
            raise RuntimeError("threads can only be started once")

        # Park the thread in _limbo; _bootstrap_inner() moves it to _active
        # once the new thread is actually running.
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self._bootstrap, ())
        except Exception:
            # The thread could not be created: undo the _limbo registration
            # before propagating the error.
            with _active_limbo_lock:
                del _limbo[self]
            raise
        # _bootstrap_inner() sets _started from inside the new thread.
        self._started.wait()

    def run(self):
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        """
        try:
            if self._target is not None:
                self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

    def _bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # _bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # _bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self._bootstrap_inner()
        except:
            # "_sys is None" is the signal used here for "module globals
            # have already been torn down".
            if self._daemonic and _sys is None:
                return
            raise

    def _set_ident(self):
        # Record the identifier of the *calling* thread, so this must be
        # invoked from the thread that this object represents.
        self._ident = get_ident()

    if _HAVE_THREAD_NATIVE_ID:
        def _set_native_id(self):
            # Same as _set_ident(), for the native thread id; only defined
            # when _thread provides get_native_id.
            self._native_id = get_native_id()

    def _set_tstate_lock(self):
        """
        Set a lock object which will be released by the interpreter when
        the underlying thread state (see pystate.h) gets deleted.
        """
        self._tstate_lock = _set_sentinel()
        self._tstate_lock.acquire()

        if not self.daemon:
            # Non-daemon threads are registered so that _shutdown() waits
            # for them.
            with _shutdown_locks_lock:
                _maintain_shutdown_locks()
                _shutdown_locks.add(self._tstate_lock)

    def _bootstrap_inner(self):
        # Runs in the new thread: publishes the thread's identity, signals
        # start(), installs the trace/profile hooks, calls run() and always
        # unregisters the thread from _active on the way out.
        try:
            self._set_ident()
            self._set_tstate_lock()
            if _HAVE_THREAD_NATIVE_ID:
                self._set_native_id()
            # Unblocks the start() call waiting on _started.
            self._started.set()
            with _active_limbo_lock:
                _active[self._ident] = self
                del _limbo[self]

            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except:
                # Anything escaping run() is handed to the excepthook
                # machinery rather than propagated.
                self._invoke_excepthook(self)
        finally:
            self._delete()

    def _stop(self):
        # After calling ._stop(), .is_alive() returns False and .join() returns
        # immediately.  ._tstate_lock must be released before calling ._stop().
        #
        # Normal case:  C code at the end of the thread's life
        # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
        # that's detected by our ._wait_for_tstate_lock(), called by .join()
        # and .is_alive().  Any number of threads _may_ call ._stop()
        # simultaneously (for example, if multiple threads are blocked in
        # .join() calls), and they're not serialized.  That's harmless -
        # they'll just make redundant rebindings of ._is_stopped and
        # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
        # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
        # (the assert is executed only if ._tstate_lock is None).
        #
        # Special case:  _main_thread releases ._tstate_lock via this
        # module's _shutdown() function.
        lock = self._tstate_lock
        if lock is not None:
            assert not lock.locked()
        self._is_stopped = True
        self._tstate_lock = None
        if not self.daemon:
            with _shutdown_locks_lock:
                # Remove our lock and other released locks from _shutdown_locks
                _maintain_shutdown_locks()

    def _delete(self):
        "Remove current thread from the dict of currently running threads."
        with _active_limbo_lock:
            del _active[get_ident()]
            # There must not be any python code between the previous line
            # and after the lock is released.  Otherwise a tracing function
            # could try to acquire the lock again in the same thread, (in
            # current_thread()), and would block.

    def join(self, timeout=None):
        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        is_alive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.

        """
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self._started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if timeout is None:
            self._wait_for_tstate_lock()
        else:
            # the behavior of a negative timeout isn't documented, but
            # historically .join(timeout=x) for x<0 has acted as if timeout=0
            self._wait_for_tstate_lock(timeout=max(timeout, 0))

    def _wait_for_tstate_lock(self, block=True, timeout=-1):
        # Issue #18808: wait for the thread state to be gone.
        # At the end of the thread's life, after all knowledge of the thread
        # is removed from C data structures, C code releases our _tstate_lock.
        # This method passes its arguments to _tstate_lock.acquire().
        # If the lock is acquired, the C code is done, and self._stop() is
        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
        lock = self._tstate_lock
        if lock is None:
            # already determined that the C code is done
            assert self._is_stopped
            return

        try:
            if lock.acquire(block, timeout):
                lock.release()
                self._stop()
        except:
            if lock.locked():
                # bpo-45274: lock.acquire() acquired the lock, but the function
                # was interrupted with an exception before reaching the
                # lock.release(). It can happen if a signal handler raises an
                # exception, like CTRL+C which raises KeyboardInterrupt.
                lock.release()
                self._stop()
            raise

    @property
    def name(self):
        """A string used for identification purposes only.

        It has no semantics. Multiple threads may be given the same name. The
        initial name is set by the constructor.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._name

    @name.setter
    def name(self, name):
        assert self._initialized, "Thread.__init__() not called"
        # Any object is accepted; it is stored as its str() form.
        self._name = str(name)

    @property
    def ident(self):
        """Thread identifier of this thread or None if it has not been started.

        This is a nonzero integer. See the get_ident() function. Thread
        identifiers may be recycled when a thread exits and another thread is
        created. The identifier is available even after the thread has exited.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._ident

    if _HAVE_THREAD_NATIVE_ID:
        @property
        def native_id(self):
            """Native integral thread ID of this thread, or None if it has not been started.

            This is a non-negative integer. See the get_native_id() function.
            This represents the Thread ID as reported by the kernel.

            """
            assert self._initialized, "Thread.__init__() not called"
            return self._native_id

    def is_alive(self):
        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just
        after the run() method terminates. See also the module function
        enumerate().

        """
        assert self._initialized, "Thread.__init__() not called"
        if self._is_stopped or not self._started.is_set():
            return False
        # Non-blocking probe of the tstate lock; if it can be acquired,
        # _wait_for_tstate_lock() calls _stop(), which sets _is_stopped.
        self._wait_for_tstate_lock(False)
        return not self._is_stopped

    @property
    def daemon(self):
        """A boolean value indicating whether this thread is a daemon thread.

        This must be set before start() is called, otherwise RuntimeError is
        raised. Its initial value is inherited from the creating thread; the
        main thread is not a daemon thread and therefore all threads created in
        the main thread default to daemon = False.

        The entire Python program exits when only daemon threads are left.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._daemonic

    @daemon.setter
    def daemon(self, daemonic):
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self._started.is_set():
            raise RuntimeError("cannot set daemon status of active thread")
        self._daemonic = daemonic

    def isDaemon(self):
        """Return whether this thread is a daemon.

        This method is deprecated, use the daemon attribute instead.

        """
        import warnings
        warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
                      DeprecationWarning, stacklevel=2)
        return self.daemon

    def setDaemon(self, daemonic):
        """Set whether this thread is a daemon.

        This method is deprecated, use the .daemon property instead.

        """
        import warnings
        warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
                      DeprecationWarning, stacklevel=2)
        self.daemon = daemonic

    def getName(self):
        """Return a string used for identification purposes only.

        This method is deprecated, use the name attribute instead.

        """
        import warnings
        warnings.warn('getName() is deprecated, get the name attribute instead',
                      DeprecationWarning, stacklevel=2)
        return self.name

    def setName(self, name):
        """Set the name string for this thread.

        This method is deprecated, use the name attribute instead.

        """
        import warnings
        warnings.warn('setName() is deprecated, set the name attribute instead',
                      DeprecationWarning, stacklevel=2)
        self.name = name


try:
    from _thread import (_excepthook as excepthook,
                         _ExceptHookArgs as ExceptHookArgs)
except ImportError:
    # Simple Python implementation if _thread._excepthook() is not available
    from traceback import print_exception as _print_exception
    from collections import namedtuple

    _ExceptHookArgs = namedtuple(
        'ExceptHookArgs',
        'exc_type exc_value exc_traceback thread')

    def ExceptHookArgs(args):
        # Takes a single 4-item iterable and unpacks it into the namedtuple
        # fields (exc_type, exc_value, exc_traceback, thread).
        return _ExceptHookArgs(*args)

    def excepthook(args, /):
        """
        Handle uncaught Thread.run() exception.

        Prints "Exception in thread <name>:" followed by the traceback to
        sys.stderr, falling back to the thread's saved _stderr when
        sys.stderr is unavailable. SystemExit is ignored silently.
        """
        if args.exc_type == SystemExit:
            # silently ignore SystemExit
            return

        if _sys is not None and _sys.stderr is not None:
            stderr = _sys.stderr
        elif args.thread is not None:
            stderr = args.thread._stderr
            if stderr is None:
                # do nothing if sys.stderr is None and sys.stderr was None
                # when the thread was created
                return
        else:
            # do nothing if sys.stderr is None and args.thread is None
            return

        if args.thread is not None:
            name = args.thread.name
        else:
            # No Thread object available: identify the thread by its ident.
            name = get_ident()
        print(f"Exception in thread {name}:",
              file=stderr, flush=True)
        _print_exception(args.exc_type, args.exc_value, args.exc_traceback,
                         file=stderr)
        stderr.flush()


# Original value of threading.excepthook
__excepthook__ = excepthook


def _make_invoke_excepthook():
    # Create a local namespace to ensure that variables remain alive
    # when _invoke_excepthook() is called, even if it is called late during
    # Python shutdown. It is mostly needed for daemon threads.

    old_excepthook = excepthook
    old_sys_excepthook = _sys.excepthook
    if old_excepthook is None:
        raise RuntimeError("threading.excepthook is None")
    if old_sys_excepthook is None:
        raise RuntimeError("sys.excepthook is None")

    sys_exc_info = _sys.exc_info
    local_print = print
    local_sys = _sys

    def invoke_excepthook(thread):
        global excepthook
        try:
            # Prefer the current module-level hook; fall back to the one
            # captured when this closure was created if it has become None.
            hook = excepthook
            if hook is None:
                hook = old_excepthook

            args = ExceptHookArgs([*sys_exc_info(), thread])

            hook(args)
        except Exception as exc:
            # The hook itself failed: report that failure through
            # sys.excepthook instead.
            exc.__suppress_context__ = True
            del exc

            if local_sys is not None and local_sys.stderr is not None:
                stderr = local_sys.stderr
            else:
                stderr = thread._stderr

            local_print("Exception in threading.excepthook:",
                        file=stderr, flush=True)

            if local_sys is not None and local_sys.excepthook is not None:
                sys_excepthook = local_sys.excepthook
            else:
                sys_excepthook = old_sys_excepthook

            sys_excepthook(*sys_exc_info())
        finally:
            # Break reference cycle (exception stored in a variable)
            args = None

    return invoke_excepthook


# The timer class was contributed by Itamar Shtull-Trauring

class Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # None defaults are replaced by fresh empty containers per instance.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        # Wait on the `finished` event with `interval` as the timeout; the
        # function is skipped if cancel() has set the event by then.
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()


# Special thread class to represent the main thread

class _MainThread(Thread):

    def __init__(self):
        Thread.__init__(self, name="MainThread", daemon=False)
        # This thread is already running, so the bookkeeping that
        # _bootstrap_inner() does for started threads is performed here.
        self._set_tstate_lock()
        self._started.set()
        self._set_ident()
        if _HAVE_THREAD_NATIVE_ID:
            self._set_native_id()
        with _active_limbo_lock:
            _active[self._ident] = self


# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (consistent with previous semantics).

class _DummyThread(Thread):

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)

        # Registered as already started; note that _set_tstate_lock() is
        # not called for dummy threads.
        self._started.set()
        self._set_ident()
        if _HAVE_THREAD_NATIVE_ID:
            self._set_native_id()
        with _active_limbo_lock:
            _active[self._ident] = self

    def _stop(self):
        # Deliberately a no-op for dummy threads.
        pass

    def is_alive(self):
        # NOTE(review): this check is an assert, so it is skipped when
        # Python runs with -O.
        assert not self._is_stopped and self._started.is_set()
        return True

    def join(self, timeout=None):
        # NOTE(review): under -O the assert is stripped and join() silently
        # returns None; consider raising an exception explicitly.
        assert False, "cannot join a dummy thread"


# Global API functions

def current_thread():
    """Return the current Thread object, corresponding to the caller's thread of control.

    If the caller's thread of control was not created through the threading
    module, a dummy thread object with limited functionality is returned.

    """
    try:
        return _active[get_ident()]
    except KeyError:
        # _DummyThread.__init__() registers the new object in _active, so
        # later calls from this thread return the same object.
        return _DummyThread()

def currentThread():
    """Return the current Thread object, corresponding to the caller's thread of control.

    This function is deprecated, use current_thread() instead.

    """
    import warnings
    warnings.warn('currentThread() is deprecated, use current_thread() instead',
                  DeprecationWarning, stacklevel=2)
    return current_thread()

def active_count():
    """Return the number of Thread objects currently alive.

    The returned count is equal to the length of the list returned by
    enumerate().

    """
    with _active_limbo_lock:
        return len(_active) + len(_limbo)

def activeCount():
    """Return the number of Thread objects currently alive.

    This function is deprecated, use active_count() instead.

    """
    import warnings
    warnings.warn('activeCount() is deprecated, use active_count() instead',
                  DeprecationWarning, stacklevel=2)
    return active_count()

def _enumerate():
    # Same as enumerate(), but without the lock. Internal use only.
    return list(_active.values()) + list(_limbo.values())

def enumerate():
    """Return a list of all Thread objects currently alive.

    The list includes daemonic threads, dummy thread objects created by
    current_thread(), and the main thread. It excludes terminated threads and
    threads that have not yet been started.

    """
    with _active_limbo_lock:
        return list(_active.values()) + list(_limbo.values())


# Callables registered through _register_atexit(); _shutdown() runs them in
# reverse order and sets _SHUTTING_DOWN first.
_threading_atexits = []
_SHUTTING_DOWN = False

def _register_atexit(func, *arg, **kwargs):
    """CPython internal: register *func* to be called before joining threads.

    The registered *func* is called with its arguments just before all
    non-daemon threads are joined in `_shutdown()`. It provides a similar
    purpose to `atexit.register()`, but its functions are called prior to
    threading shutdown instead of interpreter shutdown.

    For similarity to atexit, the registered functions are called in reverse.

    Raises RuntimeError if called after `_shutdown()` has started.
    """
    if _SHUTTING_DOWN:
        raise RuntimeError("can't register atexit after shutdown")

    call = functools.partial(func, *arg, **kwargs)
    _threading_atexits.append(call)


from _thread import stack_size

# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.

_main_thread = _MainThread()

def _shutdown():
    """
    Wait until the Python thread state of all non-daemon threads gets deleted.
    """
    # Obscure:  other threads may be waiting to join _main_thread.  That's
    # dubious, but some code does it.  We can't wait for C code to release
    # the main thread's tstate_lock - that won't happen until the interpreter
    # is nearly dead.  So we release it here.  Note that just calling _stop()
    # isn't enough:  other threads may already be waiting on _tstate_lock.
    if _main_thread._is_stopped:
        # _shutdown() was already called
        return

    global _SHUTTING_DOWN
    _SHUTTING_DOWN = True

    # Call registered threading atexit functions before threads are joined.
    # Order is reversed, similar to atexit.
    for atexit_call in reversed(_threading_atexits):
        atexit_call()

    # Main thread
    if _main_thread.ident == get_ident():
        tlock = _main_thread._tstate_lock
        # The main thread isn't finished yet, so its thread state lock can't
        # have been released.
        assert tlock is not None
        assert tlock.locked()
        tlock.release()
        _main_thread._stop()
    else:
        # bpo-1596321: _shutdown() must be called in the main thread.
        # If the threading module was not imported by the main thread,
        # _main_thread is the thread which imported the threading module.
        # In this case, ignore _main_thread, similar to the behavior for
        # threads spawned by C libraries or using _thread.start_new_thread().
        pass

    # Join all non-daemon threads
    while True:
        # Take a snapshot and empty the shared set while holding the lock,
        # then wait on the snapshot without holding it.
        with _shutdown_locks_lock:
            locks = list(_shutdown_locks)
            _shutdown_locks.clear()

        if not locks:
            break

        for lock in locks:
            # mimic Thread.join()
            lock.acquire()
            lock.release()

        # new threads can be spawned while we were waiting for the other
        # threads to complete


def main_thread():
    """Return the main thread object.

    In normal conditions, the main thread is the thread from which the
    Python interpreter was started.
    """
    return _main_thread

# get thread-local implementation, either from the thread
# module, or from the python fallback

try:
    from _thread import _local as local
except ImportError:
    from _threading_local import local


def _after_fork():
    """
    Cleanup threading module state that should not exist after a fork.
    """
    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock, _main_thread
    global _shutdown_locks_lock, _shutdown_locks
    _active_limbo_lock = RLock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}

    try:
        current = _active[get_ident()]
    except KeyError:
        # fork() was called in a thread which was not spawned
        # by threading.Thread. For example, a thread spawned
        # by thread.start_new_thread().
        current = _MainThread()

    # The thread that survived the fork becomes the main thread.
    _main_thread = current

    # reset _shutdown() locks: threads re-register their _tstate_lock below
    _shutdown_locks_lock = _allocate_lock()
    _shutdown_locks = set()

    with _active_limbo_lock:
        # Dangling thread instances must still have their locks reset,
        # because someone may join() them.
        threads = set(_enumerate())
        threads.update(_dangling)
        for thread in threads:
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                thread._reset_internal_locks(True)
                ident = get_ident()
                thread._ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                thread._reset_internal_locks(False)
                thread._stop()

        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1


# Only install the fork hook where os.register_at_fork exists.
if hasattr(_os, "register_at_fork"):
    _os.register_at_fork(after_in_child=_after_fork)