xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/multiprocessing/context.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1import os
2import sys
3import threading
4
5from . import process
6from . import reduction
7
# An empty __all__ means a star-import of this module binds no names.
__all__ = ()
9
10#
11# Exceptions
12#
13
class ProcessError(Exception):
    '''Root of the exception hierarchy declared in this module.'''
16
class BufferTooShort(ProcessError):
    '''ProcessError subclass; its raise sites live outside this module.'''
19
class TimeoutError(ProcessError):
    '''ProcessError subclass; its raise sites live outside this module.

    Within this module the name shadows the builtin TimeoutError.
    '''
22
class AuthenticationError(ProcessError):
    '''ProcessError subclass; its raise sites live outside this module.'''
25
26#
27# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
28#
29
class BaseContext(object):
    '''Factory methods shared by every context type in this module.

    Each factory imports its implementation lazily, inside the method, and
    most of them pass the context returned by get_context() on to the
    object they create.
    '''

    # Re-export the module's exception types as class attributes.
    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    # Plain functions from the process module, exposed without binding self.
    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Return the number of CPUs in the system.

        Raises NotImplementedError when os.cpu_count() returns None.
        '''
        count = os.cpu_count()
        if count is not None:
            return count
        raise NotImplementedError('cannot determine number of cpus')

    def Manager(self):
        '''Return a started SyncManager bound to this context.

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        manager = SyncManager(ctx=self.get_context())
        manager.start()
        return manager

    def Pipe(self, duplex=True):
        '''Return the result of connection.Pipe(duplex): two connection
        objects joined by a pipe'''
        from .connection import Pipe
        return Pipe(duplex)

    def Lock(self):
        '''Return a non-recursive lock object'''
        from .synchronize import Lock
        context = self.get_context()
        return Lock(ctx=context)

    def RLock(self):
        '''Return a recursive lock object'''
        from .synchronize import RLock
        context = self.get_context()
        return RLock(ctx=context)

    def Condition(self, lock=None):
        '''Return a condition object, optionally built around `lock`'''
        from .synchronize import Condition
        context = self.get_context()
        return Condition(lock, ctx=context)

    def Semaphore(self, value=1):
        '''Return a semaphore object created with `value`'''
        from .synchronize import Semaphore
        context = self.get_context()
        return Semaphore(value, ctx=context)

    def BoundedSemaphore(self, value=1):
        '''Return a bounded semaphore object created with `value`'''
        from .synchronize import BoundedSemaphore
        context = self.get_context()
        return BoundedSemaphore(value, ctx=context)

    def Event(self):
        '''Return an event object'''
        from .synchronize import Event
        context = self.get_context()
        return Event(ctx=context)

    def Barrier(self, parties, action=None, timeout=None):
        '''Return a barrier object built from `parties`, `action` and
        `timeout`'''
        from .synchronize import Barrier
        context = self.get_context()
        return Barrier(parties, action, timeout, ctx=context)

    def Queue(self, maxsize=0):
        '''Return a queues.Queue object created with `maxsize`'''
        from .queues import Queue
        context = self.get_context()
        return Queue(maxsize, ctx=context)

    def JoinableQueue(self, maxsize=0):
        '''Return a queues.JoinableQueue object created with `maxsize`'''
        from .queues import JoinableQueue
        context = self.get_context()
        return JoinableQueue(maxsize, ctx=context)

    def SimpleQueue(self):
        '''Return a queues.SimpleQueue object'''
        from .queues import SimpleQueue
        context = self.get_context()
        return SimpleQueue(ctx=context)

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Return a process pool object'''
        from .pool import Pool
        context = self.get_context()
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=context)

    def RawValue(self, typecode_or_type, *args):
        '''Return a shared object (sharedctypes.RawValue); no context is
        passed'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Return a shared array (sharedctypes.RawArray); no context is
        passed'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Return a synchronized shared object'''
        from .sharedctypes import Value
        context = self.get_context()
        return Value(typecode_or_type, *args, lock=lock, ctx=context)

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Return a synchronized shared array'''
        from .sharedctypes import Array
        context = self.get_context()
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=context)

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.

        Does nothing unless running on win32 with a truthy sys.frozen.
        '''
        if sys.platform != 'win32':
            return
        if not getattr(sys, 'frozen', False):
            return
        from .spawn import freeze_support
        freeze_support()

    def get_logger(self):
        '''Return the package logger, which util.get_logger() creates on
        first use.
        '''
        from .util import get_logger
        package_logger = get_logger()
        return package_logger

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr;
        returns whatever util.log_to_stderr(level) returns'''
        from .util import log_to_stderr
        result = log_to_stderr(level)
        return result

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # The import below is the whole body: the imported name is unused.
        from . import connection

    def set_executable(self, executable):
        '''Set the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set the list of module names to try to load in the forkserver
        process.  This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        '''Return the context registered for `method`, or self when
        `method` is None.

        Raises ValueError for an unregistered name; the selected context's
        _check_available() is called before it is returned.
        '''
        if method is None:
            return self
        try:
            selected = _concrete_contexts[method]
        except KeyError:
            raise ValueError('cannot find context for %r' % method) from None
        else:
            selected._check_available()
            return selected

    def get_start_method(self, allow_none=False):
        '''Return this context's `_name`; `allow_none` is ignored here.'''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raises ValueError in this base implementation.'''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        # Reads the module-level name `reduction` (None if it is missing).
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        # Rebinds the module-level name, so the change is not per-instance.
        globals()['reduction'] = reduction

    def _check_available(self):
        '''Hook called by get_context(); the base version accepts
        everything.'''
215
216#
217# Type of default context -- underlying context can be set at most once
218#
219
class Process(process.BaseProcess):
    '''Process type of the default context.

    Both hooks forward to the Process class of whatever context
    `_default_context.get_context()` returns at call time.
    '''
    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        delegate = _default_context.get_context().Process
        return delegate._Popen(process_obj)

    @staticmethod
    def _after_fork():
        delegate = _default_context.get_context().Process
        return delegate._after_fork()
229
class DefaultContext(BaseContext):
    '''Context wrapper that resolves to a concrete context lazily.

    `_default_context` is the fallback supplied at construction;
    `_actual_context` stays None until a start method is chosen, either
    explicitly or by the first call that needs one.
    '''
    Process = Process

    def __init__(self, context):
        self._actual_context = None
        self._default_context = context

    def get_context(self, method=None):
        '''Return the context for `method`; with no method, return the
        active context, adopting the default if none is set yet.'''
        if method is not None:
            return super().get_context(method)
        if self._actual_context is None:
            self._actual_context = self._default_context
        return self._actual_context

    def set_start_method(self, method, force=False):
        '''Select the start method.

        Raises RuntimeError if a context is already active and `force` is
        false.  Passing method=None together with force clears the choice.
        '''
        if not (self._actual_context is None or force):
            raise RuntimeError('context has already been set')
        if method is None and force:
            self._actual_context = None
        else:
            self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the active start method name.

        When nothing is active yet, return None if `allow_none` is true;
        otherwise adopt the default context first.
        '''
        unset = self._actual_context is None
        if unset and allow_none:
            return None
        if unset:
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        '''Return the start method names for this platform as a list.'''
        if sys.platform == 'win32':
            return ['spawn']
        if sys.platform == 'darwin':
            methods = ['spawn', 'fork']
        else:
            methods = ['fork', 'spawn']
        # forkserver is listed only when reduction reports handle passing.
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods
268
269
270#
271# Context types for fixed start method
272#
273
if sys.platform == 'win32':

    # On win32 only the 'spawn' start method is defined.

    class SpawnProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_spawn_win32.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

        @staticmethod
        def _after_fork():
            # process is spawned, nothing to do
            pass

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])

else:

    # Elsewhere 'fork', 'spawn' and 'forkserver' are all defined; each
    # Process class is followed by the context that exposes it.

    class ForkProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_fork.'''
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        _name = 'fork'
        Process = ForkProcess

    class SpawnProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_spawn_posix.'''
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

        @staticmethod
        def _after_fork():
            # process is spawned, nothing to do
            pass

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerProcess(process.BaseProcess):
        '''Process whose Popen comes from popen_forkserver.'''
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkServerContext(BaseContext):
        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            '''Raise ValueError when reduction.HAVE_SEND_HANDLE is false.'''
            if reduction.HAVE_SEND_HANDLE:
                return
            raise ValueError('forkserver start method not available')

    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }
    # bpo-33725: running arbitrary code after fork() is no longer reliable
    # on macOS since macOS 10.14 (Mojave). Use spawn by default there and
    # fork on every other non-Windows platform.
    _default_context = DefaultContext(
        _concrete_contexts['spawn' if sys.platform == 'darwin' else 'fork'])
351
352#
353# Force the start method
354#
355
def _force_start_method(method):
    '''Make `method` the active start method of the default context.

    Writes `_actual_context` directly, so neither the "already set" check
    of set_start_method() nor _check_available() runs.  An unregistered
    name raises KeyError.
    '''
    forced = _concrete_contexts[method]
    _default_context._actual_context = forced
358
359#
360# Check that the current thread is spawning a child process
361#
362
# Thread-local storage: every thread has its own `spawning_popen` slot.
_tls = threading.local()

def get_spawning_popen():
    '''Return the value stored by set_spawning_popen() in the calling
    thread, or None if that thread has stored nothing.'''
    try:
        return _tls.spawning_popen
    except AttributeError:
        return None

def set_spawning_popen(popen):
    '''Store `popen` as the calling thread's spawning_popen value.'''
    setattr(_tls, 'spawning_popen', popen)
370
def assert_spawning(obj):
    '''Raise RuntimeError unless get_spawning_popen() returns a non-None
    value for the calling thread.

    `obj` is used only to put its type name in the error message.
    '''
    if get_spawning_popen() is not None:
        return
    type_name = type(obj).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
        )
377