xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/multiprocessing/managers.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1*cda5da8dSAndroid Build Coastguard Worker#
2*cda5da8dSAndroid Build Coastguard Worker# Module providing manager classes for dealing
3*cda5da8dSAndroid Build Coastguard Worker# with shared objects
4*cda5da8dSAndroid Build Coastguard Worker#
5*cda5da8dSAndroid Build Coastguard Worker# multiprocessing/managers.py
6*cda5da8dSAndroid Build Coastguard Worker#
7*cda5da8dSAndroid Build Coastguard Worker# Copyright (c) 2006-2008, R Oudkerk
8*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement.
9*cda5da8dSAndroid Build Coastguard Worker#
10*cda5da8dSAndroid Build Coastguard Worker
11*cda5da8dSAndroid Build Coastguard Worker__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12*cda5da8dSAndroid Build Coastguard Worker
13*cda5da8dSAndroid Build Coastguard Worker#
14*cda5da8dSAndroid Build Coastguard Worker# Imports
15*cda5da8dSAndroid Build Coastguard Worker#
16*cda5da8dSAndroid Build Coastguard Worker
17*cda5da8dSAndroid Build Coastguard Workerimport sys
18*cda5da8dSAndroid Build Coastguard Workerimport threading
19*cda5da8dSAndroid Build Coastguard Workerimport signal
20*cda5da8dSAndroid Build Coastguard Workerimport array
21*cda5da8dSAndroid Build Coastguard Workerimport queue
22*cda5da8dSAndroid Build Coastguard Workerimport time
23*cda5da8dSAndroid Build Coastguard Workerimport types
24*cda5da8dSAndroid Build Coastguard Workerimport os
25*cda5da8dSAndroid Build Coastguard Workerfrom os import getpid
26*cda5da8dSAndroid Build Coastguard Worker
27*cda5da8dSAndroid Build Coastguard Workerfrom traceback import format_exc
28*cda5da8dSAndroid Build Coastguard Worker
29*cda5da8dSAndroid Build Coastguard Workerfrom . import connection
30*cda5da8dSAndroid Build Coastguard Workerfrom .context import reduction, get_spawning_popen, ProcessError
31*cda5da8dSAndroid Build Coastguard Workerfrom . import pool
32*cda5da8dSAndroid Build Coastguard Workerfrom . import process
33*cda5da8dSAndroid Build Coastguard Workerfrom . import util
34*cda5da8dSAndroid Build Coastguard Workerfrom . import get_context
35*cda5da8dSAndroid Build Coastguard Workertry:
36*cda5da8dSAndroid Build Coastguard Worker    from . import shared_memory
37*cda5da8dSAndroid Build Coastguard Workerexcept ImportError:
38*cda5da8dSAndroid Build Coastguard Worker    HAS_SHMEM = False
39*cda5da8dSAndroid Build Coastguard Workerelse:
40*cda5da8dSAndroid Build Coastguard Worker    HAS_SHMEM = True
41*cda5da8dSAndroid Build Coastguard Worker    __all__.append('SharedMemoryManager')
42*cda5da8dSAndroid Build Coastguard Worker
43*cda5da8dSAndroid Build Coastguard Worker#
44*cda5da8dSAndroid Build Coastguard Worker# Register some things for pickling
45*cda5da8dSAndroid Build Coastguard Worker#
46*cda5da8dSAndroid Build Coastguard Worker
47*cda5da8dSAndroid Build Coastguard Workerdef reduce_array(a):
48*cda5da8dSAndroid Build Coastguard Worker    return array.array, (a.typecode, a.tobytes())
49*cda5da8dSAndroid Build Coastguard Workerreduction.register(array.array, reduce_array)
50*cda5da8dSAndroid Build Coastguard Worker
51*cda5da8dSAndroid Build Coastguard Workerview_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
52*cda5da8dSAndroid Build Coastguard Workerdef rebuild_as_list(obj):
53*cda5da8dSAndroid Build Coastguard Worker    return list, (list(obj),)
54*cda5da8dSAndroid Build Coastguard Workerfor view_type in view_types:
55*cda5da8dSAndroid Build Coastguard Worker    reduction.register(view_type, rebuild_as_list)
56*cda5da8dSAndroid Build Coastguard Workerdel view_type, view_types
57*cda5da8dSAndroid Build Coastguard Worker
58*cda5da8dSAndroid Build Coastguard Worker#
59*cda5da8dSAndroid Build Coastguard Worker# Type for identifying shared objects
60*cda5da8dSAndroid Build Coastguard Worker#
61*cda5da8dSAndroid Build Coastguard Worker
62*cda5da8dSAndroid Build Coastguard Workerclass Token(object):
63*cda5da8dSAndroid Build Coastguard Worker    '''
64*cda5da8dSAndroid Build Coastguard Worker    Type to uniquely identify a shared object
65*cda5da8dSAndroid Build Coastguard Worker    '''
66*cda5da8dSAndroid Build Coastguard Worker    __slots__ = ('typeid', 'address', 'id')
67*cda5da8dSAndroid Build Coastguard Worker
68*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, typeid, address, id):
69*cda5da8dSAndroid Build Coastguard Worker        (self.typeid, self.address, self.id) = (typeid, address, id)
70*cda5da8dSAndroid Build Coastguard Worker
71*cda5da8dSAndroid Build Coastguard Worker    def __getstate__(self):
72*cda5da8dSAndroid Build Coastguard Worker        return (self.typeid, self.address, self.id)
73*cda5da8dSAndroid Build Coastguard Worker
74*cda5da8dSAndroid Build Coastguard Worker    def __setstate__(self, state):
75*cda5da8dSAndroid Build Coastguard Worker        (self.typeid, self.address, self.id) = state
76*cda5da8dSAndroid Build Coastguard Worker
77*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
78*cda5da8dSAndroid Build Coastguard Worker        return '%s(typeid=%r, address=%r, id=%r)' % \
79*cda5da8dSAndroid Build Coastguard Worker               (self.__class__.__name__, self.typeid, self.address, self.id)
80*cda5da8dSAndroid Build Coastguard Worker
81*cda5da8dSAndroid Build Coastguard Worker#
82*cda5da8dSAndroid Build Coastguard Worker# Function for communication with a manager's server process
83*cda5da8dSAndroid Build Coastguard Worker#
84*cda5da8dSAndroid Build Coastguard Worker
85*cda5da8dSAndroid Build Coastguard Workerdef dispatch(c, id, methodname, args=(), kwds={}):
86*cda5da8dSAndroid Build Coastguard Worker    '''
87*cda5da8dSAndroid Build Coastguard Worker    Send a message to manager using connection `c` and return response
88*cda5da8dSAndroid Build Coastguard Worker    '''
89*cda5da8dSAndroid Build Coastguard Worker    c.send((id, methodname, args, kwds))
90*cda5da8dSAndroid Build Coastguard Worker    kind, result = c.recv()
91*cda5da8dSAndroid Build Coastguard Worker    if kind == '#RETURN':
92*cda5da8dSAndroid Build Coastguard Worker        return result
93*cda5da8dSAndroid Build Coastguard Worker    raise convert_to_error(kind, result)
94*cda5da8dSAndroid Build Coastguard Worker
95*cda5da8dSAndroid Build Coastguard Workerdef convert_to_error(kind, result):
96*cda5da8dSAndroid Build Coastguard Worker    if kind == '#ERROR':
97*cda5da8dSAndroid Build Coastguard Worker        return result
98*cda5da8dSAndroid Build Coastguard Worker    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
99*cda5da8dSAndroid Build Coastguard Worker        if not isinstance(result, str):
100*cda5da8dSAndroid Build Coastguard Worker            raise TypeError(
101*cda5da8dSAndroid Build Coastguard Worker                "Result {0!r} (kind '{1}') type is {2}, not str".format(
102*cda5da8dSAndroid Build Coastguard Worker                    result, kind, type(result)))
103*cda5da8dSAndroid Build Coastguard Worker        if kind == '#UNSERIALIZABLE':
104*cda5da8dSAndroid Build Coastguard Worker            return RemoteError('Unserializable message: %s\n' % result)
105*cda5da8dSAndroid Build Coastguard Worker        else:
106*cda5da8dSAndroid Build Coastguard Worker            return RemoteError(result)
107*cda5da8dSAndroid Build Coastguard Worker    else:
108*cda5da8dSAndroid Build Coastguard Worker        return ValueError('Unrecognized message type {!r}'.format(kind))
109*cda5da8dSAndroid Build Coastguard Worker
110*cda5da8dSAndroid Build Coastguard Workerclass RemoteError(Exception):
111*cda5da8dSAndroid Build Coastguard Worker    def __str__(self):
112*cda5da8dSAndroid Build Coastguard Worker        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
113*cda5da8dSAndroid Build Coastguard Worker
114*cda5da8dSAndroid Build Coastguard Worker#
115*cda5da8dSAndroid Build Coastguard Worker# Functions for finding the method names of an object
116*cda5da8dSAndroid Build Coastguard Worker#
117*cda5da8dSAndroid Build Coastguard Worker
118*cda5da8dSAndroid Build Coastguard Workerdef all_methods(obj):
119*cda5da8dSAndroid Build Coastguard Worker    '''
120*cda5da8dSAndroid Build Coastguard Worker    Return a list of names of methods of `obj`
121*cda5da8dSAndroid Build Coastguard Worker    '''
122*cda5da8dSAndroid Build Coastguard Worker    temp = []
123*cda5da8dSAndroid Build Coastguard Worker    for name in dir(obj):
124*cda5da8dSAndroid Build Coastguard Worker        func = getattr(obj, name)
125*cda5da8dSAndroid Build Coastguard Worker        if callable(func):
126*cda5da8dSAndroid Build Coastguard Worker            temp.append(name)
127*cda5da8dSAndroid Build Coastguard Worker    return temp
128*cda5da8dSAndroid Build Coastguard Worker
129*cda5da8dSAndroid Build Coastguard Workerdef public_methods(obj):
130*cda5da8dSAndroid Build Coastguard Worker    '''
131*cda5da8dSAndroid Build Coastguard Worker    Return a list of names of methods of `obj` which do not start with '_'
132*cda5da8dSAndroid Build Coastguard Worker    '''
133*cda5da8dSAndroid Build Coastguard Worker    return [name for name in all_methods(obj) if name[0] != '_']
134*cda5da8dSAndroid Build Coastguard Worker
135*cda5da8dSAndroid Build Coastguard Worker#
136*cda5da8dSAndroid Build Coastguard Worker# Server which is run in a process controlled by a manager
137*cda5da8dSAndroid Build Coastguard Worker#
138*cda5da8dSAndroid Build Coastguard Worker
139*cda5da8dSAndroid Build Coastguard Workerclass Server(object):
140*cda5da8dSAndroid Build Coastguard Worker    '''
141*cda5da8dSAndroid Build Coastguard Worker    Server class which runs in a process controlled by a manager object
142*cda5da8dSAndroid Build Coastguard Worker    '''
143*cda5da8dSAndroid Build Coastguard Worker    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
144*cda5da8dSAndroid Build Coastguard Worker              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
145*cda5da8dSAndroid Build Coastguard Worker
146*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, registry, address, authkey, serializer):
147*cda5da8dSAndroid Build Coastguard Worker        if not isinstance(authkey, bytes):
148*cda5da8dSAndroid Build Coastguard Worker            raise TypeError(
149*cda5da8dSAndroid Build Coastguard Worker                "Authkey {0!r} is type {1!s}, not bytes".format(
150*cda5da8dSAndroid Build Coastguard Worker                    authkey, type(authkey)))
151*cda5da8dSAndroid Build Coastguard Worker        self.registry = registry
152*cda5da8dSAndroid Build Coastguard Worker        self.authkey = process.AuthenticationString(authkey)
153*cda5da8dSAndroid Build Coastguard Worker        Listener, Client = listener_client[serializer]
154*cda5da8dSAndroid Build Coastguard Worker
155*cda5da8dSAndroid Build Coastguard Worker        # do authentication later
156*cda5da8dSAndroid Build Coastguard Worker        self.listener = Listener(address=address, backlog=16)
157*cda5da8dSAndroid Build Coastguard Worker        self.address = self.listener.address
158*cda5da8dSAndroid Build Coastguard Worker
159*cda5da8dSAndroid Build Coastguard Worker        self.id_to_obj = {'0': (None, ())}
160*cda5da8dSAndroid Build Coastguard Worker        self.id_to_refcount = {}
161*cda5da8dSAndroid Build Coastguard Worker        self.id_to_local_proxy_obj = {}
162*cda5da8dSAndroid Build Coastguard Worker        self.mutex = threading.Lock()
163*cda5da8dSAndroid Build Coastguard Worker
164*cda5da8dSAndroid Build Coastguard Worker    def serve_forever(self):
165*cda5da8dSAndroid Build Coastguard Worker        '''
166*cda5da8dSAndroid Build Coastguard Worker        Run the server forever
167*cda5da8dSAndroid Build Coastguard Worker        '''
168*cda5da8dSAndroid Build Coastguard Worker        self.stop_event = threading.Event()
169*cda5da8dSAndroid Build Coastguard Worker        process.current_process()._manager_server = self
170*cda5da8dSAndroid Build Coastguard Worker        try:
171*cda5da8dSAndroid Build Coastguard Worker            accepter = threading.Thread(target=self.accepter)
172*cda5da8dSAndroid Build Coastguard Worker            accepter.daemon = True
173*cda5da8dSAndroid Build Coastguard Worker            accepter.start()
174*cda5da8dSAndroid Build Coastguard Worker            try:
175*cda5da8dSAndroid Build Coastguard Worker                while not self.stop_event.is_set():
176*cda5da8dSAndroid Build Coastguard Worker                    self.stop_event.wait(1)
177*cda5da8dSAndroid Build Coastguard Worker            except (KeyboardInterrupt, SystemExit):
178*cda5da8dSAndroid Build Coastguard Worker                pass
179*cda5da8dSAndroid Build Coastguard Worker        finally:
180*cda5da8dSAndroid Build Coastguard Worker            if sys.stdout != sys.__stdout__: # what about stderr?
181*cda5da8dSAndroid Build Coastguard Worker                util.debug('resetting stdout, stderr')
182*cda5da8dSAndroid Build Coastguard Worker                sys.stdout = sys.__stdout__
183*cda5da8dSAndroid Build Coastguard Worker                sys.stderr = sys.__stderr__
184*cda5da8dSAndroid Build Coastguard Worker            sys.exit(0)
185*cda5da8dSAndroid Build Coastguard Worker
186*cda5da8dSAndroid Build Coastguard Worker    def accepter(self):
187*cda5da8dSAndroid Build Coastguard Worker        while True:
188*cda5da8dSAndroid Build Coastguard Worker            try:
189*cda5da8dSAndroid Build Coastguard Worker                c = self.listener.accept()
190*cda5da8dSAndroid Build Coastguard Worker            except OSError:
191*cda5da8dSAndroid Build Coastguard Worker                continue
192*cda5da8dSAndroid Build Coastguard Worker            t = threading.Thread(target=self.handle_request, args=(c,))
193*cda5da8dSAndroid Build Coastguard Worker            t.daemon = True
194*cda5da8dSAndroid Build Coastguard Worker            t.start()
195*cda5da8dSAndroid Build Coastguard Worker
196*cda5da8dSAndroid Build Coastguard Worker    def _handle_request(self, c):
197*cda5da8dSAndroid Build Coastguard Worker        request = None
198*cda5da8dSAndroid Build Coastguard Worker        try:
199*cda5da8dSAndroid Build Coastguard Worker            connection.deliver_challenge(c, self.authkey)
200*cda5da8dSAndroid Build Coastguard Worker            connection.answer_challenge(c, self.authkey)
201*cda5da8dSAndroid Build Coastguard Worker            request = c.recv()
202*cda5da8dSAndroid Build Coastguard Worker            ignore, funcname, args, kwds = request
203*cda5da8dSAndroid Build Coastguard Worker            assert funcname in self.public, '%r unrecognized' % funcname
204*cda5da8dSAndroid Build Coastguard Worker            func = getattr(self, funcname)
205*cda5da8dSAndroid Build Coastguard Worker        except Exception:
206*cda5da8dSAndroid Build Coastguard Worker            msg = ('#TRACEBACK', format_exc())
207*cda5da8dSAndroid Build Coastguard Worker        else:
208*cda5da8dSAndroid Build Coastguard Worker            try:
209*cda5da8dSAndroid Build Coastguard Worker                result = func(c, *args, **kwds)
210*cda5da8dSAndroid Build Coastguard Worker            except Exception:
211*cda5da8dSAndroid Build Coastguard Worker                msg = ('#TRACEBACK', format_exc())
212*cda5da8dSAndroid Build Coastguard Worker            else:
213*cda5da8dSAndroid Build Coastguard Worker                msg = ('#RETURN', result)
214*cda5da8dSAndroid Build Coastguard Worker
215*cda5da8dSAndroid Build Coastguard Worker        try:
216*cda5da8dSAndroid Build Coastguard Worker            c.send(msg)
217*cda5da8dSAndroid Build Coastguard Worker        except Exception as e:
218*cda5da8dSAndroid Build Coastguard Worker            try:
219*cda5da8dSAndroid Build Coastguard Worker                c.send(('#TRACEBACK', format_exc()))
220*cda5da8dSAndroid Build Coastguard Worker            except Exception:
221*cda5da8dSAndroid Build Coastguard Worker                pass
222*cda5da8dSAndroid Build Coastguard Worker            util.info('Failure to send message: %r', msg)
223*cda5da8dSAndroid Build Coastguard Worker            util.info(' ... request was %r', request)
224*cda5da8dSAndroid Build Coastguard Worker            util.info(' ... exception was %r', e)
225*cda5da8dSAndroid Build Coastguard Worker
226*cda5da8dSAndroid Build Coastguard Worker    def handle_request(self, conn):
227*cda5da8dSAndroid Build Coastguard Worker        '''
228*cda5da8dSAndroid Build Coastguard Worker        Handle a new connection
229*cda5da8dSAndroid Build Coastguard Worker        '''
230*cda5da8dSAndroid Build Coastguard Worker        try:
231*cda5da8dSAndroid Build Coastguard Worker            self._handle_request(conn)
232*cda5da8dSAndroid Build Coastguard Worker        except SystemExit:
233*cda5da8dSAndroid Build Coastguard Worker            # Server.serve_client() calls sys.exit(0) on EOF
234*cda5da8dSAndroid Build Coastguard Worker            pass
235*cda5da8dSAndroid Build Coastguard Worker        finally:
236*cda5da8dSAndroid Build Coastguard Worker            conn.close()
237*cda5da8dSAndroid Build Coastguard Worker
238*cda5da8dSAndroid Build Coastguard Worker    def serve_client(self, conn):
239*cda5da8dSAndroid Build Coastguard Worker        '''
240*cda5da8dSAndroid Build Coastguard Worker        Handle requests from the proxies in a particular process/thread
241*cda5da8dSAndroid Build Coastguard Worker        '''
242*cda5da8dSAndroid Build Coastguard Worker        util.debug('starting server thread to service %r',
243*cda5da8dSAndroid Build Coastguard Worker                   threading.current_thread().name)
244*cda5da8dSAndroid Build Coastguard Worker
245*cda5da8dSAndroid Build Coastguard Worker        recv = conn.recv
246*cda5da8dSAndroid Build Coastguard Worker        send = conn.send
247*cda5da8dSAndroid Build Coastguard Worker        id_to_obj = self.id_to_obj
248*cda5da8dSAndroid Build Coastguard Worker
249*cda5da8dSAndroid Build Coastguard Worker        while not self.stop_event.is_set():
250*cda5da8dSAndroid Build Coastguard Worker
251*cda5da8dSAndroid Build Coastguard Worker            try:
252*cda5da8dSAndroid Build Coastguard Worker                methodname = obj = None
253*cda5da8dSAndroid Build Coastguard Worker                request = recv()
254*cda5da8dSAndroid Build Coastguard Worker                ident, methodname, args, kwds = request
255*cda5da8dSAndroid Build Coastguard Worker                try:
256*cda5da8dSAndroid Build Coastguard Worker                    obj, exposed, gettypeid = id_to_obj[ident]
257*cda5da8dSAndroid Build Coastguard Worker                except KeyError as ke:
258*cda5da8dSAndroid Build Coastguard Worker                    try:
259*cda5da8dSAndroid Build Coastguard Worker                        obj, exposed, gettypeid = \
260*cda5da8dSAndroid Build Coastguard Worker                            self.id_to_local_proxy_obj[ident]
261*cda5da8dSAndroid Build Coastguard Worker                    except KeyError:
262*cda5da8dSAndroid Build Coastguard Worker                        raise ke
263*cda5da8dSAndroid Build Coastguard Worker
264*cda5da8dSAndroid Build Coastguard Worker                if methodname not in exposed:
265*cda5da8dSAndroid Build Coastguard Worker                    raise AttributeError(
266*cda5da8dSAndroid Build Coastguard Worker                        'method %r of %r object is not in exposed=%r' %
267*cda5da8dSAndroid Build Coastguard Worker                        (methodname, type(obj), exposed)
268*cda5da8dSAndroid Build Coastguard Worker                        )
269*cda5da8dSAndroid Build Coastguard Worker
270*cda5da8dSAndroid Build Coastguard Worker                function = getattr(obj, methodname)
271*cda5da8dSAndroid Build Coastguard Worker
272*cda5da8dSAndroid Build Coastguard Worker                try:
273*cda5da8dSAndroid Build Coastguard Worker                    res = function(*args, **kwds)
274*cda5da8dSAndroid Build Coastguard Worker                except Exception as e:
275*cda5da8dSAndroid Build Coastguard Worker                    msg = ('#ERROR', e)
276*cda5da8dSAndroid Build Coastguard Worker                else:
277*cda5da8dSAndroid Build Coastguard Worker                    typeid = gettypeid and gettypeid.get(methodname, None)
278*cda5da8dSAndroid Build Coastguard Worker                    if typeid:
279*cda5da8dSAndroid Build Coastguard Worker                        rident, rexposed = self.create(conn, typeid, res)
280*cda5da8dSAndroid Build Coastguard Worker                        token = Token(typeid, self.address, rident)
281*cda5da8dSAndroid Build Coastguard Worker                        msg = ('#PROXY', (rexposed, token))
282*cda5da8dSAndroid Build Coastguard Worker                    else:
283*cda5da8dSAndroid Build Coastguard Worker                        msg = ('#RETURN', res)
284*cda5da8dSAndroid Build Coastguard Worker
285*cda5da8dSAndroid Build Coastguard Worker            except AttributeError:
286*cda5da8dSAndroid Build Coastguard Worker                if methodname is None:
287*cda5da8dSAndroid Build Coastguard Worker                    msg = ('#TRACEBACK', format_exc())
288*cda5da8dSAndroid Build Coastguard Worker                else:
289*cda5da8dSAndroid Build Coastguard Worker                    try:
290*cda5da8dSAndroid Build Coastguard Worker                        fallback_func = self.fallback_mapping[methodname]
291*cda5da8dSAndroid Build Coastguard Worker                        result = fallback_func(
292*cda5da8dSAndroid Build Coastguard Worker                            self, conn, ident, obj, *args, **kwds
293*cda5da8dSAndroid Build Coastguard Worker                            )
294*cda5da8dSAndroid Build Coastguard Worker                        msg = ('#RETURN', result)
295*cda5da8dSAndroid Build Coastguard Worker                    except Exception:
296*cda5da8dSAndroid Build Coastguard Worker                        msg = ('#TRACEBACK', format_exc())
297*cda5da8dSAndroid Build Coastguard Worker
298*cda5da8dSAndroid Build Coastguard Worker            except EOFError:
299*cda5da8dSAndroid Build Coastguard Worker                util.debug('got EOF -- exiting thread serving %r',
300*cda5da8dSAndroid Build Coastguard Worker                           threading.current_thread().name)
301*cda5da8dSAndroid Build Coastguard Worker                sys.exit(0)
302*cda5da8dSAndroid Build Coastguard Worker
303*cda5da8dSAndroid Build Coastguard Worker            except Exception:
304*cda5da8dSAndroid Build Coastguard Worker                msg = ('#TRACEBACK', format_exc())
305*cda5da8dSAndroid Build Coastguard Worker
306*cda5da8dSAndroid Build Coastguard Worker            try:
307*cda5da8dSAndroid Build Coastguard Worker                try:
308*cda5da8dSAndroid Build Coastguard Worker                    send(msg)
309*cda5da8dSAndroid Build Coastguard Worker                except Exception:
310*cda5da8dSAndroid Build Coastguard Worker                    send(('#UNSERIALIZABLE', format_exc()))
311*cda5da8dSAndroid Build Coastguard Worker            except Exception as e:
312*cda5da8dSAndroid Build Coastguard Worker                util.info('exception in thread serving %r',
313*cda5da8dSAndroid Build Coastguard Worker                        threading.current_thread().name)
314*cda5da8dSAndroid Build Coastguard Worker                util.info(' ... message was %r', msg)
315*cda5da8dSAndroid Build Coastguard Worker                util.info(' ... exception was %r', e)
316*cda5da8dSAndroid Build Coastguard Worker                conn.close()
317*cda5da8dSAndroid Build Coastguard Worker                sys.exit(1)
318*cda5da8dSAndroid Build Coastguard Worker
319*cda5da8dSAndroid Build Coastguard Worker    def fallback_getvalue(self, conn, ident, obj):
320*cda5da8dSAndroid Build Coastguard Worker        return obj
321*cda5da8dSAndroid Build Coastguard Worker
322*cda5da8dSAndroid Build Coastguard Worker    def fallback_str(self, conn, ident, obj):
323*cda5da8dSAndroid Build Coastguard Worker        return str(obj)
324*cda5da8dSAndroid Build Coastguard Worker
325*cda5da8dSAndroid Build Coastguard Worker    def fallback_repr(self, conn, ident, obj):
326*cda5da8dSAndroid Build Coastguard Worker        return repr(obj)
327*cda5da8dSAndroid Build Coastguard Worker
328*cda5da8dSAndroid Build Coastguard Worker    fallback_mapping = {
329*cda5da8dSAndroid Build Coastguard Worker        '__str__':fallback_str,
330*cda5da8dSAndroid Build Coastguard Worker        '__repr__':fallback_repr,
331*cda5da8dSAndroid Build Coastguard Worker        '#GETVALUE':fallback_getvalue
332*cda5da8dSAndroid Build Coastguard Worker        }
333*cda5da8dSAndroid Build Coastguard Worker
334*cda5da8dSAndroid Build Coastguard Worker    def dummy(self, c):
335*cda5da8dSAndroid Build Coastguard Worker        pass
336*cda5da8dSAndroid Build Coastguard Worker
337*cda5da8dSAndroid Build Coastguard Worker    def debug_info(self, c):
338*cda5da8dSAndroid Build Coastguard Worker        '''
339*cda5da8dSAndroid Build Coastguard Worker        Return some info --- useful to spot problems with refcounting
340*cda5da8dSAndroid Build Coastguard Worker        '''
341*cda5da8dSAndroid Build Coastguard Worker        # Perhaps include debug info about 'c'?
342*cda5da8dSAndroid Build Coastguard Worker        with self.mutex:
343*cda5da8dSAndroid Build Coastguard Worker            result = []
344*cda5da8dSAndroid Build Coastguard Worker            keys = list(self.id_to_refcount.keys())
345*cda5da8dSAndroid Build Coastguard Worker            keys.sort()
346*cda5da8dSAndroid Build Coastguard Worker            for ident in keys:
347*cda5da8dSAndroid Build Coastguard Worker                if ident != '0':
348*cda5da8dSAndroid Build Coastguard Worker                    result.append('  %s:       refcount=%s\n    %s' %
349*cda5da8dSAndroid Build Coastguard Worker                                  (ident, self.id_to_refcount[ident],
350*cda5da8dSAndroid Build Coastguard Worker                                   str(self.id_to_obj[ident][0])[:75]))
351*cda5da8dSAndroid Build Coastguard Worker            return '\n'.join(result)
352*cda5da8dSAndroid Build Coastguard Worker
353*cda5da8dSAndroid Build Coastguard Worker    def number_of_objects(self, c):
354*cda5da8dSAndroid Build Coastguard Worker        '''
355*cda5da8dSAndroid Build Coastguard Worker        Number of shared objects
356*cda5da8dSAndroid Build Coastguard Worker        '''
357*cda5da8dSAndroid Build Coastguard Worker        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
358*cda5da8dSAndroid Build Coastguard Worker        return len(self.id_to_refcount)
359*cda5da8dSAndroid Build Coastguard Worker
360*cda5da8dSAndroid Build Coastguard Worker    def shutdown(self, c):
361*cda5da8dSAndroid Build Coastguard Worker        '''
362*cda5da8dSAndroid Build Coastguard Worker        Shutdown this process
363*cda5da8dSAndroid Build Coastguard Worker        '''
364*cda5da8dSAndroid Build Coastguard Worker        try:
365*cda5da8dSAndroid Build Coastguard Worker            util.debug('manager received shutdown message')
366*cda5da8dSAndroid Build Coastguard Worker            c.send(('#RETURN', None))
367*cda5da8dSAndroid Build Coastguard Worker        except:
368*cda5da8dSAndroid Build Coastguard Worker            import traceback
369*cda5da8dSAndroid Build Coastguard Worker            traceback.print_exc()
370*cda5da8dSAndroid Build Coastguard Worker        finally:
371*cda5da8dSAndroid Build Coastguard Worker            self.stop_event.set()
372*cda5da8dSAndroid Build Coastguard Worker
373*cda5da8dSAndroid Build Coastguard Worker    def create(self, c, typeid, /, *args, **kwds):
374*cda5da8dSAndroid Build Coastguard Worker        '''
375*cda5da8dSAndroid Build Coastguard Worker        Create a new shared object and return its id
376*cda5da8dSAndroid Build Coastguard Worker        '''
377*cda5da8dSAndroid Build Coastguard Worker        with self.mutex:
378*cda5da8dSAndroid Build Coastguard Worker            callable, exposed, method_to_typeid, proxytype = \
379*cda5da8dSAndroid Build Coastguard Worker                      self.registry[typeid]
380*cda5da8dSAndroid Build Coastguard Worker
381*cda5da8dSAndroid Build Coastguard Worker            if callable is None:
382*cda5da8dSAndroid Build Coastguard Worker                if kwds or (len(args) != 1):
383*cda5da8dSAndroid Build Coastguard Worker                    raise ValueError(
384*cda5da8dSAndroid Build Coastguard Worker                        "Without callable, must have one non-keyword argument")
385*cda5da8dSAndroid Build Coastguard Worker                obj = args[0]
386*cda5da8dSAndroid Build Coastguard Worker            else:
387*cda5da8dSAndroid Build Coastguard Worker                obj = callable(*args, **kwds)
388*cda5da8dSAndroid Build Coastguard Worker
389*cda5da8dSAndroid Build Coastguard Worker            if exposed is None:
390*cda5da8dSAndroid Build Coastguard Worker                exposed = public_methods(obj)
391*cda5da8dSAndroid Build Coastguard Worker            if method_to_typeid is not None:
392*cda5da8dSAndroid Build Coastguard Worker                if not isinstance(method_to_typeid, dict):
393*cda5da8dSAndroid Build Coastguard Worker                    raise TypeError(
394*cda5da8dSAndroid Build Coastguard Worker                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
395*cda5da8dSAndroid Build Coastguard Worker                            method_to_typeid, type(method_to_typeid)))
396*cda5da8dSAndroid Build Coastguard Worker                exposed = list(exposed) + list(method_to_typeid)
397*cda5da8dSAndroid Build Coastguard Worker
398*cda5da8dSAndroid Build Coastguard Worker            ident = '%x' % id(obj)  # convert to string because xmlrpclib
399*cda5da8dSAndroid Build Coastguard Worker                                    # only has 32 bit signed integers
400*cda5da8dSAndroid Build Coastguard Worker            util.debug('%r callable returned object with id %r', typeid, ident)
401*cda5da8dSAndroid Build Coastguard Worker
402*cda5da8dSAndroid Build Coastguard Worker            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
403*cda5da8dSAndroid Build Coastguard Worker            if ident not in self.id_to_refcount:
404*cda5da8dSAndroid Build Coastguard Worker                self.id_to_refcount[ident] = 0
405*cda5da8dSAndroid Build Coastguard Worker
406*cda5da8dSAndroid Build Coastguard Worker        self.incref(c, ident)
407*cda5da8dSAndroid Build Coastguard Worker        return ident, tuple(exposed)
408*cda5da8dSAndroid Build Coastguard Worker
409*cda5da8dSAndroid Build Coastguard Worker    def get_methods(self, c, token):
410*cda5da8dSAndroid Build Coastguard Worker        '''
411*cda5da8dSAndroid Build Coastguard Worker        Return the methods of the shared object indicated by token
412*cda5da8dSAndroid Build Coastguard Worker        '''
413*cda5da8dSAndroid Build Coastguard Worker        return tuple(self.id_to_obj[token.id][1])
414*cda5da8dSAndroid Build Coastguard Worker
415*cda5da8dSAndroid Build Coastguard Worker    def accept_connection(self, c, name):
416*cda5da8dSAndroid Build Coastguard Worker        '''
417*cda5da8dSAndroid Build Coastguard Worker        Spawn a new thread to serve this connection
418*cda5da8dSAndroid Build Coastguard Worker        '''
419*cda5da8dSAndroid Build Coastguard Worker        threading.current_thread().name = name
420*cda5da8dSAndroid Build Coastguard Worker        c.send(('#RETURN', None))
421*cda5da8dSAndroid Build Coastguard Worker        self.serve_client(c)
422*cda5da8dSAndroid Build Coastguard Worker
423*cda5da8dSAndroid Build Coastguard Worker    def incref(self, c, ident):
424*cda5da8dSAndroid Build Coastguard Worker        with self.mutex:
425*cda5da8dSAndroid Build Coastguard Worker            try:
426*cda5da8dSAndroid Build Coastguard Worker                self.id_to_refcount[ident] += 1
427*cda5da8dSAndroid Build Coastguard Worker            except KeyError as ke:
428*cda5da8dSAndroid Build Coastguard Worker                # If no external references exist but an internal (to the
429*cda5da8dSAndroid Build Coastguard Worker                # manager) still does and a new external reference is created
430*cda5da8dSAndroid Build Coastguard Worker                # from it, restore the manager's tracking of it from the
431*cda5da8dSAndroid Build Coastguard Worker                # previously stashed internal ref.
432*cda5da8dSAndroid Build Coastguard Worker                if ident in self.id_to_local_proxy_obj:
433*cda5da8dSAndroid Build Coastguard Worker                    self.id_to_refcount[ident] = 1
434*cda5da8dSAndroid Build Coastguard Worker                    self.id_to_obj[ident] = \
435*cda5da8dSAndroid Build Coastguard Worker                        self.id_to_local_proxy_obj[ident]
436*cda5da8dSAndroid Build Coastguard Worker                    obj, exposed, gettypeid = self.id_to_obj[ident]
437*cda5da8dSAndroid Build Coastguard Worker                    util.debug('Server re-enabled tracking & INCREF %r', ident)
438*cda5da8dSAndroid Build Coastguard Worker                else:
439*cda5da8dSAndroid Build Coastguard Worker                    raise ke
440*cda5da8dSAndroid Build Coastguard Worker
441*cda5da8dSAndroid Build Coastguard Worker    def decref(self, c, ident):
442*cda5da8dSAndroid Build Coastguard Worker        if ident not in self.id_to_refcount and \
443*cda5da8dSAndroid Build Coastguard Worker            ident in self.id_to_local_proxy_obj:
444*cda5da8dSAndroid Build Coastguard Worker            util.debug('Server DECREF skipping %r', ident)
445*cda5da8dSAndroid Build Coastguard Worker            return
446*cda5da8dSAndroid Build Coastguard Worker
447*cda5da8dSAndroid Build Coastguard Worker        with self.mutex:
448*cda5da8dSAndroid Build Coastguard Worker            if self.id_to_refcount[ident] <= 0:
449*cda5da8dSAndroid Build Coastguard Worker                raise AssertionError(
450*cda5da8dSAndroid Build Coastguard Worker                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
451*cda5da8dSAndroid Build Coastguard Worker                        ident, self.id_to_obj[ident],
452*cda5da8dSAndroid Build Coastguard Worker                        self.id_to_refcount[ident]))
453*cda5da8dSAndroid Build Coastguard Worker            self.id_to_refcount[ident] -= 1
454*cda5da8dSAndroid Build Coastguard Worker            if self.id_to_refcount[ident] == 0:
455*cda5da8dSAndroid Build Coastguard Worker                del self.id_to_refcount[ident]
456*cda5da8dSAndroid Build Coastguard Worker
457*cda5da8dSAndroid Build Coastguard Worker        if ident not in self.id_to_refcount:
458*cda5da8dSAndroid Build Coastguard Worker            # Two-step process in case the object turns out to contain other
459*cda5da8dSAndroid Build Coastguard Worker            # proxy objects (e.g. a managed list of managed lists).
460*cda5da8dSAndroid Build Coastguard Worker            # Otherwise, deleting self.id_to_obj[ident] would trigger the
461*cda5da8dSAndroid Build Coastguard Worker            # deleting of the stored value (another managed object) which would
462*cda5da8dSAndroid Build Coastguard Worker            # in turn attempt to acquire the mutex that is already held here.
463*cda5da8dSAndroid Build Coastguard Worker            self.id_to_obj[ident] = (None, (), None)  # thread-safe
464*cda5da8dSAndroid Build Coastguard Worker            util.debug('disposing of obj with id %r', ident)
465*cda5da8dSAndroid Build Coastguard Worker            with self.mutex:
466*cda5da8dSAndroid Build Coastguard Worker                del self.id_to_obj[ident]
467*cda5da8dSAndroid Build Coastguard Worker
468*cda5da8dSAndroid Build Coastguard Worker
469*cda5da8dSAndroid Build Coastguard Worker#
470*cda5da8dSAndroid Build Coastguard Worker# Class to represent state of a manager
471*cda5da8dSAndroid Build Coastguard Worker#
472*cda5da8dSAndroid Build Coastguard Worker
473*cda5da8dSAndroid Build Coastguard Workerclass State(object):
474*cda5da8dSAndroid Build Coastguard Worker    __slots__ = ['value']
475*cda5da8dSAndroid Build Coastguard Worker    INITIAL = 0
476*cda5da8dSAndroid Build Coastguard Worker    STARTED = 1
477*cda5da8dSAndroid Build Coastguard Worker    SHUTDOWN = 2
478*cda5da8dSAndroid Build Coastguard Worker
479*cda5da8dSAndroid Build Coastguard Worker#
480*cda5da8dSAndroid Build Coastguard Worker# Mapping from serializer name to Listener and Client types
481*cda5da8dSAndroid Build Coastguard Worker#
482*cda5da8dSAndroid Build Coastguard Worker
483*cda5da8dSAndroid Build Coastguard Workerlistener_client = {
484*cda5da8dSAndroid Build Coastguard Worker    'pickle' : (connection.Listener, connection.Client),
485*cda5da8dSAndroid Build Coastguard Worker    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
486*cda5da8dSAndroid Build Coastguard Worker    }
487*cda5da8dSAndroid Build Coastguard Worker
488*cda5da8dSAndroid Build Coastguard Worker#
489*cda5da8dSAndroid Build Coastguard Worker# Definition of BaseManager
490*cda5da8dSAndroid Build Coastguard Worker#
491*cda5da8dSAndroid Build Coastguard Worker
492*cda5da8dSAndroid Build Coastguard Workerclass BaseManager(object):
493*cda5da8dSAndroid Build Coastguard Worker    '''
494*cda5da8dSAndroid Build Coastguard Worker    Base class for managers
495*cda5da8dSAndroid Build Coastguard Worker    '''
496*cda5da8dSAndroid Build Coastguard Worker    _registry = {}
497*cda5da8dSAndroid Build Coastguard Worker    _Server = Server
498*cda5da8dSAndroid Build Coastguard Worker
499*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, address=None, authkey=None, serializer='pickle',
500*cda5da8dSAndroid Build Coastguard Worker                 ctx=None, *, shutdown_timeout=1.0):
501*cda5da8dSAndroid Build Coastguard Worker        if authkey is None:
502*cda5da8dSAndroid Build Coastguard Worker            authkey = process.current_process().authkey
503*cda5da8dSAndroid Build Coastguard Worker        self._address = address     # XXX not final address if eg ('', 0)
504*cda5da8dSAndroid Build Coastguard Worker        self._authkey = process.AuthenticationString(authkey)
505*cda5da8dSAndroid Build Coastguard Worker        self._state = State()
506*cda5da8dSAndroid Build Coastguard Worker        self._state.value = State.INITIAL
507*cda5da8dSAndroid Build Coastguard Worker        self._serializer = serializer
508*cda5da8dSAndroid Build Coastguard Worker        self._Listener, self._Client = listener_client[serializer]
509*cda5da8dSAndroid Build Coastguard Worker        self._ctx = ctx or get_context()
510*cda5da8dSAndroid Build Coastguard Worker        self._shutdown_timeout = shutdown_timeout
511*cda5da8dSAndroid Build Coastguard Worker
512*cda5da8dSAndroid Build Coastguard Worker    def get_server(self):
513*cda5da8dSAndroid Build Coastguard Worker        '''
514*cda5da8dSAndroid Build Coastguard Worker        Return server object with serve_forever() method and address attribute
515*cda5da8dSAndroid Build Coastguard Worker        '''
516*cda5da8dSAndroid Build Coastguard Worker        if self._state.value != State.INITIAL:
517*cda5da8dSAndroid Build Coastguard Worker            if self._state.value == State.STARTED:
518*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError("Already started server")
519*cda5da8dSAndroid Build Coastguard Worker            elif self._state.value == State.SHUTDOWN:
520*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError("Manager has shut down")
521*cda5da8dSAndroid Build Coastguard Worker            else:
522*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError(
523*cda5da8dSAndroid Build Coastguard Worker                    "Unknown state {!r}".format(self._state.value))
524*cda5da8dSAndroid Build Coastguard Worker        return Server(self._registry, self._address,
525*cda5da8dSAndroid Build Coastguard Worker                      self._authkey, self._serializer)
526*cda5da8dSAndroid Build Coastguard Worker
527*cda5da8dSAndroid Build Coastguard Worker    def connect(self):
528*cda5da8dSAndroid Build Coastguard Worker        '''
529*cda5da8dSAndroid Build Coastguard Worker        Connect manager object to the server process
530*cda5da8dSAndroid Build Coastguard Worker        '''
531*cda5da8dSAndroid Build Coastguard Worker        Listener, Client = listener_client[self._serializer]
532*cda5da8dSAndroid Build Coastguard Worker        conn = Client(self._address, authkey=self._authkey)
533*cda5da8dSAndroid Build Coastguard Worker        dispatch(conn, None, 'dummy')
534*cda5da8dSAndroid Build Coastguard Worker        self._state.value = State.STARTED
535*cda5da8dSAndroid Build Coastguard Worker
536*cda5da8dSAndroid Build Coastguard Worker    def start(self, initializer=None, initargs=()):
537*cda5da8dSAndroid Build Coastguard Worker        '''
538*cda5da8dSAndroid Build Coastguard Worker        Spawn a server process for this manager object
539*cda5da8dSAndroid Build Coastguard Worker        '''
540*cda5da8dSAndroid Build Coastguard Worker        if self._state.value != State.INITIAL:
541*cda5da8dSAndroid Build Coastguard Worker            if self._state.value == State.STARTED:
542*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError("Already started server")
543*cda5da8dSAndroid Build Coastguard Worker            elif self._state.value == State.SHUTDOWN:
544*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError("Manager has shut down")
545*cda5da8dSAndroid Build Coastguard Worker            else:
546*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError(
547*cda5da8dSAndroid Build Coastguard Worker                    "Unknown state {!r}".format(self._state.value))
548*cda5da8dSAndroid Build Coastguard Worker
549*cda5da8dSAndroid Build Coastguard Worker        if initializer is not None and not callable(initializer):
550*cda5da8dSAndroid Build Coastguard Worker            raise TypeError('initializer must be a callable')
551*cda5da8dSAndroid Build Coastguard Worker
552*cda5da8dSAndroid Build Coastguard Worker        # pipe over which we will retrieve address of server
553*cda5da8dSAndroid Build Coastguard Worker        reader, writer = connection.Pipe(duplex=False)
554*cda5da8dSAndroid Build Coastguard Worker
555*cda5da8dSAndroid Build Coastguard Worker        # spawn process which runs a server
556*cda5da8dSAndroid Build Coastguard Worker        self._process = self._ctx.Process(
557*cda5da8dSAndroid Build Coastguard Worker            target=type(self)._run_server,
558*cda5da8dSAndroid Build Coastguard Worker            args=(self._registry, self._address, self._authkey,
559*cda5da8dSAndroid Build Coastguard Worker                  self._serializer, writer, initializer, initargs),
560*cda5da8dSAndroid Build Coastguard Worker            )
561*cda5da8dSAndroid Build Coastguard Worker        ident = ':'.join(str(i) for i in self._process._identity)
562*cda5da8dSAndroid Build Coastguard Worker        self._process.name = type(self).__name__  + '-' + ident
563*cda5da8dSAndroid Build Coastguard Worker        self._process.start()
564*cda5da8dSAndroid Build Coastguard Worker
565*cda5da8dSAndroid Build Coastguard Worker        # get address of server
566*cda5da8dSAndroid Build Coastguard Worker        writer.close()
567*cda5da8dSAndroid Build Coastguard Worker        self._address = reader.recv()
568*cda5da8dSAndroid Build Coastguard Worker        reader.close()
569*cda5da8dSAndroid Build Coastguard Worker
570*cda5da8dSAndroid Build Coastguard Worker        # register a finalizer
571*cda5da8dSAndroid Build Coastguard Worker        self._state.value = State.STARTED
572*cda5da8dSAndroid Build Coastguard Worker        self.shutdown = util.Finalize(
573*cda5da8dSAndroid Build Coastguard Worker            self, type(self)._finalize_manager,
574*cda5da8dSAndroid Build Coastguard Worker            args=(self._process, self._address, self._authkey, self._state,
575*cda5da8dSAndroid Build Coastguard Worker                  self._Client, self._shutdown_timeout),
576*cda5da8dSAndroid Build Coastguard Worker            exitpriority=0
577*cda5da8dSAndroid Build Coastguard Worker            )
578*cda5da8dSAndroid Build Coastguard Worker
579*cda5da8dSAndroid Build Coastguard Worker    @classmethod
580*cda5da8dSAndroid Build Coastguard Worker    def _run_server(cls, registry, address, authkey, serializer, writer,
581*cda5da8dSAndroid Build Coastguard Worker                    initializer=None, initargs=()):
582*cda5da8dSAndroid Build Coastguard Worker        '''
583*cda5da8dSAndroid Build Coastguard Worker        Create a server, report its address and run it
584*cda5da8dSAndroid Build Coastguard Worker        '''
585*cda5da8dSAndroid Build Coastguard Worker        # bpo-36368: protect server process from KeyboardInterrupt signals
586*cda5da8dSAndroid Build Coastguard Worker        signal.signal(signal.SIGINT, signal.SIG_IGN)
587*cda5da8dSAndroid Build Coastguard Worker
588*cda5da8dSAndroid Build Coastguard Worker        if initializer is not None:
589*cda5da8dSAndroid Build Coastguard Worker            initializer(*initargs)
590*cda5da8dSAndroid Build Coastguard Worker
591*cda5da8dSAndroid Build Coastguard Worker        # create server
592*cda5da8dSAndroid Build Coastguard Worker        server = cls._Server(registry, address, authkey, serializer)
593*cda5da8dSAndroid Build Coastguard Worker
594*cda5da8dSAndroid Build Coastguard Worker        # inform parent process of the server's address
595*cda5da8dSAndroid Build Coastguard Worker        writer.send(server.address)
596*cda5da8dSAndroid Build Coastguard Worker        writer.close()
597*cda5da8dSAndroid Build Coastguard Worker
598*cda5da8dSAndroid Build Coastguard Worker        # run the manager
599*cda5da8dSAndroid Build Coastguard Worker        util.info('manager serving at %r', server.address)
600*cda5da8dSAndroid Build Coastguard Worker        server.serve_forever()
601*cda5da8dSAndroid Build Coastguard Worker
602*cda5da8dSAndroid Build Coastguard Worker    def _create(self, typeid, /, *args, **kwds):
603*cda5da8dSAndroid Build Coastguard Worker        '''
604*cda5da8dSAndroid Build Coastguard Worker        Create a new shared object; return the token and exposed tuple
605*cda5da8dSAndroid Build Coastguard Worker        '''
606*cda5da8dSAndroid Build Coastguard Worker        assert self._state.value == State.STARTED, 'server not yet started'
607*cda5da8dSAndroid Build Coastguard Worker        conn = self._Client(self._address, authkey=self._authkey)
608*cda5da8dSAndroid Build Coastguard Worker        try:
609*cda5da8dSAndroid Build Coastguard Worker            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
610*cda5da8dSAndroid Build Coastguard Worker        finally:
611*cda5da8dSAndroid Build Coastguard Worker            conn.close()
612*cda5da8dSAndroid Build Coastguard Worker        return Token(typeid, self._address, id), exposed
613*cda5da8dSAndroid Build Coastguard Worker
614*cda5da8dSAndroid Build Coastguard Worker    def join(self, timeout=None):
615*cda5da8dSAndroid Build Coastguard Worker        '''
616*cda5da8dSAndroid Build Coastguard Worker        Join the manager process (if it has been spawned)
617*cda5da8dSAndroid Build Coastguard Worker        '''
618*cda5da8dSAndroid Build Coastguard Worker        if self._process is not None:
619*cda5da8dSAndroid Build Coastguard Worker            self._process.join(timeout)
620*cda5da8dSAndroid Build Coastguard Worker            if not self._process.is_alive():
621*cda5da8dSAndroid Build Coastguard Worker                self._process = None
622*cda5da8dSAndroid Build Coastguard Worker
623*cda5da8dSAndroid Build Coastguard Worker    def _debug_info(self):
624*cda5da8dSAndroid Build Coastguard Worker        '''
625*cda5da8dSAndroid Build Coastguard Worker        Return some info about the servers shared objects and connections
626*cda5da8dSAndroid Build Coastguard Worker        '''
627*cda5da8dSAndroid Build Coastguard Worker        conn = self._Client(self._address, authkey=self._authkey)
628*cda5da8dSAndroid Build Coastguard Worker        try:
629*cda5da8dSAndroid Build Coastguard Worker            return dispatch(conn, None, 'debug_info')
630*cda5da8dSAndroid Build Coastguard Worker        finally:
631*cda5da8dSAndroid Build Coastguard Worker            conn.close()
632*cda5da8dSAndroid Build Coastguard Worker
633*cda5da8dSAndroid Build Coastguard Worker    def _number_of_objects(self):
634*cda5da8dSAndroid Build Coastguard Worker        '''
635*cda5da8dSAndroid Build Coastguard Worker        Return the number of shared objects
636*cda5da8dSAndroid Build Coastguard Worker        '''
637*cda5da8dSAndroid Build Coastguard Worker        conn = self._Client(self._address, authkey=self._authkey)
638*cda5da8dSAndroid Build Coastguard Worker        try:
639*cda5da8dSAndroid Build Coastguard Worker            return dispatch(conn, None, 'number_of_objects')
640*cda5da8dSAndroid Build Coastguard Worker        finally:
641*cda5da8dSAndroid Build Coastguard Worker            conn.close()
642*cda5da8dSAndroid Build Coastguard Worker
643*cda5da8dSAndroid Build Coastguard Worker    def __enter__(self):
644*cda5da8dSAndroid Build Coastguard Worker        if self._state.value == State.INITIAL:
645*cda5da8dSAndroid Build Coastguard Worker            self.start()
646*cda5da8dSAndroid Build Coastguard Worker        if self._state.value != State.STARTED:
647*cda5da8dSAndroid Build Coastguard Worker            if self._state.value == State.INITIAL:
648*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError("Unable to start server")
649*cda5da8dSAndroid Build Coastguard Worker            elif self._state.value == State.SHUTDOWN:
650*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError("Manager has shut down")
651*cda5da8dSAndroid Build Coastguard Worker            else:
652*cda5da8dSAndroid Build Coastguard Worker                raise ProcessError(
653*cda5da8dSAndroid Build Coastguard Worker                    "Unknown state {!r}".format(self._state.value))
654*cda5da8dSAndroid Build Coastguard Worker        return self
655*cda5da8dSAndroid Build Coastguard Worker
656*cda5da8dSAndroid Build Coastguard Worker    def __exit__(self, exc_type, exc_val, exc_tb):
657*cda5da8dSAndroid Build Coastguard Worker        self.shutdown()
658*cda5da8dSAndroid Build Coastguard Worker
659*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
660*cda5da8dSAndroid Build Coastguard Worker    def _finalize_manager(process, address, authkey, state, _Client,
661*cda5da8dSAndroid Build Coastguard Worker                          shutdown_timeout):
662*cda5da8dSAndroid Build Coastguard Worker        '''
663*cda5da8dSAndroid Build Coastguard Worker        Shutdown the manager process; will be registered as a finalizer
664*cda5da8dSAndroid Build Coastguard Worker        '''
665*cda5da8dSAndroid Build Coastguard Worker        if process.is_alive():
666*cda5da8dSAndroid Build Coastguard Worker            util.info('sending shutdown message to manager')
667*cda5da8dSAndroid Build Coastguard Worker            try:
668*cda5da8dSAndroid Build Coastguard Worker                conn = _Client(address, authkey=authkey)
669*cda5da8dSAndroid Build Coastguard Worker                try:
670*cda5da8dSAndroid Build Coastguard Worker                    dispatch(conn, None, 'shutdown')
671*cda5da8dSAndroid Build Coastguard Worker                finally:
672*cda5da8dSAndroid Build Coastguard Worker                    conn.close()
673*cda5da8dSAndroid Build Coastguard Worker            except Exception:
674*cda5da8dSAndroid Build Coastguard Worker                pass
675*cda5da8dSAndroid Build Coastguard Worker
676*cda5da8dSAndroid Build Coastguard Worker            process.join(timeout=shutdown_timeout)
677*cda5da8dSAndroid Build Coastguard Worker            if process.is_alive():
678*cda5da8dSAndroid Build Coastguard Worker                util.info('manager still alive')
679*cda5da8dSAndroid Build Coastguard Worker                if hasattr(process, 'terminate'):
680*cda5da8dSAndroid Build Coastguard Worker                    util.info('trying to `terminate()` manager process')
681*cda5da8dSAndroid Build Coastguard Worker                    process.terminate()
682*cda5da8dSAndroid Build Coastguard Worker                    process.join(timeout=shutdown_timeout)
683*cda5da8dSAndroid Build Coastguard Worker                    if process.is_alive():
684*cda5da8dSAndroid Build Coastguard Worker                        util.info('manager still alive after terminate')
685*cda5da8dSAndroid Build Coastguard Worker                        process.kill()
686*cda5da8dSAndroid Build Coastguard Worker                        process.join()
687*cda5da8dSAndroid Build Coastguard Worker
688*cda5da8dSAndroid Build Coastguard Worker        state.value = State.SHUTDOWN
689*cda5da8dSAndroid Build Coastguard Worker        try:
690*cda5da8dSAndroid Build Coastguard Worker            del BaseProxy._address_to_local[address]
691*cda5da8dSAndroid Build Coastguard Worker        except KeyError:
692*cda5da8dSAndroid Build Coastguard Worker            pass
693*cda5da8dSAndroid Build Coastguard Worker
694*cda5da8dSAndroid Build Coastguard Worker    @property
695*cda5da8dSAndroid Build Coastguard Worker    def address(self):
696*cda5da8dSAndroid Build Coastguard Worker        return self._address
697*cda5da8dSAndroid Build Coastguard Worker
698*cda5da8dSAndroid Build Coastguard Worker    @classmethod
699*cda5da8dSAndroid Build Coastguard Worker    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
700*cda5da8dSAndroid Build Coastguard Worker                 method_to_typeid=None, create_method=True):
701*cda5da8dSAndroid Build Coastguard Worker        '''
702*cda5da8dSAndroid Build Coastguard Worker        Register a typeid with the manager type
703*cda5da8dSAndroid Build Coastguard Worker        '''
704*cda5da8dSAndroid Build Coastguard Worker        if '_registry' not in cls.__dict__:
705*cda5da8dSAndroid Build Coastguard Worker            cls._registry = cls._registry.copy()
706*cda5da8dSAndroid Build Coastguard Worker
707*cda5da8dSAndroid Build Coastguard Worker        if proxytype is None:
708*cda5da8dSAndroid Build Coastguard Worker            proxytype = AutoProxy
709*cda5da8dSAndroid Build Coastguard Worker
710*cda5da8dSAndroid Build Coastguard Worker        exposed = exposed or getattr(proxytype, '_exposed_', None)
711*cda5da8dSAndroid Build Coastguard Worker
712*cda5da8dSAndroid Build Coastguard Worker        method_to_typeid = method_to_typeid or \
713*cda5da8dSAndroid Build Coastguard Worker                           getattr(proxytype, '_method_to_typeid_', None)
714*cda5da8dSAndroid Build Coastguard Worker
715*cda5da8dSAndroid Build Coastguard Worker        if method_to_typeid:
716*cda5da8dSAndroid Build Coastguard Worker            for key, value in list(method_to_typeid.items()): # isinstance?
717*cda5da8dSAndroid Build Coastguard Worker                assert type(key) is str, '%r is not a string' % key
718*cda5da8dSAndroid Build Coastguard Worker                assert type(value) is str, '%r is not a string' % value
719*cda5da8dSAndroid Build Coastguard Worker
720*cda5da8dSAndroid Build Coastguard Worker        cls._registry[typeid] = (
721*cda5da8dSAndroid Build Coastguard Worker            callable, exposed, method_to_typeid, proxytype
722*cda5da8dSAndroid Build Coastguard Worker            )
723*cda5da8dSAndroid Build Coastguard Worker
724*cda5da8dSAndroid Build Coastguard Worker        if create_method:
725*cda5da8dSAndroid Build Coastguard Worker            def temp(self, /, *args, **kwds):
726*cda5da8dSAndroid Build Coastguard Worker                util.debug('requesting creation of a shared %r object', typeid)
727*cda5da8dSAndroid Build Coastguard Worker                token, exp = self._create(typeid, *args, **kwds)
728*cda5da8dSAndroid Build Coastguard Worker                proxy = proxytype(
729*cda5da8dSAndroid Build Coastguard Worker                    token, self._serializer, manager=self,
730*cda5da8dSAndroid Build Coastguard Worker                    authkey=self._authkey, exposed=exp
731*cda5da8dSAndroid Build Coastguard Worker                    )
732*cda5da8dSAndroid Build Coastguard Worker                conn = self._Client(token.address, authkey=self._authkey)
733*cda5da8dSAndroid Build Coastguard Worker                dispatch(conn, None, 'decref', (token.id,))
734*cda5da8dSAndroid Build Coastguard Worker                return proxy
735*cda5da8dSAndroid Build Coastguard Worker            temp.__name__ = typeid
736*cda5da8dSAndroid Build Coastguard Worker            setattr(cls, typeid, temp)
737*cda5da8dSAndroid Build Coastguard Worker
738*cda5da8dSAndroid Build Coastguard Worker#
739*cda5da8dSAndroid Build Coastguard Worker# Subclass of set which get cleared after a fork
740*cda5da8dSAndroid Build Coastguard Worker#
741*cda5da8dSAndroid Build Coastguard Worker
742*cda5da8dSAndroid Build Coastguard Workerclass ProcessLocalSet(set):
743*cda5da8dSAndroid Build Coastguard Worker    def __init__(self):
744*cda5da8dSAndroid Build Coastguard Worker        util.register_after_fork(self, lambda obj: obj.clear())
745*cda5da8dSAndroid Build Coastguard Worker    def __reduce__(self):
746*cda5da8dSAndroid Build Coastguard Worker        return type(self), ()
747*cda5da8dSAndroid Build Coastguard Worker
748*cda5da8dSAndroid Build Coastguard Worker#
749*cda5da8dSAndroid Build Coastguard Worker# Definition of BaseProxy
750*cda5da8dSAndroid Build Coastguard Worker#
751*cda5da8dSAndroid Build Coastguard Worker
752*cda5da8dSAndroid Build Coastguard Workerclass BaseProxy(object):
753*cda5da8dSAndroid Build Coastguard Worker    '''
754*cda5da8dSAndroid Build Coastguard Worker    A base for proxies of shared objects
755*cda5da8dSAndroid Build Coastguard Worker    '''
756*cda5da8dSAndroid Build Coastguard Worker    _address_to_local = {}
757*cda5da8dSAndroid Build Coastguard Worker    _mutex = util.ForkAwareThreadLock()
758*cda5da8dSAndroid Build Coastguard Worker
759*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, token, serializer, manager=None,
760*cda5da8dSAndroid Build Coastguard Worker                 authkey=None, exposed=None, incref=True, manager_owned=False):
761*cda5da8dSAndroid Build Coastguard Worker        with BaseProxy._mutex:
762*cda5da8dSAndroid Build Coastguard Worker            tls_idset = BaseProxy._address_to_local.get(token.address, None)
763*cda5da8dSAndroid Build Coastguard Worker            if tls_idset is None:
764*cda5da8dSAndroid Build Coastguard Worker                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
765*cda5da8dSAndroid Build Coastguard Worker                BaseProxy._address_to_local[token.address] = tls_idset
766*cda5da8dSAndroid Build Coastguard Worker
767*cda5da8dSAndroid Build Coastguard Worker        # self._tls is used to record the connection used by this
768*cda5da8dSAndroid Build Coastguard Worker        # thread to communicate with the manager at token.address
769*cda5da8dSAndroid Build Coastguard Worker        self._tls = tls_idset[0]
770*cda5da8dSAndroid Build Coastguard Worker
771*cda5da8dSAndroid Build Coastguard Worker        # self._idset is used to record the identities of all shared
772*cda5da8dSAndroid Build Coastguard Worker        # objects for which the current process owns references and
773*cda5da8dSAndroid Build Coastguard Worker        # which are in the manager at token.address
774*cda5da8dSAndroid Build Coastguard Worker        self._idset = tls_idset[1]
775*cda5da8dSAndroid Build Coastguard Worker
776*cda5da8dSAndroid Build Coastguard Worker        self._token = token
777*cda5da8dSAndroid Build Coastguard Worker        self._id = self._token.id
778*cda5da8dSAndroid Build Coastguard Worker        self._manager = manager
779*cda5da8dSAndroid Build Coastguard Worker        self._serializer = serializer
780*cda5da8dSAndroid Build Coastguard Worker        self._Client = listener_client[serializer][1]
781*cda5da8dSAndroid Build Coastguard Worker
782*cda5da8dSAndroid Build Coastguard Worker        # Should be set to True only when a proxy object is being created
783*cda5da8dSAndroid Build Coastguard Worker        # on the manager server; primary use case: nested proxy objects.
784*cda5da8dSAndroid Build Coastguard Worker        # RebuildProxy detects when a proxy is being created on the manager
785*cda5da8dSAndroid Build Coastguard Worker        # and sets this value appropriately.
786*cda5da8dSAndroid Build Coastguard Worker        self._owned_by_manager = manager_owned
787*cda5da8dSAndroid Build Coastguard Worker
788*cda5da8dSAndroid Build Coastguard Worker        if authkey is not None:
789*cda5da8dSAndroid Build Coastguard Worker            self._authkey = process.AuthenticationString(authkey)
790*cda5da8dSAndroid Build Coastguard Worker        elif self._manager is not None:
791*cda5da8dSAndroid Build Coastguard Worker            self._authkey = self._manager._authkey
792*cda5da8dSAndroid Build Coastguard Worker        else:
793*cda5da8dSAndroid Build Coastguard Worker            self._authkey = process.current_process().authkey
794*cda5da8dSAndroid Build Coastguard Worker
795*cda5da8dSAndroid Build Coastguard Worker        if incref:
796*cda5da8dSAndroid Build Coastguard Worker            self._incref()
797*cda5da8dSAndroid Build Coastguard Worker
798*cda5da8dSAndroid Build Coastguard Worker        util.register_after_fork(self, BaseProxy._after_fork)
799*cda5da8dSAndroid Build Coastguard Worker
800*cda5da8dSAndroid Build Coastguard Worker    def _connect(self):
801*cda5da8dSAndroid Build Coastguard Worker        util.debug('making connection to manager')
802*cda5da8dSAndroid Build Coastguard Worker        name = process.current_process().name
803*cda5da8dSAndroid Build Coastguard Worker        if threading.current_thread().name != 'MainThread':
804*cda5da8dSAndroid Build Coastguard Worker            name += '|' + threading.current_thread().name
805*cda5da8dSAndroid Build Coastguard Worker        conn = self._Client(self._token.address, authkey=self._authkey)
806*cda5da8dSAndroid Build Coastguard Worker        dispatch(conn, None, 'accept_connection', (name,))
807*cda5da8dSAndroid Build Coastguard Worker        self._tls.connection = conn
808*cda5da8dSAndroid Build Coastguard Worker
809*cda5da8dSAndroid Build Coastguard Worker    def _callmethod(self, methodname, args=(), kwds={}):
810*cda5da8dSAndroid Build Coastguard Worker        '''
811*cda5da8dSAndroid Build Coastguard Worker        Try to call a method of the referent and return a copy of the result
812*cda5da8dSAndroid Build Coastguard Worker        '''
813*cda5da8dSAndroid Build Coastguard Worker        try:
814*cda5da8dSAndroid Build Coastguard Worker            conn = self._tls.connection
815*cda5da8dSAndroid Build Coastguard Worker        except AttributeError:
816*cda5da8dSAndroid Build Coastguard Worker            util.debug('thread %r does not own a connection',
817*cda5da8dSAndroid Build Coastguard Worker                       threading.current_thread().name)
818*cda5da8dSAndroid Build Coastguard Worker            self._connect()
819*cda5da8dSAndroid Build Coastguard Worker            conn = self._tls.connection
820*cda5da8dSAndroid Build Coastguard Worker
821*cda5da8dSAndroid Build Coastguard Worker        conn.send((self._id, methodname, args, kwds))
822*cda5da8dSAndroid Build Coastguard Worker        kind, result = conn.recv()
823*cda5da8dSAndroid Build Coastguard Worker
824*cda5da8dSAndroid Build Coastguard Worker        if kind == '#RETURN':
825*cda5da8dSAndroid Build Coastguard Worker            return result
826*cda5da8dSAndroid Build Coastguard Worker        elif kind == '#PROXY':
827*cda5da8dSAndroid Build Coastguard Worker            exposed, token = result
828*cda5da8dSAndroid Build Coastguard Worker            proxytype = self._manager._registry[token.typeid][-1]
829*cda5da8dSAndroid Build Coastguard Worker            token.address = self._token.address
830*cda5da8dSAndroid Build Coastguard Worker            proxy = proxytype(
831*cda5da8dSAndroid Build Coastguard Worker                token, self._serializer, manager=self._manager,
832*cda5da8dSAndroid Build Coastguard Worker                authkey=self._authkey, exposed=exposed
833*cda5da8dSAndroid Build Coastguard Worker                )
834*cda5da8dSAndroid Build Coastguard Worker            conn = self._Client(token.address, authkey=self._authkey)
835*cda5da8dSAndroid Build Coastguard Worker            dispatch(conn, None, 'decref', (token.id,))
836*cda5da8dSAndroid Build Coastguard Worker            return proxy
837*cda5da8dSAndroid Build Coastguard Worker        raise convert_to_error(kind, result)
838*cda5da8dSAndroid Build Coastguard Worker
839*cda5da8dSAndroid Build Coastguard Worker    def _getvalue(self):
840*cda5da8dSAndroid Build Coastguard Worker        '''
841*cda5da8dSAndroid Build Coastguard Worker        Get a copy of the value of the referent
842*cda5da8dSAndroid Build Coastguard Worker        '''
843*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('#GETVALUE')
844*cda5da8dSAndroid Build Coastguard Worker
845*cda5da8dSAndroid Build Coastguard Worker    def _incref(self):
846*cda5da8dSAndroid Build Coastguard Worker        if self._owned_by_manager:
847*cda5da8dSAndroid Build Coastguard Worker            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
848*cda5da8dSAndroid Build Coastguard Worker            return
849*cda5da8dSAndroid Build Coastguard Worker
850*cda5da8dSAndroid Build Coastguard Worker        conn = self._Client(self._token.address, authkey=self._authkey)
851*cda5da8dSAndroid Build Coastguard Worker        dispatch(conn, None, 'incref', (self._id,))
852*cda5da8dSAndroid Build Coastguard Worker        util.debug('INCREF %r', self._token.id)
853*cda5da8dSAndroid Build Coastguard Worker
854*cda5da8dSAndroid Build Coastguard Worker        self._idset.add(self._id)
855*cda5da8dSAndroid Build Coastguard Worker
856*cda5da8dSAndroid Build Coastguard Worker        state = self._manager and self._manager._state
857*cda5da8dSAndroid Build Coastguard Worker
858*cda5da8dSAndroid Build Coastguard Worker        self._close = util.Finalize(
859*cda5da8dSAndroid Build Coastguard Worker            self, BaseProxy._decref,
860*cda5da8dSAndroid Build Coastguard Worker            args=(self._token, self._authkey, state,
861*cda5da8dSAndroid Build Coastguard Worker                  self._tls, self._idset, self._Client),
862*cda5da8dSAndroid Build Coastguard Worker            exitpriority=10
863*cda5da8dSAndroid Build Coastguard Worker            )
864*cda5da8dSAndroid Build Coastguard Worker
865*cda5da8dSAndroid Build Coastguard Worker    @staticmethod
866*cda5da8dSAndroid Build Coastguard Worker    def _decref(token, authkey, state, tls, idset, _Client):
867*cda5da8dSAndroid Build Coastguard Worker        idset.discard(token.id)
868*cda5da8dSAndroid Build Coastguard Worker
869*cda5da8dSAndroid Build Coastguard Worker        # check whether manager is still alive
870*cda5da8dSAndroid Build Coastguard Worker        if state is None or state.value == State.STARTED:
871*cda5da8dSAndroid Build Coastguard Worker            # tell manager this process no longer cares about referent
872*cda5da8dSAndroid Build Coastguard Worker            try:
873*cda5da8dSAndroid Build Coastguard Worker                util.debug('DECREF %r', token.id)
874*cda5da8dSAndroid Build Coastguard Worker                conn = _Client(token.address, authkey=authkey)
875*cda5da8dSAndroid Build Coastguard Worker                dispatch(conn, None, 'decref', (token.id,))
876*cda5da8dSAndroid Build Coastguard Worker            except Exception as e:
877*cda5da8dSAndroid Build Coastguard Worker                util.debug('... decref failed %s', e)
878*cda5da8dSAndroid Build Coastguard Worker
879*cda5da8dSAndroid Build Coastguard Worker        else:
880*cda5da8dSAndroid Build Coastguard Worker            util.debug('DECREF %r -- manager already shutdown', token.id)
881*cda5da8dSAndroid Build Coastguard Worker
882*cda5da8dSAndroid Build Coastguard Worker        # check whether we can close this thread's connection because
883*cda5da8dSAndroid Build Coastguard Worker        # the process owns no more references to objects for this manager
884*cda5da8dSAndroid Build Coastguard Worker        if not idset and hasattr(tls, 'connection'):
885*cda5da8dSAndroid Build Coastguard Worker            util.debug('thread %r has no more proxies so closing conn',
886*cda5da8dSAndroid Build Coastguard Worker                       threading.current_thread().name)
887*cda5da8dSAndroid Build Coastguard Worker            tls.connection.close()
888*cda5da8dSAndroid Build Coastguard Worker            del tls.connection
889*cda5da8dSAndroid Build Coastguard Worker
890*cda5da8dSAndroid Build Coastguard Worker    def _after_fork(self):
891*cda5da8dSAndroid Build Coastguard Worker        self._manager = None
892*cda5da8dSAndroid Build Coastguard Worker        try:
893*cda5da8dSAndroid Build Coastguard Worker            self._incref()
894*cda5da8dSAndroid Build Coastguard Worker        except Exception as e:
895*cda5da8dSAndroid Build Coastguard Worker            # the proxy may just be for a manager which has shutdown
896*cda5da8dSAndroid Build Coastguard Worker            util.info('incref failed: %s' % e)
897*cda5da8dSAndroid Build Coastguard Worker
898*cda5da8dSAndroid Build Coastguard Worker    def __reduce__(self):
899*cda5da8dSAndroid Build Coastguard Worker        kwds = {}
900*cda5da8dSAndroid Build Coastguard Worker        if get_spawning_popen() is not None:
901*cda5da8dSAndroid Build Coastguard Worker            kwds['authkey'] = self._authkey
902*cda5da8dSAndroid Build Coastguard Worker
903*cda5da8dSAndroid Build Coastguard Worker        if getattr(self, '_isauto', False):
904*cda5da8dSAndroid Build Coastguard Worker            kwds['exposed'] = self._exposed_
905*cda5da8dSAndroid Build Coastguard Worker            return (RebuildProxy,
906*cda5da8dSAndroid Build Coastguard Worker                    (AutoProxy, self._token, self._serializer, kwds))
907*cda5da8dSAndroid Build Coastguard Worker        else:
908*cda5da8dSAndroid Build Coastguard Worker            return (RebuildProxy,
909*cda5da8dSAndroid Build Coastguard Worker                    (type(self), self._token, self._serializer, kwds))
910*cda5da8dSAndroid Build Coastguard Worker
911*cda5da8dSAndroid Build Coastguard Worker    def __deepcopy__(self, memo):
912*cda5da8dSAndroid Build Coastguard Worker        return self._getvalue()
913*cda5da8dSAndroid Build Coastguard Worker
914*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
915*cda5da8dSAndroid Build Coastguard Worker        return '<%s object, typeid %r at %#x>' % \
916*cda5da8dSAndroid Build Coastguard Worker               (type(self).__name__, self._token.typeid, id(self))
917*cda5da8dSAndroid Build Coastguard Worker
918*cda5da8dSAndroid Build Coastguard Worker    def __str__(self):
919*cda5da8dSAndroid Build Coastguard Worker        '''
920*cda5da8dSAndroid Build Coastguard Worker        Return representation of the referent (or a fall-back if that fails)
921*cda5da8dSAndroid Build Coastguard Worker        '''
922*cda5da8dSAndroid Build Coastguard Worker        try:
923*cda5da8dSAndroid Build Coastguard Worker            return self._callmethod('__repr__')
924*cda5da8dSAndroid Build Coastguard Worker        except Exception:
925*cda5da8dSAndroid Build Coastguard Worker            return repr(self)[:-1] + "; '__str__()' failed>"
926*cda5da8dSAndroid Build Coastguard Worker
927*cda5da8dSAndroid Build Coastguard Worker#
928*cda5da8dSAndroid Build Coastguard Worker# Function used for unpickling
929*cda5da8dSAndroid Build Coastguard Worker#
930*cda5da8dSAndroid Build Coastguard Worker
931*cda5da8dSAndroid Build Coastguard Workerdef RebuildProxy(func, token, serializer, kwds):
932*cda5da8dSAndroid Build Coastguard Worker    '''
933*cda5da8dSAndroid Build Coastguard Worker    Function used for unpickling proxy objects.
934*cda5da8dSAndroid Build Coastguard Worker    '''
935*cda5da8dSAndroid Build Coastguard Worker    server = getattr(process.current_process(), '_manager_server', None)
936*cda5da8dSAndroid Build Coastguard Worker    if server and server.address == token.address:
937*cda5da8dSAndroid Build Coastguard Worker        util.debug('Rebuild a proxy owned by manager, token=%r', token)
938*cda5da8dSAndroid Build Coastguard Worker        kwds['manager_owned'] = True
939*cda5da8dSAndroid Build Coastguard Worker        if token.id not in server.id_to_local_proxy_obj:
940*cda5da8dSAndroid Build Coastguard Worker            server.id_to_local_proxy_obj[token.id] = \
941*cda5da8dSAndroid Build Coastguard Worker                server.id_to_obj[token.id]
942*cda5da8dSAndroid Build Coastguard Worker    incref = (
943*cda5da8dSAndroid Build Coastguard Worker        kwds.pop('incref', True) and
944*cda5da8dSAndroid Build Coastguard Worker        not getattr(process.current_process(), '_inheriting', False)
945*cda5da8dSAndroid Build Coastguard Worker        )
946*cda5da8dSAndroid Build Coastguard Worker    return func(token, serializer, incref=incref, **kwds)
947*cda5da8dSAndroid Build Coastguard Worker
948*cda5da8dSAndroid Build Coastguard Worker#
949*cda5da8dSAndroid Build Coastguard Worker# Functions to create proxies and proxy types
950*cda5da8dSAndroid Build Coastguard Worker#
951*cda5da8dSAndroid Build Coastguard Worker
952*cda5da8dSAndroid Build Coastguard Workerdef MakeProxyType(name, exposed, _cache={}):
953*cda5da8dSAndroid Build Coastguard Worker    '''
954*cda5da8dSAndroid Build Coastguard Worker    Return a proxy type whose methods are given by `exposed`
955*cda5da8dSAndroid Build Coastguard Worker    '''
956*cda5da8dSAndroid Build Coastguard Worker    exposed = tuple(exposed)
957*cda5da8dSAndroid Build Coastguard Worker    try:
958*cda5da8dSAndroid Build Coastguard Worker        return _cache[(name, exposed)]
959*cda5da8dSAndroid Build Coastguard Worker    except KeyError:
960*cda5da8dSAndroid Build Coastguard Worker        pass
961*cda5da8dSAndroid Build Coastguard Worker
962*cda5da8dSAndroid Build Coastguard Worker    dic = {}
963*cda5da8dSAndroid Build Coastguard Worker
964*cda5da8dSAndroid Build Coastguard Worker    for meth in exposed:
965*cda5da8dSAndroid Build Coastguard Worker        exec('''def %s(self, /, *args, **kwds):
966*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
967*cda5da8dSAndroid Build Coastguard Worker
968*cda5da8dSAndroid Build Coastguard Worker    ProxyType = type(name, (BaseProxy,), dic)
969*cda5da8dSAndroid Build Coastguard Worker    ProxyType._exposed_ = exposed
970*cda5da8dSAndroid Build Coastguard Worker    _cache[(name, exposed)] = ProxyType
971*cda5da8dSAndroid Build Coastguard Worker    return ProxyType
972*cda5da8dSAndroid Build Coastguard Worker
973*cda5da8dSAndroid Build Coastguard Worker
974*cda5da8dSAndroid Build Coastguard Workerdef AutoProxy(token, serializer, manager=None, authkey=None,
975*cda5da8dSAndroid Build Coastguard Worker              exposed=None, incref=True, manager_owned=False):
976*cda5da8dSAndroid Build Coastguard Worker    '''
977*cda5da8dSAndroid Build Coastguard Worker    Return an auto-proxy for `token`
978*cda5da8dSAndroid Build Coastguard Worker    '''
979*cda5da8dSAndroid Build Coastguard Worker    _Client = listener_client[serializer][1]
980*cda5da8dSAndroid Build Coastguard Worker
981*cda5da8dSAndroid Build Coastguard Worker    if exposed is None:
982*cda5da8dSAndroid Build Coastguard Worker        conn = _Client(token.address, authkey=authkey)
983*cda5da8dSAndroid Build Coastguard Worker        try:
984*cda5da8dSAndroid Build Coastguard Worker            exposed = dispatch(conn, None, 'get_methods', (token,))
985*cda5da8dSAndroid Build Coastguard Worker        finally:
986*cda5da8dSAndroid Build Coastguard Worker            conn.close()
987*cda5da8dSAndroid Build Coastguard Worker
988*cda5da8dSAndroid Build Coastguard Worker    if authkey is None and manager is not None:
989*cda5da8dSAndroid Build Coastguard Worker        authkey = manager._authkey
990*cda5da8dSAndroid Build Coastguard Worker    if authkey is None:
991*cda5da8dSAndroid Build Coastguard Worker        authkey = process.current_process().authkey
992*cda5da8dSAndroid Build Coastguard Worker
993*cda5da8dSAndroid Build Coastguard Worker    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
994*cda5da8dSAndroid Build Coastguard Worker    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
995*cda5da8dSAndroid Build Coastguard Worker                      incref=incref, manager_owned=manager_owned)
996*cda5da8dSAndroid Build Coastguard Worker    proxy._isauto = True
997*cda5da8dSAndroid Build Coastguard Worker    return proxy
998*cda5da8dSAndroid Build Coastguard Worker
999*cda5da8dSAndroid Build Coastguard Worker#
1000*cda5da8dSAndroid Build Coastguard Worker# Types/callables which we will register with SyncManager
1001*cda5da8dSAndroid Build Coastguard Worker#
1002*cda5da8dSAndroid Build Coastguard Worker
1003*cda5da8dSAndroid Build Coastguard Workerclass Namespace(object):
1004*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, /, **kwds):
1005*cda5da8dSAndroid Build Coastguard Worker        self.__dict__.update(kwds)
1006*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
1007*cda5da8dSAndroid Build Coastguard Worker        items = list(self.__dict__.items())
1008*cda5da8dSAndroid Build Coastguard Worker        temp = []
1009*cda5da8dSAndroid Build Coastguard Worker        for name, value in items:
1010*cda5da8dSAndroid Build Coastguard Worker            if not name.startswith('_'):
1011*cda5da8dSAndroid Build Coastguard Worker                temp.append('%s=%r' % (name, value))
1012*cda5da8dSAndroid Build Coastguard Worker        temp.sort()
1013*cda5da8dSAndroid Build Coastguard Worker        return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
1014*cda5da8dSAndroid Build Coastguard Worker
1015*cda5da8dSAndroid Build Coastguard Workerclass Value(object):
1016*cda5da8dSAndroid Build Coastguard Worker    def __init__(self, typecode, value, lock=True):
1017*cda5da8dSAndroid Build Coastguard Worker        self._typecode = typecode
1018*cda5da8dSAndroid Build Coastguard Worker        self._value = value
1019*cda5da8dSAndroid Build Coastguard Worker    def get(self):
1020*cda5da8dSAndroid Build Coastguard Worker        return self._value
1021*cda5da8dSAndroid Build Coastguard Worker    def set(self, value):
1022*cda5da8dSAndroid Build Coastguard Worker        self._value = value
1023*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
1024*cda5da8dSAndroid Build Coastguard Worker        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
1025*cda5da8dSAndroid Build Coastguard Worker    value = property(get, set)
1026*cda5da8dSAndroid Build Coastguard Worker
1027*cda5da8dSAndroid Build Coastguard Workerdef Array(typecode, sequence, lock=True):
1028*cda5da8dSAndroid Build Coastguard Worker    return array.array(typecode, sequence)
1029*cda5da8dSAndroid Build Coastguard Worker
1030*cda5da8dSAndroid Build Coastguard Worker#
1031*cda5da8dSAndroid Build Coastguard Worker# Proxy types used by SyncManager
1032*cda5da8dSAndroid Build Coastguard Worker#
1033*cda5da8dSAndroid Build Coastguard Worker
1034*cda5da8dSAndroid Build Coastguard Workerclass IteratorProxy(BaseProxy):
1035*cda5da8dSAndroid Build Coastguard Worker    _exposed_ = ('__next__', 'send', 'throw', 'close')
1036*cda5da8dSAndroid Build Coastguard Worker    def __iter__(self):
1037*cda5da8dSAndroid Build Coastguard Worker        return self
1038*cda5da8dSAndroid Build Coastguard Worker    def __next__(self, *args):
1039*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('__next__', args)
1040*cda5da8dSAndroid Build Coastguard Worker    def send(self, *args):
1041*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('send', args)
1042*cda5da8dSAndroid Build Coastguard Worker    def throw(self, *args):
1043*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('throw', args)
1044*cda5da8dSAndroid Build Coastguard Worker    def close(self, *args):
1045*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('close', args)
1046*cda5da8dSAndroid Build Coastguard Worker
1047*cda5da8dSAndroid Build Coastguard Worker
1048*cda5da8dSAndroid Build Coastguard Workerclass AcquirerProxy(BaseProxy):
1049*cda5da8dSAndroid Build Coastguard Worker    _exposed_ = ('acquire', 'release')
1050*cda5da8dSAndroid Build Coastguard Worker    def acquire(self, blocking=True, timeout=None):
1051*cda5da8dSAndroid Build Coastguard Worker        args = (blocking,) if timeout is None else (blocking, timeout)
1052*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('acquire', args)
1053*cda5da8dSAndroid Build Coastguard Worker    def release(self):
1054*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('release')
1055*cda5da8dSAndroid Build Coastguard Worker    def __enter__(self):
1056*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('acquire')
1057*cda5da8dSAndroid Build Coastguard Worker    def __exit__(self, exc_type, exc_val, exc_tb):
1058*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('release')
1059*cda5da8dSAndroid Build Coastguard Worker
1060*cda5da8dSAndroid Build Coastguard Worker
1061*cda5da8dSAndroid Build Coastguard Workerclass ConditionProxy(AcquirerProxy):
1062*cda5da8dSAndroid Build Coastguard Worker    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
1063*cda5da8dSAndroid Build Coastguard Worker    def wait(self, timeout=None):
1064*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('wait', (timeout,))
1065*cda5da8dSAndroid Build Coastguard Worker    def notify(self, n=1):
1066*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('notify', (n,))
1067*cda5da8dSAndroid Build Coastguard Worker    def notify_all(self):
1068*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('notify_all')
1069*cda5da8dSAndroid Build Coastguard Worker    def wait_for(self, predicate, timeout=None):
1070*cda5da8dSAndroid Build Coastguard Worker        result = predicate()
1071*cda5da8dSAndroid Build Coastguard Worker        if result:
1072*cda5da8dSAndroid Build Coastguard Worker            return result
1073*cda5da8dSAndroid Build Coastguard Worker        if timeout is not None:
1074*cda5da8dSAndroid Build Coastguard Worker            endtime = time.monotonic() + timeout
1075*cda5da8dSAndroid Build Coastguard Worker        else:
1076*cda5da8dSAndroid Build Coastguard Worker            endtime = None
1077*cda5da8dSAndroid Build Coastguard Worker            waittime = None
1078*cda5da8dSAndroid Build Coastguard Worker        while not result:
1079*cda5da8dSAndroid Build Coastguard Worker            if endtime is not None:
1080*cda5da8dSAndroid Build Coastguard Worker                waittime = endtime - time.monotonic()
1081*cda5da8dSAndroid Build Coastguard Worker                if waittime <= 0:
1082*cda5da8dSAndroid Build Coastguard Worker                    break
1083*cda5da8dSAndroid Build Coastguard Worker            self.wait(waittime)
1084*cda5da8dSAndroid Build Coastguard Worker            result = predicate()
1085*cda5da8dSAndroid Build Coastguard Worker        return result
1086*cda5da8dSAndroid Build Coastguard Worker
1087*cda5da8dSAndroid Build Coastguard Worker
1088*cda5da8dSAndroid Build Coastguard Workerclass EventProxy(BaseProxy):
1089*cda5da8dSAndroid Build Coastguard Worker    _exposed_ = ('is_set', 'set', 'clear', 'wait')
1090*cda5da8dSAndroid Build Coastguard Worker    def is_set(self):
1091*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('is_set')
1092*cda5da8dSAndroid Build Coastguard Worker    def set(self):
1093*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('set')
1094*cda5da8dSAndroid Build Coastguard Worker    def clear(self):
1095*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('clear')
1096*cda5da8dSAndroid Build Coastguard Worker    def wait(self, timeout=None):
1097*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('wait', (timeout,))
1098*cda5da8dSAndroid Build Coastguard Worker
1099*cda5da8dSAndroid Build Coastguard Worker
1100*cda5da8dSAndroid Build Coastguard Workerclass BarrierProxy(BaseProxy):
1101*cda5da8dSAndroid Build Coastguard Worker    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1102*cda5da8dSAndroid Build Coastguard Worker    def wait(self, timeout=None):
1103*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('wait', (timeout,))
1104*cda5da8dSAndroid Build Coastguard Worker    def abort(self):
1105*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('abort')
1106*cda5da8dSAndroid Build Coastguard Worker    def reset(self):
1107*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('reset')
1108*cda5da8dSAndroid Build Coastguard Worker    @property
1109*cda5da8dSAndroid Build Coastguard Worker    def parties(self):
1110*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('__getattribute__', ('parties',))
1111*cda5da8dSAndroid Build Coastguard Worker    @property
1112*cda5da8dSAndroid Build Coastguard Worker    def n_waiting(self):
1113*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('__getattribute__', ('n_waiting',))
1114*cda5da8dSAndroid Build Coastguard Worker    @property
1115*cda5da8dSAndroid Build Coastguard Worker    def broken(self):
1116*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('__getattribute__', ('broken',))
1117*cda5da8dSAndroid Build Coastguard Worker
1118*cda5da8dSAndroid Build Coastguard Worker
1119*cda5da8dSAndroid Build Coastguard Workerclass NamespaceProxy(BaseProxy):
1120*cda5da8dSAndroid Build Coastguard Worker    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1121*cda5da8dSAndroid Build Coastguard Worker    def __getattr__(self, key):
1122*cda5da8dSAndroid Build Coastguard Worker        if key[0] == '_':
1123*cda5da8dSAndroid Build Coastguard Worker            return object.__getattribute__(self, key)
1124*cda5da8dSAndroid Build Coastguard Worker        callmethod = object.__getattribute__(self, '_callmethod')
1125*cda5da8dSAndroid Build Coastguard Worker        return callmethod('__getattribute__', (key,))
1126*cda5da8dSAndroid Build Coastguard Worker    def __setattr__(self, key, value):
1127*cda5da8dSAndroid Build Coastguard Worker        if key[0] == '_':
1128*cda5da8dSAndroid Build Coastguard Worker            return object.__setattr__(self, key, value)
1129*cda5da8dSAndroid Build Coastguard Worker        callmethod = object.__getattribute__(self, '_callmethod')
1130*cda5da8dSAndroid Build Coastguard Worker        return callmethod('__setattr__', (key, value))
1131*cda5da8dSAndroid Build Coastguard Worker    def __delattr__(self, key):
1132*cda5da8dSAndroid Build Coastguard Worker        if key[0] == '_':
1133*cda5da8dSAndroid Build Coastguard Worker            return object.__delattr__(self, key)
1134*cda5da8dSAndroid Build Coastguard Worker        callmethod = object.__getattribute__(self, '_callmethod')
1135*cda5da8dSAndroid Build Coastguard Worker        return callmethod('__delattr__', (key,))
1136*cda5da8dSAndroid Build Coastguard Worker
1137*cda5da8dSAndroid Build Coastguard Worker
1138*cda5da8dSAndroid Build Coastguard Workerclass ValueProxy(BaseProxy):
1139*cda5da8dSAndroid Build Coastguard Worker    _exposed_ = ('get', 'set')
1140*cda5da8dSAndroid Build Coastguard Worker    def get(self):
1141*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('get')
1142*cda5da8dSAndroid Build Coastguard Worker    def set(self, value):
1143*cda5da8dSAndroid Build Coastguard Worker        return self._callmethod('set', (value,))
1144*cda5da8dSAndroid Build Coastguard Worker    value = property(get, set)
1145*cda5da8dSAndroid Build Coastguard Worker
1146*cda5da8dSAndroid Build Coastguard Worker    __class_getitem__ = classmethod(types.GenericAlias)
1147*cda5da8dSAndroid Build Coastguard Worker
1148*cda5da8dSAndroid Build Coastguard Worker
1149*cda5da8dSAndroid Build Coastguard WorkerBaseListProxy = MakeProxyType('BaseListProxy', (
1150*cda5da8dSAndroid Build Coastguard Worker    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1151*cda5da8dSAndroid Build Coastguard Worker    '__mul__', '__reversed__', '__rmul__', '__setitem__',
1152*cda5da8dSAndroid Build Coastguard Worker    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1153*cda5da8dSAndroid Build Coastguard Worker    'reverse', 'sort', '__imul__'
1154*cda5da8dSAndroid Build Coastguard Worker    ))
1155*cda5da8dSAndroid Build Coastguard Workerclass ListProxy(BaseListProxy):
1156*cda5da8dSAndroid Build Coastguard Worker    def __iadd__(self, value):
1157*cda5da8dSAndroid Build Coastguard Worker        self._callmethod('extend', (value,))
1158*cda5da8dSAndroid Build Coastguard Worker        return self
1159*cda5da8dSAndroid Build Coastguard Worker    def __imul__(self, value):
1160*cda5da8dSAndroid Build Coastguard Worker        self._callmethod('__imul__', (value,))
1161*cda5da8dSAndroid Build Coastguard Worker        return self
1162*cda5da8dSAndroid Build Coastguard Worker
1163*cda5da8dSAndroid Build Coastguard Worker
1164*cda5da8dSAndroid Build Coastguard WorkerDictProxy = MakeProxyType('DictProxy', (
1165*cda5da8dSAndroid Build Coastguard Worker    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
1166*cda5da8dSAndroid Build Coastguard Worker    '__setitem__', 'clear', 'copy', 'get', 'items',
1167*cda5da8dSAndroid Build Coastguard Worker    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1168*cda5da8dSAndroid Build Coastguard Worker    ))
1169*cda5da8dSAndroid Build Coastguard WorkerDictProxy._method_to_typeid_ = {
1170*cda5da8dSAndroid Build Coastguard Worker    '__iter__': 'Iterator',
1171*cda5da8dSAndroid Build Coastguard Worker    }
1172*cda5da8dSAndroid Build Coastguard Worker
1173*cda5da8dSAndroid Build Coastguard Worker
1174*cda5da8dSAndroid Build Coastguard WorkerArrayProxy = MakeProxyType('ArrayProxy', (
1175*cda5da8dSAndroid Build Coastguard Worker    '__len__', '__getitem__', '__setitem__'
1176*cda5da8dSAndroid Build Coastguard Worker    ))
1177*cda5da8dSAndroid Build Coastguard Worker
1178*cda5da8dSAndroid Build Coastguard Worker
1179*cda5da8dSAndroid Build Coastguard WorkerBasePoolProxy = MakeProxyType('PoolProxy', (
1180*cda5da8dSAndroid Build Coastguard Worker    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1181*cda5da8dSAndroid Build Coastguard Worker    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
1182*cda5da8dSAndroid Build Coastguard Worker    ))
1183*cda5da8dSAndroid Build Coastguard WorkerBasePoolProxy._method_to_typeid_ = {
1184*cda5da8dSAndroid Build Coastguard Worker    'apply_async': 'AsyncResult',
1185*cda5da8dSAndroid Build Coastguard Worker    'map_async': 'AsyncResult',
1186*cda5da8dSAndroid Build Coastguard Worker    'starmap_async': 'AsyncResult',
1187*cda5da8dSAndroid Build Coastguard Worker    'imap': 'Iterator',
1188*cda5da8dSAndroid Build Coastguard Worker    'imap_unordered': 'Iterator'
1189*cda5da8dSAndroid Build Coastguard Worker    }
1190*cda5da8dSAndroid Build Coastguard Workerclass PoolProxy(BasePoolProxy):
1191*cda5da8dSAndroid Build Coastguard Worker    def __enter__(self):
1192*cda5da8dSAndroid Build Coastguard Worker        return self
1193*cda5da8dSAndroid Build Coastguard Worker    def __exit__(self, exc_type, exc_val, exc_tb):
1194*cda5da8dSAndroid Build Coastguard Worker        self.terminate()
1195*cda5da8dSAndroid Build Coastguard Worker
1196*cda5da8dSAndroid Build Coastguard Worker#
1197*cda5da8dSAndroid Build Coastguard Worker# Definition of SyncManager
1198*cda5da8dSAndroid Build Coastguard Worker#
1199*cda5da8dSAndroid Build Coastguard Worker
1200*cda5da8dSAndroid Build Coastguard Workerclass SyncManager(BaseManager):
1201*cda5da8dSAndroid Build Coastguard Worker    '''
1202*cda5da8dSAndroid Build Coastguard Worker    Subclass of `BaseManager` which supports a number of shared object types.
1203*cda5da8dSAndroid Build Coastguard Worker
1204*cda5da8dSAndroid Build Coastguard Worker    The types registered are those intended for the synchronization
1205*cda5da8dSAndroid Build Coastguard Worker    of threads, plus `dict`, `list` and `Namespace`.
1206*cda5da8dSAndroid Build Coastguard Worker
1207*cda5da8dSAndroid Build Coastguard Worker    The `multiprocessing.Manager()` function creates started instances of
1208*cda5da8dSAndroid Build Coastguard Worker    this class.
1209*cda5da8dSAndroid Build Coastguard Worker    '''
1210*cda5da8dSAndroid Build Coastguard Worker
1211*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Queue', queue.Queue)
1212*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('JoinableQueue', queue.Queue)
1213*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Event', threading.Event, EventProxy)
1214*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Lock', threading.Lock, AcquirerProxy)
1215*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('RLock', threading.RLock, AcquirerProxy)
1216*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1217*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1218*cda5da8dSAndroid Build Coastguard Worker                     AcquirerProxy)
1219*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Condition', threading.Condition, ConditionProxy)
1220*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Barrier', threading.Barrier, BarrierProxy)
1221*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Pool', pool.Pool, PoolProxy)
1222*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('list', list, ListProxy)
1223*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('dict', dict, DictProxy)
1224*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Value', Value, ValueProxy)
1225*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Array', Array, ArrayProxy)
1226*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Namespace', Namespace, NamespaceProxy)
1227*cda5da8dSAndroid Build Coastguard Worker
1228*cda5da8dSAndroid Build Coastguard Worker# types returned by methods of PoolProxy
1229*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1230*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('AsyncResult', create_method=False)
1231*cda5da8dSAndroid Build Coastguard Worker
1232*cda5da8dSAndroid Build Coastguard Worker#
1233*cda5da8dSAndroid Build Coastguard Worker# Definition of SharedMemoryManager and SharedMemoryServer
1234*cda5da8dSAndroid Build Coastguard Worker#
1235*cda5da8dSAndroid Build Coastguard Worker
1236*cda5da8dSAndroid Build Coastguard Workerif HAS_SHMEM:
1237*cda5da8dSAndroid Build Coastguard Worker    class _SharedMemoryTracker:
1238*cda5da8dSAndroid Build Coastguard Worker        "Manages one or more shared memory segments."
1239*cda5da8dSAndroid Build Coastguard Worker
1240*cda5da8dSAndroid Build Coastguard Worker        def __init__(self, name, segment_names=[]):
1241*cda5da8dSAndroid Build Coastguard Worker            self.shared_memory_context_name = name
1242*cda5da8dSAndroid Build Coastguard Worker            self.segment_names = segment_names
1243*cda5da8dSAndroid Build Coastguard Worker
1244*cda5da8dSAndroid Build Coastguard Worker        def register_segment(self, segment_name):
1245*cda5da8dSAndroid Build Coastguard Worker            "Adds the supplied shared memory block name to tracker."
1246*cda5da8dSAndroid Build Coastguard Worker            util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1247*cda5da8dSAndroid Build Coastguard Worker            self.segment_names.append(segment_name)
1248*cda5da8dSAndroid Build Coastguard Worker
1249*cda5da8dSAndroid Build Coastguard Worker        def destroy_segment(self, segment_name):
1250*cda5da8dSAndroid Build Coastguard Worker            """Calls unlink() on the shared memory block with the supplied name
1251*cda5da8dSAndroid Build Coastguard Worker            and removes it from the list of blocks being tracked."""
1252*cda5da8dSAndroid Build Coastguard Worker            util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1253*cda5da8dSAndroid Build Coastguard Worker            self.segment_names.remove(segment_name)
1254*cda5da8dSAndroid Build Coastguard Worker            segment = shared_memory.SharedMemory(segment_name)
1255*cda5da8dSAndroid Build Coastguard Worker            segment.close()
1256*cda5da8dSAndroid Build Coastguard Worker            segment.unlink()
1257*cda5da8dSAndroid Build Coastguard Worker
1258*cda5da8dSAndroid Build Coastguard Worker        def unlink(self):
1259*cda5da8dSAndroid Build Coastguard Worker            "Calls destroy_segment() on all tracked shared memory blocks."
1260*cda5da8dSAndroid Build Coastguard Worker            for segment_name in self.segment_names[:]:
1261*cda5da8dSAndroid Build Coastguard Worker                self.destroy_segment(segment_name)
1262*cda5da8dSAndroid Build Coastguard Worker
1263*cda5da8dSAndroid Build Coastguard Worker        def __del__(self):
1264*cda5da8dSAndroid Build Coastguard Worker            util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1265*cda5da8dSAndroid Build Coastguard Worker            self.unlink()
1266*cda5da8dSAndroid Build Coastguard Worker
1267*cda5da8dSAndroid Build Coastguard Worker        def __getstate__(self):
1268*cda5da8dSAndroid Build Coastguard Worker            return (self.shared_memory_context_name, self.segment_names)
1269*cda5da8dSAndroid Build Coastguard Worker
1270*cda5da8dSAndroid Build Coastguard Worker        def __setstate__(self, state):
1271*cda5da8dSAndroid Build Coastguard Worker            self.__init__(*state)
1272*cda5da8dSAndroid Build Coastguard Worker
1273*cda5da8dSAndroid Build Coastguard Worker
1274*cda5da8dSAndroid Build Coastguard Worker    class SharedMemoryServer(Server):
1275*cda5da8dSAndroid Build Coastguard Worker
1276*cda5da8dSAndroid Build Coastguard Worker        public = Server.public + \
1277*cda5da8dSAndroid Build Coastguard Worker                 ['track_segment', 'release_segment', 'list_segments']
1278*cda5da8dSAndroid Build Coastguard Worker
1279*cda5da8dSAndroid Build Coastguard Worker        def __init__(self, *args, **kwargs):
1280*cda5da8dSAndroid Build Coastguard Worker            Server.__init__(self, *args, **kwargs)
1281*cda5da8dSAndroid Build Coastguard Worker            address = self.address
1282*cda5da8dSAndroid Build Coastguard Worker            # The address of Linux abstract namespaces can be bytes
1283*cda5da8dSAndroid Build Coastguard Worker            if isinstance(address, bytes):
1284*cda5da8dSAndroid Build Coastguard Worker                address = os.fsdecode(address)
1285*cda5da8dSAndroid Build Coastguard Worker            self.shared_memory_context = \
1286*cda5da8dSAndroid Build Coastguard Worker                _SharedMemoryTracker(f"shm_{address}_{getpid()}")
1287*cda5da8dSAndroid Build Coastguard Worker            util.debug(f"SharedMemoryServer started by pid {getpid()}")
1288*cda5da8dSAndroid Build Coastguard Worker
1289*cda5da8dSAndroid Build Coastguard Worker        def create(self, c, typeid, /, *args, **kwargs):
1290*cda5da8dSAndroid Build Coastguard Worker            """Create a new distributed-shared object (not backed by a shared
1291*cda5da8dSAndroid Build Coastguard Worker            memory block) and return its id to be used in a Proxy Object."""
1292*cda5da8dSAndroid Build Coastguard Worker            # Unless set up as a shared proxy, don't make shared_memory_context
1293*cda5da8dSAndroid Build Coastguard Worker            # a standard part of kwargs.  This makes things easier for supplying
1294*cda5da8dSAndroid Build Coastguard Worker            # simple functions.
1295*cda5da8dSAndroid Build Coastguard Worker            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1296*cda5da8dSAndroid Build Coastguard Worker                kwargs['shared_memory_context'] = self.shared_memory_context
1297*cda5da8dSAndroid Build Coastguard Worker            return Server.create(self, c, typeid, *args, **kwargs)
1298*cda5da8dSAndroid Build Coastguard Worker
1299*cda5da8dSAndroid Build Coastguard Worker        def shutdown(self, c):
1300*cda5da8dSAndroid Build Coastguard Worker            "Call unlink() on all tracked shared memory, terminate the Server."
1301*cda5da8dSAndroid Build Coastguard Worker            self.shared_memory_context.unlink()
1302*cda5da8dSAndroid Build Coastguard Worker            return Server.shutdown(self, c)
1303*cda5da8dSAndroid Build Coastguard Worker
1304*cda5da8dSAndroid Build Coastguard Worker        def track_segment(self, c, segment_name):
1305*cda5da8dSAndroid Build Coastguard Worker            "Adds the supplied shared memory block name to Server's tracker."
1306*cda5da8dSAndroid Build Coastguard Worker            self.shared_memory_context.register_segment(segment_name)
1307*cda5da8dSAndroid Build Coastguard Worker
1308*cda5da8dSAndroid Build Coastguard Worker        def release_segment(self, c, segment_name):
1309*cda5da8dSAndroid Build Coastguard Worker            """Calls unlink() on the shared memory block with the supplied name
1310*cda5da8dSAndroid Build Coastguard Worker            and removes it from the tracker instance inside the Server."""
1311*cda5da8dSAndroid Build Coastguard Worker            self.shared_memory_context.destroy_segment(segment_name)
1312*cda5da8dSAndroid Build Coastguard Worker
1313*cda5da8dSAndroid Build Coastguard Worker        def list_segments(self, c):
1314*cda5da8dSAndroid Build Coastguard Worker            """Returns a list of names of shared memory blocks that the Server
1315*cda5da8dSAndroid Build Coastguard Worker            is currently tracking."""
1316*cda5da8dSAndroid Build Coastguard Worker            return self.shared_memory_context.segment_names
1317*cda5da8dSAndroid Build Coastguard Worker
1318*cda5da8dSAndroid Build Coastguard Worker
1319*cda5da8dSAndroid Build Coastguard Worker    class SharedMemoryManager(BaseManager):
1320*cda5da8dSAndroid Build Coastguard Worker        """Like SyncManager but uses SharedMemoryServer instead of Server.
1321*cda5da8dSAndroid Build Coastguard Worker
1322*cda5da8dSAndroid Build Coastguard Worker        It provides methods for creating and returning SharedMemory instances
1323*cda5da8dSAndroid Build Coastguard Worker        and for creating a list-like object (ShareableList) backed by shared
1324*cda5da8dSAndroid Build Coastguard Worker        memory.  It also provides methods that create and return Proxy Objects
1325*cda5da8dSAndroid Build Coastguard Worker        that support synchronization across processes (i.e. multi-process-safe
1326*cda5da8dSAndroid Build Coastguard Worker        locks and semaphores).
1327*cda5da8dSAndroid Build Coastguard Worker        """
1328*cda5da8dSAndroid Build Coastguard Worker
1329*cda5da8dSAndroid Build Coastguard Worker        _Server = SharedMemoryServer
1330*cda5da8dSAndroid Build Coastguard Worker
1331*cda5da8dSAndroid Build Coastguard Worker        def __init__(self, *args, **kwargs):
1332*cda5da8dSAndroid Build Coastguard Worker            if os.name == "posix":
1333*cda5da8dSAndroid Build Coastguard Worker                # bpo-36867: Ensure the resource_tracker is running before
1334*cda5da8dSAndroid Build Coastguard Worker                # launching the manager process, so that concurrent
1335*cda5da8dSAndroid Build Coastguard Worker                # shared_memory manipulation both in the manager and in the
1336*cda5da8dSAndroid Build Coastguard Worker                # current process does not create two resource_tracker
1337*cda5da8dSAndroid Build Coastguard Worker                # processes.
1338*cda5da8dSAndroid Build Coastguard Worker                from . import resource_tracker
1339*cda5da8dSAndroid Build Coastguard Worker                resource_tracker.ensure_running()
1340*cda5da8dSAndroid Build Coastguard Worker            BaseManager.__init__(self, *args, **kwargs)
1341*cda5da8dSAndroid Build Coastguard Worker            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1342*cda5da8dSAndroid Build Coastguard Worker
1343*cda5da8dSAndroid Build Coastguard Worker        def __del__(self):
1344*cda5da8dSAndroid Build Coastguard Worker            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1345*cda5da8dSAndroid Build Coastguard Worker
1346*cda5da8dSAndroid Build Coastguard Worker        def get_server(self):
1347*cda5da8dSAndroid Build Coastguard Worker            'Better than monkeypatching for now; merge into Server ultimately'
1348*cda5da8dSAndroid Build Coastguard Worker            if self._state.value != State.INITIAL:
1349*cda5da8dSAndroid Build Coastguard Worker                if self._state.value == State.STARTED:
1350*cda5da8dSAndroid Build Coastguard Worker                    raise ProcessError("Already started SharedMemoryServer")
1351*cda5da8dSAndroid Build Coastguard Worker                elif self._state.value == State.SHUTDOWN:
1352*cda5da8dSAndroid Build Coastguard Worker                    raise ProcessError("SharedMemoryManager has shut down")
1353*cda5da8dSAndroid Build Coastguard Worker                else:
1354*cda5da8dSAndroid Build Coastguard Worker                    raise ProcessError(
1355*cda5da8dSAndroid Build Coastguard Worker                        "Unknown state {!r}".format(self._state.value))
1356*cda5da8dSAndroid Build Coastguard Worker            return self._Server(self._registry, self._address,
1357*cda5da8dSAndroid Build Coastguard Worker                                self._authkey, self._serializer)
1358*cda5da8dSAndroid Build Coastguard Worker
1359*cda5da8dSAndroid Build Coastguard Worker        def SharedMemory(self, size):
1360*cda5da8dSAndroid Build Coastguard Worker            """Returns a new SharedMemory instance with the specified size in
1361*cda5da8dSAndroid Build Coastguard Worker            bytes, to be tracked by the manager."""
1362*cda5da8dSAndroid Build Coastguard Worker            with self._Client(self._address, authkey=self._authkey) as conn:
1363*cda5da8dSAndroid Build Coastguard Worker                sms = shared_memory.SharedMemory(None, create=True, size=size)
1364*cda5da8dSAndroid Build Coastguard Worker                try:
1365*cda5da8dSAndroid Build Coastguard Worker                    dispatch(conn, None, 'track_segment', (sms.name,))
1366*cda5da8dSAndroid Build Coastguard Worker                except BaseException as e:
1367*cda5da8dSAndroid Build Coastguard Worker                    sms.unlink()
1368*cda5da8dSAndroid Build Coastguard Worker                    raise e
1369*cda5da8dSAndroid Build Coastguard Worker            return sms
1370*cda5da8dSAndroid Build Coastguard Worker
1371*cda5da8dSAndroid Build Coastguard Worker        def ShareableList(self, sequence):
1372*cda5da8dSAndroid Build Coastguard Worker            """Returns a new ShareableList instance populated with the values
1373*cda5da8dSAndroid Build Coastguard Worker            from the input sequence, to be tracked by the manager."""
1374*cda5da8dSAndroid Build Coastguard Worker            with self._Client(self._address, authkey=self._authkey) as conn:
1375*cda5da8dSAndroid Build Coastguard Worker                sl = shared_memory.ShareableList(sequence)
1376*cda5da8dSAndroid Build Coastguard Worker                try:
1377*cda5da8dSAndroid Build Coastguard Worker                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
1378*cda5da8dSAndroid Build Coastguard Worker                except BaseException as e:
1379*cda5da8dSAndroid Build Coastguard Worker                    sl.shm.unlink()
1380*cda5da8dSAndroid Build Coastguard Worker                    raise e
1381*cda5da8dSAndroid Build Coastguard Worker            return sl
1382