1import os 2import sys 3import threading 4 5from . import process 6from . import reduction 7 8__all__ = () 9 10# 11# Exceptions 12# 13 14class ProcessError(Exception): 15 pass 16 17class BufferTooShort(ProcessError): 18 pass 19 20class TimeoutError(ProcessError): 21 pass 22 23class AuthenticationError(ProcessError): 24 pass 25 26# 27# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py 28# 29 30class BaseContext(object): 31 32 ProcessError = ProcessError 33 BufferTooShort = BufferTooShort 34 TimeoutError = TimeoutError 35 AuthenticationError = AuthenticationError 36 37 current_process = staticmethod(process.current_process) 38 parent_process = staticmethod(process.parent_process) 39 active_children = staticmethod(process.active_children) 40 41 def cpu_count(self): 42 '''Returns the number of CPUs in the system''' 43 num = os.cpu_count() 44 if num is None: 45 raise NotImplementedError('cannot determine number of cpus') 46 else: 47 return num 48 49 def Manager(self): 50 '''Returns a manager associated with a running server process 51 52 The managers methods such as `Lock()`, `Condition()` and `Queue()` 53 can be used to create shared objects. 54 ''' 55 from .managers import SyncManager 56 m = SyncManager(ctx=self.get_context()) 57 m.start() 58 return m 59 60 def Pipe(self, duplex=True): 61 '''Returns two connection object connected by a pipe''' 62 from .connection import Pipe 63 return Pipe(duplex) 64 65 def Lock(self): 66 '''Returns a non-recursive lock object''' 67 from .synchronize import Lock 68 return Lock(ctx=self.get_context()) 69 70 def RLock(self): 71 '''Returns a recursive lock object''' 72 from .synchronize import RLock 73 return RLock(ctx=self.get_context()) 74 75 def Condition(self, lock=None): 76 '''Returns a condition object''' 77 from .synchronize import Condition 78 return Condition(lock, ctx=self.get_context()) 79 80 def Semaphore(self, value=1): 81 '''Returns a semaphore object''' 82 from .synchronize import Semaphore 83 return Semaphore(value, ctx=self.get_context()) 84 85 def BoundedSemaphore(self, value=1): 86 '''Returns a bounded semaphore object''' 87 from .synchronize import BoundedSemaphore 88 return BoundedSemaphore(value, ctx=self.get_context()) 89 90 def Event(self): 91 '''Returns an event object''' 92 from .synchronize import Event 93 return Event(ctx=self.get_context()) 94 95 def Barrier(self, parties, action=None, timeout=None): 96 '''Returns a barrier object''' 97 from .synchronize import Barrier 98 return Barrier(parties, action, timeout, ctx=self.get_context()) 99 100 def Queue(self, maxsize=0): 101 '''Returns a queue object''' 102 from .queues import Queue 103 return Queue(maxsize, ctx=self.get_context()) 104 105 def JoinableQueue(self, maxsize=0): 106 '''Returns a queue object''' 107 from .queues import JoinableQueue 108 return JoinableQueue(maxsize, ctx=self.get_context()) 109 110 def SimpleQueue(self): 111 '''Returns a queue object''' 112 from .queues import SimpleQueue 113 return SimpleQueue(ctx=self.get_context()) 114 115 def Pool(self, processes=None, initializer=None, initargs=(), 116 maxtasksperchild=None): 117 '''Returns a process pool object''' 118 from .pool import Pool 119 return Pool(processes, initializer, initargs, maxtasksperchild, 120 context=self.get_context()) 121 122 def RawValue(self, typecode_or_type, *args): 123 '''Returns a shared object''' 124 from .sharedctypes import RawValue 125 return RawValue(typecode_or_type, *args) 126 127 def RawArray(self, typecode_or_type, size_or_initializer): 128 '''Returns a shared array''' 129 from .sharedctypes import RawArray 130 return RawArray(typecode_or_type, size_or_initializer) 131 132 def Value(self, typecode_or_type, *args, lock=True): 133 '''Returns a synchronized shared object''' 134 from .sharedctypes import Value 135 return Value(typecode_or_type, *args, lock=lock, 136 ctx=self.get_context()) 137 138 def Array(self, typecode_or_type, size_or_initializer, *, lock=True): 139 '''Returns a synchronized shared array''' 140 from .sharedctypes import Array 141 return Array(typecode_or_type, size_or_initializer, lock=lock, 142 ctx=self.get_context()) 143 144 def freeze_support(self): 145 '''Check whether this is a fake forked process in a frozen executable. 146 If so then run code specified by commandline and exit. 147 ''' 148 if sys.platform == 'win32' and getattr(sys, 'frozen', False): 149 from .spawn import freeze_support 150 freeze_support() 151 152 def get_logger(self): 153 '''Return package logger -- if it does not already exist then 154 it is created. 155 ''' 156 from .util import get_logger 157 return get_logger() 158 159 def log_to_stderr(self, level=None): 160 '''Turn on logging and add a handler which prints to stderr''' 161 from .util import log_to_stderr 162 return log_to_stderr(level) 163 164 def allow_connection_pickling(self): 165 '''Install support for sending connections and sockets 166 between processes 167 ''' 168 # This is undocumented. In previous versions of multiprocessing 169 # its only effect was to make socket objects inheritable on Windows. 170 from . import connection 171 172 def set_executable(self, executable): 173 '''Sets the path to a python.exe or pythonw.exe binary used to run 174 child processes instead of sys.executable when using the 'spawn' 175 start method. Useful for people embedding Python. 176 ''' 177 from .spawn import set_executable 178 set_executable(executable) 179 180 def set_forkserver_preload(self, module_names): 181 '''Set list of module names to try to load in forkserver process. 182 This is really just a hint. 183 ''' 184 from .forkserver import set_forkserver_preload 185 set_forkserver_preload(module_names) 186 187 def get_context(self, method=None): 188 if method is None: 189 return self 190 try: 191 ctx = _concrete_contexts[method] 192 except KeyError: 193 raise ValueError('cannot find context for %r' % method) from None 194 ctx._check_available() 195 return ctx 196 197 def get_start_method(self, allow_none=False): 198 return self._name 199 200 def set_start_method(self, method, force=False): 201 raise ValueError('cannot set start method of concrete context') 202 203 @property 204 def reducer(self): 205 '''Controls how objects will be reduced to a form that can be 206 shared with other processes.''' 207 return globals().get('reduction') 208 209 @reducer.setter 210 def reducer(self, reduction): 211 globals()['reduction'] = reduction 212 213 def _check_available(self): 214 pass 215 216# 217# Type of default context -- underlying context can be set at most once 218# 219 220class Process(process.BaseProcess): 221 _start_method = None 222 @staticmethod 223 def _Popen(process_obj): 224 return _default_context.get_context().Process._Popen(process_obj) 225 226 @staticmethod 227 def _after_fork(): 228 return _default_context.get_context().Process._after_fork() 229 230class DefaultContext(BaseContext): 231 Process = Process 232 233 def __init__(self, context): 234 self._default_context = context 235 self._actual_context = None 236 237 def get_context(self, method=None): 238 if method is None: 239 if self._actual_context is None: 240 self._actual_context = self._default_context 241 return self._actual_context 242 else: 243 return super().get_context(method) 244 245 def set_start_method(self, method, force=False): 246 if self._actual_context is not None and not force: 247 raise RuntimeError('context has already been set') 248 if method is None and force: 249 self._actual_context = None 250 return 251 self._actual_context = self.get_context(method) 252 253 def get_start_method(self, allow_none=False): 254 if self._actual_context is None: 255 if allow_none: 256 return None 257 self._actual_context = self._default_context 258 return self._actual_context._name 259 260 def get_all_start_methods(self): 261 if sys.platform == 'win32': 262 return ['spawn'] 263 else: 264 methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn'] 265 if reduction.HAVE_SEND_HANDLE: 266 methods.append('forkserver') 267 return methods 268 269 270# 271# Context types for fixed start method 272# 273 274if sys.platform != 'win32': 275 276 class ForkProcess(process.BaseProcess): 277 _start_method = 'fork' 278 @staticmethod 279 def _Popen(process_obj): 280 from .popen_fork import Popen 281 return Popen(process_obj) 282 283 class SpawnProcess(process.BaseProcess): 284 _start_method = 'spawn' 285 @staticmethod 286 def _Popen(process_obj): 287 from .popen_spawn_posix import Popen 288 return Popen(process_obj) 289 290 @staticmethod 291 def _after_fork(): 292 # process is spawned, nothing to do 293 pass 294 295 class ForkServerProcess(process.BaseProcess): 296 _start_method = 'forkserver' 297 @staticmethod 298 def _Popen(process_obj): 299 from .popen_forkserver import Popen 300 return Popen(process_obj) 301 302 class ForkContext(BaseContext): 303 _name = 'fork' 304 Process = ForkProcess 305 306 class SpawnContext(BaseContext): 307 _name = 'spawn' 308 Process = SpawnProcess 309 310 class ForkServerContext(BaseContext): 311 _name = 'forkserver' 312 Process = ForkServerProcess 313 def _check_available(self): 314 if not reduction.HAVE_SEND_HANDLE: 315 raise ValueError('forkserver start method not available') 316 317 _concrete_contexts = { 318 'fork': ForkContext(), 319 'spawn': SpawnContext(), 320 'forkserver': ForkServerContext(), 321 } 322 if sys.platform == 'darwin': 323 # bpo-33725: running arbitrary code after fork() is no longer reliable 324 # on macOS since macOS 10.14 (Mojave). Use spawn by default instead. 325 _default_context = DefaultContext(_concrete_contexts['spawn']) 326 else: 327 _default_context = DefaultContext(_concrete_contexts['fork']) 328 329else: 330 331 class SpawnProcess(process.BaseProcess): 332 _start_method = 'spawn' 333 @staticmethod 334 def _Popen(process_obj): 335 from .popen_spawn_win32 import Popen 336 return Popen(process_obj) 337 338 @staticmethod 339 def _after_fork(): 340 # process is spawned, nothing to do 341 pass 342 343 class SpawnContext(BaseContext): 344 _name = 'spawn' 345 Process = SpawnProcess 346 347 _concrete_contexts = { 348 'spawn': SpawnContext(), 349 } 350 _default_context = DefaultContext(_concrete_contexts['spawn']) 351 352# 353# Force the start method 354# 355 356def _force_start_method(method): 357 _default_context._actual_context = _concrete_contexts[method] 358 359# 360# Check that the current thread is spawning a child process 361# 362 363_tls = threading.local() 364 365def get_spawning_popen(): 366 return getattr(_tls, 'spawning_popen', None) 367 368def set_spawning_popen(popen): 369 _tls.spawning_popen = popen 370 371def assert_spawning(obj): 372 if get_spawning_popen() is None: 373 raise RuntimeError( 374 '%s objects should only be shared between processes' 375 ' through inheritance' % type(obj).__name__ 376 ) 377