1*cda5da8dSAndroid Build Coastguard Worker# 2*cda5da8dSAndroid Build Coastguard Worker# Module providing manager classes for dealing 3*cda5da8dSAndroid Build Coastguard Worker# with shared objects 4*cda5da8dSAndroid Build Coastguard Worker# 5*cda5da8dSAndroid Build Coastguard Worker# multiprocessing/managers.py 6*cda5da8dSAndroid Build Coastguard Worker# 7*cda5da8dSAndroid Build Coastguard Worker# Copyright (c) 2006-2008, R Oudkerk 8*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement. 9*cda5da8dSAndroid Build Coastguard Worker# 10*cda5da8dSAndroid Build Coastguard Worker 11*cda5da8dSAndroid Build Coastguard Worker__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] 12*cda5da8dSAndroid Build Coastguard Worker 13*cda5da8dSAndroid Build Coastguard Worker# 14*cda5da8dSAndroid Build Coastguard Worker# Imports 15*cda5da8dSAndroid Build Coastguard Worker# 16*cda5da8dSAndroid Build Coastguard Worker 17*cda5da8dSAndroid Build Coastguard Workerimport sys 18*cda5da8dSAndroid Build Coastguard Workerimport threading 19*cda5da8dSAndroid Build Coastguard Workerimport signal 20*cda5da8dSAndroid Build Coastguard Workerimport array 21*cda5da8dSAndroid Build Coastguard Workerimport queue 22*cda5da8dSAndroid Build Coastguard Workerimport time 23*cda5da8dSAndroid Build Coastguard Workerimport types 24*cda5da8dSAndroid Build Coastguard Workerimport os 25*cda5da8dSAndroid Build Coastguard Workerfrom os import getpid 26*cda5da8dSAndroid Build Coastguard Worker 27*cda5da8dSAndroid Build Coastguard Workerfrom traceback import format_exc 28*cda5da8dSAndroid Build Coastguard Worker 29*cda5da8dSAndroid Build Coastguard Workerfrom . import connection 30*cda5da8dSAndroid Build Coastguard Workerfrom .context import reduction, get_spawning_popen, ProcessError 31*cda5da8dSAndroid Build Coastguard Workerfrom . import pool 32*cda5da8dSAndroid Build Coastguard Workerfrom . import process 33*cda5da8dSAndroid Build Coastguard Workerfrom . import util 34*cda5da8dSAndroid Build Coastguard Workerfrom . import get_context 35*cda5da8dSAndroid Build Coastguard Workertry: 36*cda5da8dSAndroid Build Coastguard Worker from . import shared_memory 37*cda5da8dSAndroid Build Coastguard Workerexcept ImportError: 38*cda5da8dSAndroid Build Coastguard Worker HAS_SHMEM = False 39*cda5da8dSAndroid Build Coastguard Workerelse: 40*cda5da8dSAndroid Build Coastguard Worker HAS_SHMEM = True 41*cda5da8dSAndroid Build Coastguard Worker __all__.append('SharedMemoryManager') 42*cda5da8dSAndroid Build Coastguard Worker 43*cda5da8dSAndroid Build Coastguard Worker# 44*cda5da8dSAndroid Build Coastguard Worker# Register some things for pickling 45*cda5da8dSAndroid Build Coastguard Worker# 46*cda5da8dSAndroid Build Coastguard Worker 47*cda5da8dSAndroid Build Coastguard Workerdef reduce_array(a): 48*cda5da8dSAndroid Build Coastguard Worker return array.array, (a.typecode, a.tobytes()) 49*cda5da8dSAndroid Build Coastguard Workerreduction.register(array.array, reduce_array) 50*cda5da8dSAndroid Build Coastguard Worker 51*cda5da8dSAndroid Build Coastguard Workerview_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 52*cda5da8dSAndroid Build Coastguard Workerdef rebuild_as_list(obj): 53*cda5da8dSAndroid Build Coastguard Worker return list, (list(obj),) 54*cda5da8dSAndroid Build Coastguard Workerfor view_type in view_types: 55*cda5da8dSAndroid Build Coastguard Worker reduction.register(view_type, rebuild_as_list) 56*cda5da8dSAndroid Build Coastguard Workerdel view_type, view_types 57*cda5da8dSAndroid Build Coastguard Worker 58*cda5da8dSAndroid Build Coastguard Worker# 59*cda5da8dSAndroid Build Coastguard Worker# Type for identifying shared objects 60*cda5da8dSAndroid Build Coastguard Worker# 61*cda5da8dSAndroid Build Coastguard Worker 62*cda5da8dSAndroid Build Coastguard Workerclass Token(object): 63*cda5da8dSAndroid Build Coastguard Worker ''' 64*cda5da8dSAndroid Build Coastguard Worker Type to uniquely identify a shared object 65*cda5da8dSAndroid Build Coastguard Worker ''' 66*cda5da8dSAndroid Build Coastguard Worker __slots__ = ('typeid', 'address', 'id') 67*cda5da8dSAndroid Build Coastguard Worker 68*cda5da8dSAndroid Build Coastguard Worker def __init__(self, typeid, address, id): 69*cda5da8dSAndroid Build Coastguard Worker (self.typeid, self.address, self.id) = (typeid, address, id) 70*cda5da8dSAndroid Build Coastguard Worker 71*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 72*cda5da8dSAndroid Build Coastguard Worker return (self.typeid, self.address, self.id) 73*cda5da8dSAndroid Build Coastguard Worker 74*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 75*cda5da8dSAndroid Build Coastguard Worker (self.typeid, self.address, self.id) = state 76*cda5da8dSAndroid Build Coastguard Worker 77*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 78*cda5da8dSAndroid Build Coastguard Worker return '%s(typeid=%r, address=%r, id=%r)' % \ 79*cda5da8dSAndroid Build Coastguard Worker (self.__class__.__name__, self.typeid, self.address, self.id) 80*cda5da8dSAndroid Build Coastguard Worker 81*cda5da8dSAndroid Build Coastguard Worker# 82*cda5da8dSAndroid Build Coastguard Worker# Function for communication with a manager's server process 83*cda5da8dSAndroid Build Coastguard Worker# 84*cda5da8dSAndroid Build Coastguard Worker 85*cda5da8dSAndroid Build Coastguard Workerdef dispatch(c, id, methodname, args=(), kwds={}): 86*cda5da8dSAndroid Build Coastguard Worker ''' 87*cda5da8dSAndroid Build Coastguard Worker Send a message to manager using connection `c` and return response 88*cda5da8dSAndroid Build Coastguard Worker ''' 89*cda5da8dSAndroid Build Coastguard Worker c.send((id, methodname, args, kwds)) 90*cda5da8dSAndroid Build Coastguard Worker kind, result = c.recv() 91*cda5da8dSAndroid Build Coastguard Worker if kind == '#RETURN': 92*cda5da8dSAndroid Build Coastguard Worker return result 93*cda5da8dSAndroid Build Coastguard Worker raise convert_to_error(kind, result) 94*cda5da8dSAndroid Build Coastguard Worker 95*cda5da8dSAndroid Build Coastguard Workerdef convert_to_error(kind, result): 96*cda5da8dSAndroid Build Coastguard Worker if kind == '#ERROR': 97*cda5da8dSAndroid Build Coastguard Worker return result 98*cda5da8dSAndroid Build Coastguard Worker elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): 99*cda5da8dSAndroid Build Coastguard Worker if not isinstance(result, str): 100*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 101*cda5da8dSAndroid Build Coastguard Worker "Result {0!r} (kind '{1}') type is {2}, not str".format( 102*cda5da8dSAndroid Build Coastguard Worker result, kind, type(result))) 103*cda5da8dSAndroid Build Coastguard Worker if kind == '#UNSERIALIZABLE': 104*cda5da8dSAndroid Build Coastguard Worker return RemoteError('Unserializable message: %s\n' % result) 105*cda5da8dSAndroid Build Coastguard Worker else: 106*cda5da8dSAndroid Build Coastguard Worker return RemoteError(result) 107*cda5da8dSAndroid Build Coastguard Worker else: 108*cda5da8dSAndroid Build Coastguard Worker return ValueError('Unrecognized message type {!r}'.format(kind)) 109*cda5da8dSAndroid Build Coastguard Worker 110*cda5da8dSAndroid Build Coastguard Workerclass RemoteError(Exception): 111*cda5da8dSAndroid Build Coastguard Worker def __str__(self): 112*cda5da8dSAndroid Build Coastguard Worker return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 113*cda5da8dSAndroid Build Coastguard Worker 114*cda5da8dSAndroid Build Coastguard Worker# 115*cda5da8dSAndroid Build Coastguard Worker# Functions for finding the method names of an object 116*cda5da8dSAndroid Build Coastguard Worker# 117*cda5da8dSAndroid Build Coastguard Worker 118*cda5da8dSAndroid Build Coastguard Workerdef all_methods(obj): 119*cda5da8dSAndroid Build Coastguard Worker ''' 120*cda5da8dSAndroid Build Coastguard Worker Return a list of names of methods of `obj` 121*cda5da8dSAndroid Build Coastguard Worker ''' 122*cda5da8dSAndroid Build Coastguard Worker temp = [] 123*cda5da8dSAndroid Build Coastguard Worker for name in dir(obj): 124*cda5da8dSAndroid Build Coastguard Worker func = getattr(obj, name) 125*cda5da8dSAndroid Build Coastguard Worker if callable(func): 126*cda5da8dSAndroid Build Coastguard Worker temp.append(name) 127*cda5da8dSAndroid Build Coastguard Worker return temp 128*cda5da8dSAndroid Build Coastguard Worker 129*cda5da8dSAndroid Build Coastguard Workerdef public_methods(obj): 130*cda5da8dSAndroid Build Coastguard Worker ''' 131*cda5da8dSAndroid Build Coastguard Worker Return a list of names of methods of `obj` which do not start with '_' 132*cda5da8dSAndroid Build Coastguard Worker ''' 133*cda5da8dSAndroid Build Coastguard Worker return [name for name in all_methods(obj) if name[0] != '_'] 134*cda5da8dSAndroid Build Coastguard Worker 135*cda5da8dSAndroid Build Coastguard Worker# 136*cda5da8dSAndroid Build Coastguard Worker# Server which is run in a process controlled by a manager 137*cda5da8dSAndroid Build Coastguard Worker# 138*cda5da8dSAndroid Build Coastguard Worker 139*cda5da8dSAndroid Build Coastguard Workerclass Server(object): 140*cda5da8dSAndroid Build Coastguard Worker ''' 141*cda5da8dSAndroid Build Coastguard Worker Server class which runs in a process controlled by a manager object 142*cda5da8dSAndroid Build Coastguard Worker ''' 143*cda5da8dSAndroid Build Coastguard Worker public = ['shutdown', 'create', 'accept_connection', 'get_methods', 144*cda5da8dSAndroid Build Coastguard Worker 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 145*cda5da8dSAndroid Build Coastguard Worker 146*cda5da8dSAndroid Build Coastguard Worker def __init__(self, registry, address, authkey, serializer): 147*cda5da8dSAndroid Build Coastguard Worker if not isinstance(authkey, bytes): 148*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 149*cda5da8dSAndroid Build Coastguard Worker "Authkey {0!r} is type {1!s}, not bytes".format( 150*cda5da8dSAndroid Build Coastguard Worker authkey, type(authkey))) 151*cda5da8dSAndroid Build Coastguard Worker self.registry = registry 152*cda5da8dSAndroid Build Coastguard Worker self.authkey = process.AuthenticationString(authkey) 153*cda5da8dSAndroid Build Coastguard Worker Listener, Client = listener_client[serializer] 154*cda5da8dSAndroid Build Coastguard Worker 155*cda5da8dSAndroid Build Coastguard Worker # do authentication later 156*cda5da8dSAndroid Build Coastguard Worker self.listener = Listener(address=address, backlog=16) 157*cda5da8dSAndroid Build Coastguard Worker self.address = self.listener.address 158*cda5da8dSAndroid Build Coastguard Worker 159*cda5da8dSAndroid Build Coastguard Worker self.id_to_obj = {'0': (None, ())} 160*cda5da8dSAndroid Build Coastguard Worker self.id_to_refcount = {} 161*cda5da8dSAndroid Build Coastguard Worker self.id_to_local_proxy_obj = {} 162*cda5da8dSAndroid Build Coastguard Worker self.mutex = threading.Lock() 163*cda5da8dSAndroid Build Coastguard Worker 164*cda5da8dSAndroid Build Coastguard Worker def serve_forever(self): 165*cda5da8dSAndroid Build Coastguard Worker ''' 166*cda5da8dSAndroid Build Coastguard Worker Run the server forever 167*cda5da8dSAndroid Build Coastguard Worker ''' 168*cda5da8dSAndroid Build Coastguard Worker self.stop_event = threading.Event() 169*cda5da8dSAndroid Build Coastguard Worker process.current_process()._manager_server = self 170*cda5da8dSAndroid Build Coastguard Worker try: 171*cda5da8dSAndroid Build Coastguard Worker accepter = threading.Thread(target=self.accepter) 172*cda5da8dSAndroid Build Coastguard Worker accepter.daemon = True 173*cda5da8dSAndroid Build Coastguard Worker accepter.start() 174*cda5da8dSAndroid Build Coastguard Worker try: 175*cda5da8dSAndroid Build Coastguard Worker while not self.stop_event.is_set(): 176*cda5da8dSAndroid Build Coastguard Worker self.stop_event.wait(1) 177*cda5da8dSAndroid Build Coastguard Worker except (KeyboardInterrupt, SystemExit): 178*cda5da8dSAndroid Build Coastguard Worker pass 179*cda5da8dSAndroid Build Coastguard Worker finally: 180*cda5da8dSAndroid Build Coastguard Worker if sys.stdout != sys.__stdout__: # what about stderr? 181*cda5da8dSAndroid Build Coastguard Worker util.debug('resetting stdout, stderr') 182*cda5da8dSAndroid Build Coastguard Worker sys.stdout = sys.__stdout__ 183*cda5da8dSAndroid Build Coastguard Worker sys.stderr = sys.__stderr__ 184*cda5da8dSAndroid Build Coastguard Worker sys.exit(0) 185*cda5da8dSAndroid Build Coastguard Worker 186*cda5da8dSAndroid Build Coastguard Worker def accepter(self): 187*cda5da8dSAndroid Build Coastguard Worker while True: 188*cda5da8dSAndroid Build Coastguard Worker try: 189*cda5da8dSAndroid Build Coastguard Worker c = self.listener.accept() 190*cda5da8dSAndroid Build Coastguard Worker except OSError: 191*cda5da8dSAndroid Build Coastguard Worker continue 192*cda5da8dSAndroid Build Coastguard Worker t = threading.Thread(target=self.handle_request, args=(c,)) 193*cda5da8dSAndroid Build Coastguard Worker t.daemon = True 194*cda5da8dSAndroid Build Coastguard Worker t.start() 195*cda5da8dSAndroid Build Coastguard Worker 196*cda5da8dSAndroid Build Coastguard Worker def _handle_request(self, c): 197*cda5da8dSAndroid Build Coastguard Worker request = None 198*cda5da8dSAndroid Build Coastguard Worker try: 199*cda5da8dSAndroid Build Coastguard Worker connection.deliver_challenge(c, self.authkey) 200*cda5da8dSAndroid Build Coastguard Worker connection.answer_challenge(c, self.authkey) 201*cda5da8dSAndroid Build Coastguard Worker request = c.recv() 202*cda5da8dSAndroid Build Coastguard Worker ignore, funcname, args, kwds = request 203*cda5da8dSAndroid Build Coastguard Worker assert funcname in self.public, '%r unrecognized' % funcname 204*cda5da8dSAndroid Build Coastguard Worker func = getattr(self, funcname) 205*cda5da8dSAndroid Build Coastguard Worker except Exception: 206*cda5da8dSAndroid Build Coastguard Worker msg = ('#TRACEBACK', format_exc()) 207*cda5da8dSAndroid Build Coastguard Worker else: 208*cda5da8dSAndroid Build Coastguard Worker try: 209*cda5da8dSAndroid Build Coastguard Worker result = func(c, *args, **kwds) 210*cda5da8dSAndroid Build Coastguard Worker except Exception: 211*cda5da8dSAndroid Build Coastguard Worker msg = ('#TRACEBACK', format_exc()) 212*cda5da8dSAndroid Build Coastguard Worker else: 213*cda5da8dSAndroid Build Coastguard Worker msg = ('#RETURN', result) 214*cda5da8dSAndroid Build Coastguard Worker 215*cda5da8dSAndroid Build Coastguard Worker try: 216*cda5da8dSAndroid Build Coastguard Worker c.send(msg) 217*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 218*cda5da8dSAndroid Build Coastguard Worker try: 219*cda5da8dSAndroid Build Coastguard Worker c.send(('#TRACEBACK', format_exc())) 220*cda5da8dSAndroid Build Coastguard Worker except Exception: 221*cda5da8dSAndroid Build Coastguard Worker pass 222*cda5da8dSAndroid Build Coastguard Worker util.info('Failure to send message: %r', msg) 223*cda5da8dSAndroid Build Coastguard Worker util.info(' ... request was %r', request) 224*cda5da8dSAndroid Build Coastguard Worker util.info(' ... exception was %r', e) 225*cda5da8dSAndroid Build Coastguard Worker 226*cda5da8dSAndroid Build Coastguard Worker def handle_request(self, conn): 227*cda5da8dSAndroid Build Coastguard Worker ''' 228*cda5da8dSAndroid Build Coastguard Worker Handle a new connection 229*cda5da8dSAndroid Build Coastguard Worker ''' 230*cda5da8dSAndroid Build Coastguard Worker try: 231*cda5da8dSAndroid Build Coastguard Worker self._handle_request(conn) 232*cda5da8dSAndroid Build Coastguard Worker except SystemExit: 233*cda5da8dSAndroid Build Coastguard Worker # Server.serve_client() calls sys.exit(0) on EOF 234*cda5da8dSAndroid Build Coastguard Worker pass 235*cda5da8dSAndroid Build Coastguard Worker finally: 236*cda5da8dSAndroid Build Coastguard Worker conn.close() 237*cda5da8dSAndroid Build Coastguard Worker 238*cda5da8dSAndroid Build Coastguard Worker def serve_client(self, conn): 239*cda5da8dSAndroid Build Coastguard Worker ''' 240*cda5da8dSAndroid Build Coastguard Worker Handle requests from the proxies in a particular process/thread 241*cda5da8dSAndroid Build Coastguard Worker ''' 242*cda5da8dSAndroid Build Coastguard Worker util.debug('starting server thread to service %r', 243*cda5da8dSAndroid Build Coastguard Worker threading.current_thread().name) 244*cda5da8dSAndroid Build Coastguard Worker 245*cda5da8dSAndroid Build Coastguard Worker recv = conn.recv 246*cda5da8dSAndroid Build Coastguard Worker send = conn.send 247*cda5da8dSAndroid Build Coastguard Worker id_to_obj = self.id_to_obj 248*cda5da8dSAndroid Build Coastguard Worker 249*cda5da8dSAndroid Build Coastguard Worker while not self.stop_event.is_set(): 250*cda5da8dSAndroid Build Coastguard Worker 251*cda5da8dSAndroid Build Coastguard Worker try: 252*cda5da8dSAndroid Build Coastguard Worker methodname = obj = None 253*cda5da8dSAndroid Build Coastguard Worker request = recv() 254*cda5da8dSAndroid Build Coastguard Worker ident, methodname, args, kwds = request 255*cda5da8dSAndroid Build Coastguard Worker try: 256*cda5da8dSAndroid Build Coastguard Worker obj, exposed, gettypeid = id_to_obj[ident] 257*cda5da8dSAndroid Build Coastguard Worker except KeyError as ke: 258*cda5da8dSAndroid Build Coastguard Worker try: 259*cda5da8dSAndroid Build Coastguard Worker obj, exposed, gettypeid = \ 260*cda5da8dSAndroid Build Coastguard Worker self.id_to_local_proxy_obj[ident] 261*cda5da8dSAndroid Build Coastguard Worker except KeyError: 262*cda5da8dSAndroid Build Coastguard Worker raise ke 263*cda5da8dSAndroid Build Coastguard Worker 264*cda5da8dSAndroid Build Coastguard Worker if methodname not in exposed: 265*cda5da8dSAndroid Build Coastguard Worker raise AttributeError( 266*cda5da8dSAndroid Build Coastguard Worker 'method %r of %r object is not in exposed=%r' % 267*cda5da8dSAndroid Build Coastguard Worker (methodname, type(obj), exposed) 268*cda5da8dSAndroid Build Coastguard Worker ) 269*cda5da8dSAndroid Build Coastguard Worker 270*cda5da8dSAndroid Build Coastguard Worker function = getattr(obj, methodname) 271*cda5da8dSAndroid Build Coastguard Worker 272*cda5da8dSAndroid Build Coastguard Worker try: 273*cda5da8dSAndroid Build Coastguard Worker res = function(*args, **kwds) 274*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 275*cda5da8dSAndroid Build Coastguard Worker msg = ('#ERROR', e) 276*cda5da8dSAndroid Build Coastguard Worker else: 277*cda5da8dSAndroid Build Coastguard Worker typeid = gettypeid and gettypeid.get(methodname, None) 278*cda5da8dSAndroid Build Coastguard Worker if typeid: 279*cda5da8dSAndroid Build Coastguard Worker rident, rexposed = self.create(conn, typeid, res) 280*cda5da8dSAndroid Build Coastguard Worker token = Token(typeid, self.address, rident) 281*cda5da8dSAndroid Build Coastguard Worker msg = ('#PROXY', (rexposed, token)) 282*cda5da8dSAndroid Build Coastguard Worker else: 283*cda5da8dSAndroid Build Coastguard Worker msg = ('#RETURN', res) 284*cda5da8dSAndroid Build Coastguard Worker 285*cda5da8dSAndroid Build Coastguard Worker except AttributeError: 286*cda5da8dSAndroid Build Coastguard Worker if methodname is None: 287*cda5da8dSAndroid Build Coastguard Worker msg = ('#TRACEBACK', format_exc()) 288*cda5da8dSAndroid Build Coastguard Worker else: 289*cda5da8dSAndroid Build Coastguard Worker try: 290*cda5da8dSAndroid Build Coastguard Worker fallback_func = self.fallback_mapping[methodname] 291*cda5da8dSAndroid Build Coastguard Worker result = fallback_func( 292*cda5da8dSAndroid Build Coastguard Worker self, conn, ident, obj, *args, **kwds 293*cda5da8dSAndroid Build Coastguard Worker ) 294*cda5da8dSAndroid Build Coastguard Worker msg = ('#RETURN', result) 295*cda5da8dSAndroid Build Coastguard Worker except Exception: 296*cda5da8dSAndroid Build Coastguard Worker msg = ('#TRACEBACK', format_exc()) 297*cda5da8dSAndroid Build Coastguard Worker 298*cda5da8dSAndroid Build Coastguard Worker except EOFError: 299*cda5da8dSAndroid Build Coastguard Worker util.debug('got EOF -- exiting thread serving %r', 300*cda5da8dSAndroid Build Coastguard Worker threading.current_thread().name) 301*cda5da8dSAndroid Build Coastguard Worker sys.exit(0) 302*cda5da8dSAndroid Build Coastguard Worker 303*cda5da8dSAndroid Build Coastguard Worker except Exception: 304*cda5da8dSAndroid Build Coastguard Worker msg = ('#TRACEBACK', format_exc()) 305*cda5da8dSAndroid Build Coastguard Worker 306*cda5da8dSAndroid Build Coastguard Worker try: 307*cda5da8dSAndroid Build Coastguard Worker try: 308*cda5da8dSAndroid Build Coastguard Worker send(msg) 309*cda5da8dSAndroid Build Coastguard Worker except Exception: 310*cda5da8dSAndroid Build Coastguard Worker send(('#UNSERIALIZABLE', format_exc())) 311*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 312*cda5da8dSAndroid Build Coastguard Worker util.info('exception in thread serving %r', 313*cda5da8dSAndroid Build Coastguard Worker threading.current_thread().name) 314*cda5da8dSAndroid Build Coastguard Worker util.info(' ... message was %r', msg) 315*cda5da8dSAndroid Build Coastguard Worker util.info(' ... exception was %r', e) 316*cda5da8dSAndroid Build Coastguard Worker conn.close() 317*cda5da8dSAndroid Build Coastguard Worker sys.exit(1) 318*cda5da8dSAndroid Build Coastguard Worker 319*cda5da8dSAndroid Build Coastguard Worker def fallback_getvalue(self, conn, ident, obj): 320*cda5da8dSAndroid Build Coastguard Worker return obj 321*cda5da8dSAndroid Build Coastguard Worker 322*cda5da8dSAndroid Build Coastguard Worker def fallback_str(self, conn, ident, obj): 323*cda5da8dSAndroid Build Coastguard Worker return str(obj) 324*cda5da8dSAndroid Build Coastguard Worker 325*cda5da8dSAndroid Build Coastguard Worker def fallback_repr(self, conn, ident, obj): 326*cda5da8dSAndroid Build Coastguard Worker return repr(obj) 327*cda5da8dSAndroid Build Coastguard Worker 328*cda5da8dSAndroid Build Coastguard Worker fallback_mapping = { 329*cda5da8dSAndroid Build Coastguard Worker '__str__':fallback_str, 330*cda5da8dSAndroid Build Coastguard Worker '__repr__':fallback_repr, 331*cda5da8dSAndroid Build Coastguard Worker '#GETVALUE':fallback_getvalue 332*cda5da8dSAndroid Build Coastguard Worker } 333*cda5da8dSAndroid Build Coastguard Worker 334*cda5da8dSAndroid Build Coastguard Worker def dummy(self, c): 335*cda5da8dSAndroid Build Coastguard Worker pass 336*cda5da8dSAndroid Build Coastguard Worker 337*cda5da8dSAndroid Build Coastguard Worker def debug_info(self, c): 338*cda5da8dSAndroid Build Coastguard Worker ''' 339*cda5da8dSAndroid Build Coastguard Worker Return some info --- useful to spot problems with refcounting 340*cda5da8dSAndroid Build Coastguard Worker ''' 341*cda5da8dSAndroid Build Coastguard Worker # Perhaps include debug info about 'c'? 342*cda5da8dSAndroid Build Coastguard Worker with self.mutex: 343*cda5da8dSAndroid Build Coastguard Worker result = [] 344*cda5da8dSAndroid Build Coastguard Worker keys = list(self.id_to_refcount.keys()) 345*cda5da8dSAndroid Build Coastguard Worker keys.sort() 346*cda5da8dSAndroid Build Coastguard Worker for ident in keys: 347*cda5da8dSAndroid Build Coastguard Worker if ident != '0': 348*cda5da8dSAndroid Build Coastguard Worker result.append(' %s: refcount=%s\n %s' % 349*cda5da8dSAndroid Build Coastguard Worker (ident, self.id_to_refcount[ident], 350*cda5da8dSAndroid Build Coastguard Worker str(self.id_to_obj[ident][0])[:75])) 351*cda5da8dSAndroid Build Coastguard Worker return '\n'.join(result) 352*cda5da8dSAndroid Build Coastguard Worker 353*cda5da8dSAndroid Build Coastguard Worker def number_of_objects(self, c): 354*cda5da8dSAndroid Build Coastguard Worker ''' 355*cda5da8dSAndroid Build Coastguard Worker Number of shared objects 356*cda5da8dSAndroid Build Coastguard Worker ''' 357*cda5da8dSAndroid Build Coastguard Worker # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' 358*cda5da8dSAndroid Build Coastguard Worker return len(self.id_to_refcount) 359*cda5da8dSAndroid Build Coastguard Worker 360*cda5da8dSAndroid Build Coastguard Worker def shutdown(self, c): 361*cda5da8dSAndroid Build Coastguard Worker ''' 362*cda5da8dSAndroid Build Coastguard Worker Shutdown this process 363*cda5da8dSAndroid Build Coastguard Worker ''' 364*cda5da8dSAndroid Build Coastguard Worker try: 365*cda5da8dSAndroid Build Coastguard Worker util.debug('manager received shutdown message') 366*cda5da8dSAndroid Build Coastguard Worker c.send(('#RETURN', None)) 367*cda5da8dSAndroid Build Coastguard Worker except: 368*cda5da8dSAndroid Build Coastguard Worker import traceback 369*cda5da8dSAndroid Build Coastguard Worker traceback.print_exc() 370*cda5da8dSAndroid Build Coastguard Worker finally: 371*cda5da8dSAndroid Build Coastguard Worker self.stop_event.set() 372*cda5da8dSAndroid Build Coastguard Worker 373*cda5da8dSAndroid Build Coastguard Worker def create(self, c, typeid, /, *args, **kwds): 374*cda5da8dSAndroid Build Coastguard Worker ''' 375*cda5da8dSAndroid Build Coastguard Worker Create a new shared object and return its id 376*cda5da8dSAndroid Build Coastguard Worker ''' 377*cda5da8dSAndroid Build Coastguard Worker with self.mutex: 378*cda5da8dSAndroid Build Coastguard Worker callable, exposed, method_to_typeid, proxytype = \ 379*cda5da8dSAndroid Build Coastguard Worker self.registry[typeid] 380*cda5da8dSAndroid Build Coastguard Worker 381*cda5da8dSAndroid Build Coastguard Worker if callable is None: 382*cda5da8dSAndroid Build Coastguard Worker if kwds or (len(args) != 1): 383*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 384*cda5da8dSAndroid Build Coastguard Worker "Without callable, must have one non-keyword argument") 385*cda5da8dSAndroid Build Coastguard Worker obj = args[0] 386*cda5da8dSAndroid Build Coastguard Worker else: 387*cda5da8dSAndroid Build Coastguard Worker obj = callable(*args, **kwds) 388*cda5da8dSAndroid Build Coastguard Worker 389*cda5da8dSAndroid Build Coastguard Worker if exposed is None: 390*cda5da8dSAndroid Build Coastguard Worker exposed = public_methods(obj) 391*cda5da8dSAndroid Build Coastguard Worker if method_to_typeid is not None: 392*cda5da8dSAndroid Build Coastguard Worker if not isinstance(method_to_typeid, dict): 393*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 394*cda5da8dSAndroid Build Coastguard Worker "Method_to_typeid {0!r}: type {1!s}, not dict".format( 395*cda5da8dSAndroid Build Coastguard Worker method_to_typeid, type(method_to_typeid))) 396*cda5da8dSAndroid Build Coastguard Worker exposed = list(exposed) + list(method_to_typeid) 397*cda5da8dSAndroid Build Coastguard Worker 398*cda5da8dSAndroid Build Coastguard Worker ident = '%x' % id(obj) # convert to string because xmlrpclib 399*cda5da8dSAndroid Build Coastguard Worker # only has 32 bit signed integers 400*cda5da8dSAndroid Build Coastguard Worker util.debug('%r callable returned object with id %r', typeid, ident) 401*cda5da8dSAndroid Build Coastguard Worker 402*cda5da8dSAndroid Build Coastguard Worker self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 403*cda5da8dSAndroid Build Coastguard Worker if ident not in self.id_to_refcount: 404*cda5da8dSAndroid Build Coastguard Worker self.id_to_refcount[ident] = 0 405*cda5da8dSAndroid Build Coastguard Worker 406*cda5da8dSAndroid Build Coastguard Worker self.incref(c, ident) 407*cda5da8dSAndroid Build Coastguard Worker return ident, tuple(exposed) 408*cda5da8dSAndroid Build Coastguard Worker 409*cda5da8dSAndroid Build Coastguard Worker def get_methods(self, c, token): 410*cda5da8dSAndroid Build Coastguard Worker ''' 411*cda5da8dSAndroid Build Coastguard Worker Return the methods of the shared object indicated by token 412*cda5da8dSAndroid Build Coastguard Worker ''' 413*cda5da8dSAndroid Build Coastguard Worker return tuple(self.id_to_obj[token.id][1]) 414*cda5da8dSAndroid Build Coastguard Worker 415*cda5da8dSAndroid Build Coastguard Worker def accept_connection(self, c, name): 416*cda5da8dSAndroid Build Coastguard Worker ''' 417*cda5da8dSAndroid Build Coastguard Worker Spawn a new thread to serve this connection 418*cda5da8dSAndroid Build Coastguard Worker ''' 419*cda5da8dSAndroid Build Coastguard Worker threading.current_thread().name = name 420*cda5da8dSAndroid Build Coastguard Worker c.send(('#RETURN', None)) 421*cda5da8dSAndroid Build Coastguard Worker self.serve_client(c) 422*cda5da8dSAndroid Build Coastguard Worker 423*cda5da8dSAndroid Build Coastguard Worker def incref(self, c, ident): 424*cda5da8dSAndroid Build Coastguard Worker with self.mutex: 425*cda5da8dSAndroid Build Coastguard Worker try: 426*cda5da8dSAndroid Build Coastguard Worker self.id_to_refcount[ident] += 1 427*cda5da8dSAndroid Build Coastguard Worker except KeyError as ke: 428*cda5da8dSAndroid Build Coastguard Worker # If no external references exist but an internal (to the 429*cda5da8dSAndroid Build Coastguard Worker # manager) still does and a new external reference is created 430*cda5da8dSAndroid Build Coastguard Worker # from it, restore the manager's tracking of it from the 431*cda5da8dSAndroid Build Coastguard Worker # previously stashed internal ref. 432*cda5da8dSAndroid Build Coastguard Worker if ident in self.id_to_local_proxy_obj: 433*cda5da8dSAndroid Build Coastguard Worker self.id_to_refcount[ident] = 1 434*cda5da8dSAndroid Build Coastguard Worker self.id_to_obj[ident] = \ 435*cda5da8dSAndroid Build Coastguard Worker self.id_to_local_proxy_obj[ident] 436*cda5da8dSAndroid Build Coastguard Worker obj, exposed, gettypeid = self.id_to_obj[ident] 437*cda5da8dSAndroid Build Coastguard Worker util.debug('Server re-enabled tracking & INCREF %r', ident) 438*cda5da8dSAndroid Build Coastguard Worker else: 439*cda5da8dSAndroid Build Coastguard Worker raise ke 440*cda5da8dSAndroid Build Coastguard Worker 441*cda5da8dSAndroid Build Coastguard Worker def decref(self, c, ident): 442*cda5da8dSAndroid Build Coastguard Worker if ident not in self.id_to_refcount and \ 443*cda5da8dSAndroid Build Coastguard Worker ident in self.id_to_local_proxy_obj: 444*cda5da8dSAndroid Build Coastguard Worker util.debug('Server DECREF skipping %r', ident) 445*cda5da8dSAndroid Build Coastguard Worker return 446*cda5da8dSAndroid Build Coastguard Worker 447*cda5da8dSAndroid Build Coastguard Worker with self.mutex: 448*cda5da8dSAndroid Build Coastguard Worker if self.id_to_refcount[ident] <= 0: 449*cda5da8dSAndroid Build Coastguard Worker raise AssertionError( 450*cda5da8dSAndroid Build Coastguard Worker "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( 451*cda5da8dSAndroid Build Coastguard Worker ident, self.id_to_obj[ident], 452*cda5da8dSAndroid Build Coastguard Worker self.id_to_refcount[ident])) 453*cda5da8dSAndroid Build Coastguard Worker self.id_to_refcount[ident] -= 1 454*cda5da8dSAndroid Build Coastguard Worker if self.id_to_refcount[ident] == 0: 455*cda5da8dSAndroid Build Coastguard Worker del self.id_to_refcount[ident] 456*cda5da8dSAndroid Build Coastguard Worker 457*cda5da8dSAndroid Build Coastguard Worker if ident not in self.id_to_refcount: 458*cda5da8dSAndroid Build Coastguard Worker # Two-step process in case the object turns out to contain other 459*cda5da8dSAndroid Build Coastguard Worker # proxy objects (e.g. a managed list of managed lists). 460*cda5da8dSAndroid Build Coastguard Worker # Otherwise, deleting self.id_to_obj[ident] would trigger the 461*cda5da8dSAndroid Build Coastguard Worker # deleting of the stored value (another managed object) which would 462*cda5da8dSAndroid Build Coastguard Worker # in turn attempt to acquire the mutex that is already held here. 463*cda5da8dSAndroid Build Coastguard Worker self.id_to_obj[ident] = (None, (), None) # thread-safe 464*cda5da8dSAndroid Build Coastguard Worker util.debug('disposing of obj with id %r', ident) 465*cda5da8dSAndroid Build Coastguard Worker with self.mutex: 466*cda5da8dSAndroid Build Coastguard Worker del self.id_to_obj[ident] 467*cda5da8dSAndroid Build Coastguard Worker 468*cda5da8dSAndroid Build Coastguard Worker 469*cda5da8dSAndroid Build Coastguard Worker# 470*cda5da8dSAndroid Build Coastguard Worker# Class to represent state of a manager 471*cda5da8dSAndroid Build Coastguard Worker# 472*cda5da8dSAndroid Build Coastguard Worker 473*cda5da8dSAndroid Build Coastguard Workerclass State(object): 474*cda5da8dSAndroid Build Coastguard Worker __slots__ = ['value'] 475*cda5da8dSAndroid Build Coastguard Worker INITIAL = 0 476*cda5da8dSAndroid Build Coastguard Worker STARTED = 1 477*cda5da8dSAndroid Build Coastguard Worker SHUTDOWN = 2 478*cda5da8dSAndroid Build Coastguard Worker 479*cda5da8dSAndroid Build Coastguard Worker# 480*cda5da8dSAndroid Build Coastguard Worker# Mapping from serializer name to Listener and Client types 481*cda5da8dSAndroid Build Coastguard Worker# 482*cda5da8dSAndroid Build Coastguard Worker 483*cda5da8dSAndroid Build Coastguard Workerlistener_client = { 484*cda5da8dSAndroid Build Coastguard Worker 'pickle' : (connection.Listener, connection.Client), 485*cda5da8dSAndroid Build Coastguard Worker 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 486*cda5da8dSAndroid Build Coastguard Worker } 487*cda5da8dSAndroid Build Coastguard Worker 488*cda5da8dSAndroid Build Coastguard Worker# 489*cda5da8dSAndroid Build Coastguard Worker# Definition of BaseManager 490*cda5da8dSAndroid Build Coastguard Worker# 491*cda5da8dSAndroid Build Coastguard Worker 492*cda5da8dSAndroid Build Coastguard Workerclass BaseManager(object): 493*cda5da8dSAndroid Build Coastguard Worker ''' 494*cda5da8dSAndroid Build Coastguard Worker Base class for managers 495*cda5da8dSAndroid Build Coastguard Worker ''' 496*cda5da8dSAndroid Build Coastguard Worker _registry = {} 497*cda5da8dSAndroid Build Coastguard Worker _Server = Server 498*cda5da8dSAndroid Build Coastguard Worker 499*cda5da8dSAndroid Build Coastguard Worker def __init__(self, address=None, authkey=None, serializer='pickle', 500*cda5da8dSAndroid Build Coastguard Worker ctx=None, *, shutdown_timeout=1.0): 501*cda5da8dSAndroid Build Coastguard Worker if authkey is None: 502*cda5da8dSAndroid Build Coastguard Worker authkey = process.current_process().authkey 503*cda5da8dSAndroid Build Coastguard Worker self._address = address # XXX not final address if eg ('', 0) 504*cda5da8dSAndroid Build Coastguard Worker self._authkey = process.AuthenticationString(authkey) 505*cda5da8dSAndroid Build Coastguard Worker self._state = State() 506*cda5da8dSAndroid Build Coastguard Worker self._state.value = State.INITIAL 507*cda5da8dSAndroid Build Coastguard Worker self._serializer = serializer 508*cda5da8dSAndroid Build Coastguard Worker self._Listener, self._Client = listener_client[serializer] 509*cda5da8dSAndroid Build Coastguard Worker self._ctx = ctx or get_context() 510*cda5da8dSAndroid Build Coastguard Worker self._shutdown_timeout = shutdown_timeout 511*cda5da8dSAndroid Build Coastguard Worker 512*cda5da8dSAndroid Build Coastguard Worker def get_server(self): 513*cda5da8dSAndroid Build Coastguard Worker ''' 514*cda5da8dSAndroid Build Coastguard Worker Return server object with serve_forever() method and address attribute 515*cda5da8dSAndroid Build Coastguard Worker ''' 516*cda5da8dSAndroid Build Coastguard Worker if self._state.value != State.INITIAL: 517*cda5da8dSAndroid Build Coastguard Worker if self._state.value == State.STARTED: 518*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("Already started server") 519*cda5da8dSAndroid Build Coastguard Worker elif self._state.value == State.SHUTDOWN: 520*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("Manager has shut down") 521*cda5da8dSAndroid Build Coastguard Worker else: 522*cda5da8dSAndroid Build Coastguard Worker raise ProcessError( 523*cda5da8dSAndroid Build Coastguard Worker "Unknown state {!r}".format(self._state.value)) 524*cda5da8dSAndroid Build Coastguard Worker return Server(self._registry, self._address, 525*cda5da8dSAndroid Build Coastguard Worker self._authkey, self._serializer) 526*cda5da8dSAndroid Build Coastguard Worker 527*cda5da8dSAndroid Build Coastguard Worker def connect(self): 528*cda5da8dSAndroid Build Coastguard Worker ''' 529*cda5da8dSAndroid Build Coastguard Worker Connect manager object to the server process 530*cda5da8dSAndroid Build Coastguard Worker ''' 531*cda5da8dSAndroid Build Coastguard Worker Listener, Client = listener_client[self._serializer] 532*cda5da8dSAndroid Build Coastguard Worker conn = Client(self._address, authkey=self._authkey) 533*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'dummy') 534*cda5da8dSAndroid Build Coastguard Worker self._state.value = State.STARTED 535*cda5da8dSAndroid Build Coastguard Worker 536*cda5da8dSAndroid Build Coastguard Worker def start(self, initializer=None, initargs=()): 537*cda5da8dSAndroid Build Coastguard Worker ''' 538*cda5da8dSAndroid Build Coastguard Worker Spawn a server process for this manager object 539*cda5da8dSAndroid Build Coastguard Worker ''' 540*cda5da8dSAndroid Build Coastguard Worker if self._state.value != State.INITIAL: 541*cda5da8dSAndroid Build Coastguard Worker if self._state.value == State.STARTED: 542*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("Already started server") 543*cda5da8dSAndroid Build Coastguard Worker elif self._state.value == State.SHUTDOWN: 544*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("Manager has shut down") 545*cda5da8dSAndroid Build Coastguard Worker else: 546*cda5da8dSAndroid Build Coastguard Worker raise ProcessError( 547*cda5da8dSAndroid Build Coastguard Worker "Unknown state {!r}".format(self._state.value)) 548*cda5da8dSAndroid Build Coastguard Worker 549*cda5da8dSAndroid Build Coastguard Worker if initializer is not None and not callable(initializer): 550*cda5da8dSAndroid Build Coastguard Worker raise TypeError('initializer must be a callable') 551*cda5da8dSAndroid Build Coastguard Worker 552*cda5da8dSAndroid Build Coastguard Worker # pipe over which we will retrieve address of server 553*cda5da8dSAndroid Build Coastguard Worker reader, writer = connection.Pipe(duplex=False) 554*cda5da8dSAndroid Build Coastguard Worker 555*cda5da8dSAndroid Build Coastguard Worker # spawn process which runs a server 556*cda5da8dSAndroid Build Coastguard Worker self._process = self._ctx.Process( 557*cda5da8dSAndroid Build Coastguard Worker target=type(self)._run_server, 558*cda5da8dSAndroid Build Coastguard Worker args=(self._registry, self._address, self._authkey, 559*cda5da8dSAndroid Build Coastguard Worker self._serializer, writer, initializer, initargs), 560*cda5da8dSAndroid Build Coastguard Worker ) 561*cda5da8dSAndroid Build Coastguard Worker ident = ':'.join(str(i) for i in self._process._identity) 562*cda5da8dSAndroid Build Coastguard Worker self._process.name = type(self).__name__ + '-' + ident 563*cda5da8dSAndroid Build Coastguard Worker self._process.start() 564*cda5da8dSAndroid Build Coastguard Worker 565*cda5da8dSAndroid Build Coastguard Worker # get address of server 566*cda5da8dSAndroid Build Coastguard Worker writer.close() 567*cda5da8dSAndroid Build Coastguard Worker self._address = reader.recv() 568*cda5da8dSAndroid Build Coastguard Worker reader.close() 569*cda5da8dSAndroid Build Coastguard Worker 570*cda5da8dSAndroid Build Coastguard Worker # register a finalizer 571*cda5da8dSAndroid Build Coastguard Worker self._state.value = State.STARTED 572*cda5da8dSAndroid Build Coastguard Worker self.shutdown = util.Finalize( 573*cda5da8dSAndroid Build Coastguard Worker self, type(self)._finalize_manager, 574*cda5da8dSAndroid Build Coastguard Worker args=(self._process, self._address, self._authkey, self._state, 575*cda5da8dSAndroid Build Coastguard Worker self._Client, self._shutdown_timeout), 576*cda5da8dSAndroid Build Coastguard Worker exitpriority=0 577*cda5da8dSAndroid Build Coastguard Worker ) 578*cda5da8dSAndroid Build Coastguard Worker 579*cda5da8dSAndroid Build Coastguard Worker @classmethod 580*cda5da8dSAndroid Build Coastguard Worker def _run_server(cls, registry, address, authkey, serializer, writer, 581*cda5da8dSAndroid Build Coastguard Worker initializer=None, initargs=()): 582*cda5da8dSAndroid Build Coastguard Worker ''' 583*cda5da8dSAndroid Build Coastguard Worker Create a server, report its address and run it 584*cda5da8dSAndroid Build Coastguard Worker ''' 585*cda5da8dSAndroid Build Coastguard Worker # bpo-36368: protect server process from KeyboardInterrupt signals 586*cda5da8dSAndroid Build Coastguard Worker signal.signal(signal.SIGINT, signal.SIG_IGN) 587*cda5da8dSAndroid Build Coastguard Worker 588*cda5da8dSAndroid Build Coastguard Worker if initializer is not None: 589*cda5da8dSAndroid Build Coastguard Worker initializer(*initargs) 590*cda5da8dSAndroid Build Coastguard Worker 591*cda5da8dSAndroid Build Coastguard Worker # create server 592*cda5da8dSAndroid Build Coastguard Worker server = cls._Server(registry, address, authkey, serializer) 593*cda5da8dSAndroid Build Coastguard Worker 594*cda5da8dSAndroid Build Coastguard Worker # inform parent process of the server's address 595*cda5da8dSAndroid Build Coastguard Worker writer.send(server.address) 596*cda5da8dSAndroid Build Coastguard Worker writer.close() 597*cda5da8dSAndroid Build Coastguard Worker 598*cda5da8dSAndroid Build Coastguard Worker # run the manager 599*cda5da8dSAndroid Build Coastguard Worker util.info('manager serving at %r', server.address) 600*cda5da8dSAndroid Build Coastguard Worker server.serve_forever() 601*cda5da8dSAndroid Build Coastguard Worker 602*cda5da8dSAndroid Build Coastguard Worker def _create(self, typeid, /, *args, **kwds): 603*cda5da8dSAndroid Build Coastguard Worker ''' 604*cda5da8dSAndroid Build Coastguard Worker Create a new shared object; return the token and exposed tuple 605*cda5da8dSAndroid Build Coastguard Worker ''' 606*cda5da8dSAndroid Build Coastguard Worker assert self._state.value == State.STARTED, 'server not yet started' 607*cda5da8dSAndroid Build Coastguard Worker conn = self._Client(self._address, authkey=self._authkey) 608*cda5da8dSAndroid Build Coastguard Worker try: 609*cda5da8dSAndroid Build Coastguard Worker id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 610*cda5da8dSAndroid Build Coastguard Worker finally: 611*cda5da8dSAndroid Build Coastguard Worker conn.close() 612*cda5da8dSAndroid Build Coastguard Worker return Token(typeid, self._address, id), exposed 613*cda5da8dSAndroid Build Coastguard Worker 614*cda5da8dSAndroid Build Coastguard Worker def join(self, timeout=None): 615*cda5da8dSAndroid Build Coastguard Worker ''' 616*cda5da8dSAndroid Build Coastguard Worker Join the manager process (if it has been spawned) 617*cda5da8dSAndroid Build Coastguard Worker ''' 618*cda5da8dSAndroid Build Coastguard Worker if self._process is not None: 619*cda5da8dSAndroid Build Coastguard Worker self._process.join(timeout) 620*cda5da8dSAndroid Build Coastguard Worker if not self._process.is_alive(): 621*cda5da8dSAndroid Build Coastguard Worker self._process = None 622*cda5da8dSAndroid Build Coastguard Worker 623*cda5da8dSAndroid Build Coastguard Worker def _debug_info(self): 624*cda5da8dSAndroid Build Coastguard Worker ''' 625*cda5da8dSAndroid Build Coastguard Worker Return some info about the servers shared objects and connections 626*cda5da8dSAndroid Build Coastguard Worker ''' 627*cda5da8dSAndroid Build Coastguard Worker conn = self._Client(self._address, authkey=self._authkey) 628*cda5da8dSAndroid Build Coastguard Worker try: 629*cda5da8dSAndroid Build Coastguard Worker return dispatch(conn, None, 'debug_info') 630*cda5da8dSAndroid Build Coastguard Worker finally: 631*cda5da8dSAndroid Build Coastguard Worker conn.close() 632*cda5da8dSAndroid Build Coastguard Worker 633*cda5da8dSAndroid Build Coastguard Worker def _number_of_objects(self): 634*cda5da8dSAndroid Build Coastguard Worker ''' 635*cda5da8dSAndroid Build Coastguard Worker Return the number of shared objects 636*cda5da8dSAndroid Build Coastguard Worker ''' 637*cda5da8dSAndroid Build Coastguard Worker conn = self._Client(self._address, authkey=self._authkey) 638*cda5da8dSAndroid Build Coastguard Worker try: 639*cda5da8dSAndroid Build Coastguard Worker return dispatch(conn, None, 'number_of_objects') 640*cda5da8dSAndroid Build Coastguard Worker finally: 641*cda5da8dSAndroid Build Coastguard Worker conn.close() 642*cda5da8dSAndroid Build Coastguard Worker 643*cda5da8dSAndroid Build Coastguard Worker def __enter__(self): 644*cda5da8dSAndroid Build Coastguard Worker if self._state.value == State.INITIAL: 645*cda5da8dSAndroid Build Coastguard Worker self.start() 646*cda5da8dSAndroid Build Coastguard Worker if self._state.value != State.STARTED: 647*cda5da8dSAndroid Build Coastguard Worker if self._state.value == State.INITIAL: 648*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("Unable to start server") 649*cda5da8dSAndroid Build Coastguard Worker elif self._state.value == State.SHUTDOWN: 650*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("Manager has shut down") 651*cda5da8dSAndroid Build Coastguard Worker else: 652*cda5da8dSAndroid Build Coastguard Worker raise ProcessError( 653*cda5da8dSAndroid Build Coastguard Worker "Unknown state {!r}".format(self._state.value)) 654*cda5da8dSAndroid Build Coastguard Worker return self 655*cda5da8dSAndroid Build Coastguard Worker 656*cda5da8dSAndroid Build Coastguard Worker def __exit__(self, exc_type, exc_val, exc_tb): 657*cda5da8dSAndroid Build Coastguard Worker self.shutdown() 658*cda5da8dSAndroid Build Coastguard Worker 659*cda5da8dSAndroid Build Coastguard Worker @staticmethod 660*cda5da8dSAndroid Build Coastguard Worker def _finalize_manager(process, address, authkey, state, _Client, 661*cda5da8dSAndroid Build Coastguard Worker shutdown_timeout): 662*cda5da8dSAndroid Build Coastguard Worker ''' 663*cda5da8dSAndroid Build Coastguard Worker Shutdown the manager process; will be registered as a finalizer 664*cda5da8dSAndroid Build Coastguard Worker ''' 665*cda5da8dSAndroid Build Coastguard Worker if process.is_alive(): 666*cda5da8dSAndroid Build Coastguard Worker util.info('sending shutdown message to manager') 667*cda5da8dSAndroid Build Coastguard Worker try: 668*cda5da8dSAndroid Build Coastguard Worker conn = _Client(address, authkey=authkey) 669*cda5da8dSAndroid Build Coastguard Worker try: 670*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'shutdown') 671*cda5da8dSAndroid Build Coastguard Worker finally: 672*cda5da8dSAndroid Build Coastguard Worker conn.close() 673*cda5da8dSAndroid Build Coastguard Worker except Exception: 674*cda5da8dSAndroid Build Coastguard Worker pass 675*cda5da8dSAndroid Build Coastguard Worker 676*cda5da8dSAndroid Build Coastguard Worker process.join(timeout=shutdown_timeout) 677*cda5da8dSAndroid Build Coastguard Worker if process.is_alive(): 678*cda5da8dSAndroid Build Coastguard Worker util.info('manager still alive') 679*cda5da8dSAndroid Build Coastguard Worker if hasattr(process, 'terminate'): 680*cda5da8dSAndroid Build Coastguard Worker util.info('trying to `terminate()` manager process') 681*cda5da8dSAndroid Build Coastguard Worker process.terminate() 682*cda5da8dSAndroid Build Coastguard Worker process.join(timeout=shutdown_timeout) 683*cda5da8dSAndroid Build Coastguard Worker if process.is_alive(): 684*cda5da8dSAndroid Build Coastguard Worker util.info('manager still alive after terminate') 685*cda5da8dSAndroid Build Coastguard Worker process.kill() 686*cda5da8dSAndroid Build Coastguard Worker process.join() 687*cda5da8dSAndroid Build Coastguard Worker 688*cda5da8dSAndroid Build Coastguard Worker state.value = State.SHUTDOWN 689*cda5da8dSAndroid Build Coastguard Worker try: 690*cda5da8dSAndroid Build Coastguard Worker del BaseProxy._address_to_local[address] 691*cda5da8dSAndroid Build Coastguard Worker except KeyError: 692*cda5da8dSAndroid Build Coastguard Worker pass 693*cda5da8dSAndroid Build Coastguard Worker 694*cda5da8dSAndroid Build Coastguard Worker @property 695*cda5da8dSAndroid Build Coastguard Worker def address(self): 696*cda5da8dSAndroid Build Coastguard Worker return self._address 697*cda5da8dSAndroid Build Coastguard Worker 698*cda5da8dSAndroid Build Coastguard Worker @classmethod 699*cda5da8dSAndroid Build Coastguard Worker def register(cls, typeid, callable=None, proxytype=None, exposed=None, 700*cda5da8dSAndroid Build Coastguard Worker method_to_typeid=None, create_method=True): 701*cda5da8dSAndroid Build Coastguard Worker ''' 702*cda5da8dSAndroid Build Coastguard Worker Register a typeid with the manager type 703*cda5da8dSAndroid Build Coastguard Worker ''' 704*cda5da8dSAndroid Build Coastguard Worker if '_registry' not in cls.__dict__: 705*cda5da8dSAndroid Build Coastguard Worker cls._registry = cls._registry.copy() 706*cda5da8dSAndroid Build Coastguard Worker 707*cda5da8dSAndroid Build Coastguard Worker if proxytype is None: 708*cda5da8dSAndroid Build Coastguard Worker proxytype = AutoProxy 709*cda5da8dSAndroid Build Coastguard Worker 710*cda5da8dSAndroid Build Coastguard Worker exposed = exposed or getattr(proxytype, '_exposed_', None) 711*cda5da8dSAndroid Build Coastguard Worker 712*cda5da8dSAndroid Build Coastguard Worker method_to_typeid = method_to_typeid or \ 713*cda5da8dSAndroid Build Coastguard Worker getattr(proxytype, '_method_to_typeid_', None) 714*cda5da8dSAndroid Build Coastguard Worker 715*cda5da8dSAndroid Build Coastguard Worker if method_to_typeid: 716*cda5da8dSAndroid Build Coastguard Worker for key, value in list(method_to_typeid.items()): # isinstance? 717*cda5da8dSAndroid Build Coastguard Worker assert type(key) is str, '%r is not a string' % key 718*cda5da8dSAndroid Build Coastguard Worker assert type(value) is str, '%r is not a string' % value 719*cda5da8dSAndroid Build Coastguard Worker 720*cda5da8dSAndroid Build Coastguard Worker cls._registry[typeid] = ( 721*cda5da8dSAndroid Build Coastguard Worker callable, exposed, method_to_typeid, proxytype 722*cda5da8dSAndroid Build Coastguard Worker ) 723*cda5da8dSAndroid Build Coastguard Worker 724*cda5da8dSAndroid Build Coastguard Worker if create_method: 725*cda5da8dSAndroid Build Coastguard Worker def temp(self, /, *args, **kwds): 726*cda5da8dSAndroid Build Coastguard Worker util.debug('requesting creation of a shared %r object', typeid) 727*cda5da8dSAndroid Build Coastguard Worker token, exp = self._create(typeid, *args, **kwds) 728*cda5da8dSAndroid Build Coastguard Worker proxy = proxytype( 729*cda5da8dSAndroid Build Coastguard Worker token, self._serializer, manager=self, 730*cda5da8dSAndroid Build Coastguard Worker authkey=self._authkey, exposed=exp 731*cda5da8dSAndroid Build Coastguard Worker ) 732*cda5da8dSAndroid Build Coastguard Worker conn = self._Client(token.address, authkey=self._authkey) 733*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'decref', (token.id,)) 734*cda5da8dSAndroid Build Coastguard Worker return proxy 735*cda5da8dSAndroid Build Coastguard Worker temp.__name__ = typeid 736*cda5da8dSAndroid Build Coastguard Worker setattr(cls, typeid, temp) 737*cda5da8dSAndroid Build Coastguard Worker 738*cda5da8dSAndroid Build Coastguard Worker# 739*cda5da8dSAndroid Build Coastguard Worker# Subclass of set which get cleared after a fork 740*cda5da8dSAndroid Build Coastguard Worker# 741*cda5da8dSAndroid Build Coastguard Worker 742*cda5da8dSAndroid Build Coastguard Workerclass ProcessLocalSet(set): 743*cda5da8dSAndroid Build Coastguard Worker def __init__(self): 744*cda5da8dSAndroid Build Coastguard Worker util.register_after_fork(self, lambda obj: obj.clear()) 745*cda5da8dSAndroid Build Coastguard Worker def __reduce__(self): 746*cda5da8dSAndroid Build Coastguard Worker return type(self), () 747*cda5da8dSAndroid Build Coastguard Worker 748*cda5da8dSAndroid Build Coastguard Worker# 749*cda5da8dSAndroid Build Coastguard Worker# Definition of BaseProxy 750*cda5da8dSAndroid Build Coastguard Worker# 751*cda5da8dSAndroid Build Coastguard Worker 752*cda5da8dSAndroid Build Coastguard Workerclass BaseProxy(object): 753*cda5da8dSAndroid Build Coastguard Worker ''' 754*cda5da8dSAndroid Build Coastguard Worker A base for proxies of shared objects 755*cda5da8dSAndroid Build Coastguard Worker ''' 756*cda5da8dSAndroid Build Coastguard Worker _address_to_local = {} 757*cda5da8dSAndroid Build Coastguard Worker _mutex = util.ForkAwareThreadLock() 758*cda5da8dSAndroid Build Coastguard Worker 759*cda5da8dSAndroid Build Coastguard Worker def __init__(self, token, serializer, manager=None, 760*cda5da8dSAndroid Build Coastguard Worker authkey=None, exposed=None, incref=True, manager_owned=False): 761*cda5da8dSAndroid Build Coastguard Worker with BaseProxy._mutex: 762*cda5da8dSAndroid Build Coastguard Worker tls_idset = BaseProxy._address_to_local.get(token.address, None) 763*cda5da8dSAndroid Build Coastguard Worker if tls_idset is None: 764*cda5da8dSAndroid Build Coastguard Worker tls_idset = util.ForkAwareLocal(), ProcessLocalSet() 765*cda5da8dSAndroid Build Coastguard Worker BaseProxy._address_to_local[token.address] = tls_idset 766*cda5da8dSAndroid Build Coastguard Worker 767*cda5da8dSAndroid Build Coastguard Worker # self._tls is used to record the connection used by this 768*cda5da8dSAndroid Build Coastguard Worker # thread to communicate with the manager at token.address 769*cda5da8dSAndroid Build Coastguard Worker self._tls = tls_idset[0] 770*cda5da8dSAndroid Build Coastguard Worker 771*cda5da8dSAndroid Build Coastguard Worker # self._idset is used to record the identities of all shared 772*cda5da8dSAndroid Build Coastguard Worker # objects for which the current process owns references and 773*cda5da8dSAndroid Build Coastguard Worker # which are in the manager at token.address 774*cda5da8dSAndroid Build Coastguard Worker self._idset = tls_idset[1] 775*cda5da8dSAndroid Build Coastguard Worker 776*cda5da8dSAndroid Build Coastguard Worker self._token = token 777*cda5da8dSAndroid Build Coastguard Worker self._id = self._token.id 778*cda5da8dSAndroid Build Coastguard Worker self._manager = manager 779*cda5da8dSAndroid Build Coastguard Worker self._serializer = serializer 780*cda5da8dSAndroid Build Coastguard Worker self._Client = listener_client[serializer][1] 781*cda5da8dSAndroid Build Coastguard Worker 782*cda5da8dSAndroid Build Coastguard Worker # Should be set to True only when a proxy object is being created 783*cda5da8dSAndroid Build Coastguard Worker # on the manager server; primary use case: nested proxy objects. 784*cda5da8dSAndroid Build Coastguard Worker # RebuildProxy detects when a proxy is being created on the manager 785*cda5da8dSAndroid Build Coastguard Worker # and sets this value appropriately. 786*cda5da8dSAndroid Build Coastguard Worker self._owned_by_manager = manager_owned 787*cda5da8dSAndroid Build Coastguard Worker 788*cda5da8dSAndroid Build Coastguard Worker if authkey is not None: 789*cda5da8dSAndroid Build Coastguard Worker self._authkey = process.AuthenticationString(authkey) 790*cda5da8dSAndroid Build Coastguard Worker elif self._manager is not None: 791*cda5da8dSAndroid Build Coastguard Worker self._authkey = self._manager._authkey 792*cda5da8dSAndroid Build Coastguard Worker else: 793*cda5da8dSAndroid Build Coastguard Worker self._authkey = process.current_process().authkey 794*cda5da8dSAndroid Build Coastguard Worker 795*cda5da8dSAndroid Build Coastguard Worker if incref: 796*cda5da8dSAndroid Build Coastguard Worker self._incref() 797*cda5da8dSAndroid Build Coastguard Worker 798*cda5da8dSAndroid Build Coastguard Worker util.register_after_fork(self, BaseProxy._after_fork) 799*cda5da8dSAndroid Build Coastguard Worker 800*cda5da8dSAndroid Build Coastguard Worker def _connect(self): 801*cda5da8dSAndroid Build Coastguard Worker util.debug('making connection to manager') 802*cda5da8dSAndroid Build Coastguard Worker name = process.current_process().name 803*cda5da8dSAndroid Build Coastguard Worker if threading.current_thread().name != 'MainThread': 804*cda5da8dSAndroid Build Coastguard Worker name += '|' + threading.current_thread().name 805*cda5da8dSAndroid Build Coastguard Worker conn = self._Client(self._token.address, authkey=self._authkey) 806*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'accept_connection', (name,)) 807*cda5da8dSAndroid Build Coastguard Worker self._tls.connection = conn 808*cda5da8dSAndroid Build Coastguard Worker 809*cda5da8dSAndroid Build Coastguard Worker def _callmethod(self, methodname, args=(), kwds={}): 810*cda5da8dSAndroid Build Coastguard Worker ''' 811*cda5da8dSAndroid Build Coastguard Worker Try to call a method of the referent and return a copy of the result 812*cda5da8dSAndroid Build Coastguard Worker ''' 813*cda5da8dSAndroid Build Coastguard Worker try: 814*cda5da8dSAndroid Build Coastguard Worker conn = self._tls.connection 815*cda5da8dSAndroid Build Coastguard Worker except AttributeError: 816*cda5da8dSAndroid Build Coastguard Worker util.debug('thread %r does not own a connection', 817*cda5da8dSAndroid Build Coastguard Worker threading.current_thread().name) 818*cda5da8dSAndroid Build Coastguard Worker self._connect() 819*cda5da8dSAndroid Build Coastguard Worker conn = self._tls.connection 820*cda5da8dSAndroid Build Coastguard Worker 821*cda5da8dSAndroid Build Coastguard Worker conn.send((self._id, methodname, args, kwds)) 822*cda5da8dSAndroid Build Coastguard Worker kind, result = conn.recv() 823*cda5da8dSAndroid Build Coastguard Worker 824*cda5da8dSAndroid Build Coastguard Worker if kind == '#RETURN': 825*cda5da8dSAndroid Build Coastguard Worker return result 826*cda5da8dSAndroid Build Coastguard Worker elif kind == '#PROXY': 827*cda5da8dSAndroid Build Coastguard Worker exposed, token = result 828*cda5da8dSAndroid Build Coastguard Worker proxytype = self._manager._registry[token.typeid][-1] 829*cda5da8dSAndroid Build Coastguard Worker token.address = self._token.address 830*cda5da8dSAndroid Build Coastguard Worker proxy = proxytype( 831*cda5da8dSAndroid Build Coastguard Worker token, self._serializer, manager=self._manager, 832*cda5da8dSAndroid Build Coastguard Worker authkey=self._authkey, exposed=exposed 833*cda5da8dSAndroid Build Coastguard Worker ) 834*cda5da8dSAndroid Build Coastguard Worker conn = self._Client(token.address, authkey=self._authkey) 835*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'decref', (token.id,)) 836*cda5da8dSAndroid Build Coastguard Worker return proxy 837*cda5da8dSAndroid Build Coastguard Worker raise convert_to_error(kind, result) 838*cda5da8dSAndroid Build Coastguard Worker 839*cda5da8dSAndroid Build Coastguard Worker def _getvalue(self): 840*cda5da8dSAndroid Build Coastguard Worker ''' 841*cda5da8dSAndroid Build Coastguard Worker Get a copy of the value of the referent 842*cda5da8dSAndroid Build Coastguard Worker ''' 843*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('#GETVALUE') 844*cda5da8dSAndroid Build Coastguard Worker 845*cda5da8dSAndroid Build Coastguard Worker def _incref(self): 846*cda5da8dSAndroid Build Coastguard Worker if self._owned_by_manager: 847*cda5da8dSAndroid Build Coastguard Worker util.debug('owned_by_manager skipped INCREF of %r', self._token.id) 848*cda5da8dSAndroid Build Coastguard Worker return 849*cda5da8dSAndroid Build Coastguard Worker 850*cda5da8dSAndroid Build Coastguard Worker conn = self._Client(self._token.address, authkey=self._authkey) 851*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'incref', (self._id,)) 852*cda5da8dSAndroid Build Coastguard Worker util.debug('INCREF %r', self._token.id) 853*cda5da8dSAndroid Build Coastguard Worker 854*cda5da8dSAndroid Build Coastguard Worker self._idset.add(self._id) 855*cda5da8dSAndroid Build Coastguard Worker 856*cda5da8dSAndroid Build Coastguard Worker state = self._manager and self._manager._state 857*cda5da8dSAndroid Build Coastguard Worker 858*cda5da8dSAndroid Build Coastguard Worker self._close = util.Finalize( 859*cda5da8dSAndroid Build Coastguard Worker self, BaseProxy._decref, 860*cda5da8dSAndroid Build Coastguard Worker args=(self._token, self._authkey, state, 861*cda5da8dSAndroid Build Coastguard Worker self._tls, self._idset, self._Client), 862*cda5da8dSAndroid Build Coastguard Worker exitpriority=10 863*cda5da8dSAndroid Build Coastguard Worker ) 864*cda5da8dSAndroid Build Coastguard Worker 865*cda5da8dSAndroid Build Coastguard Worker @staticmethod 866*cda5da8dSAndroid Build Coastguard Worker def _decref(token, authkey, state, tls, idset, _Client): 867*cda5da8dSAndroid Build Coastguard Worker idset.discard(token.id) 868*cda5da8dSAndroid Build Coastguard Worker 869*cda5da8dSAndroid Build Coastguard Worker # check whether manager is still alive 870*cda5da8dSAndroid Build Coastguard Worker if state is None or state.value == State.STARTED: 871*cda5da8dSAndroid Build Coastguard Worker # tell manager this process no longer cares about referent 872*cda5da8dSAndroid Build Coastguard Worker try: 873*cda5da8dSAndroid Build Coastguard Worker util.debug('DECREF %r', token.id) 874*cda5da8dSAndroid Build Coastguard Worker conn = _Client(token.address, authkey=authkey) 875*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'decref', (token.id,)) 876*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 877*cda5da8dSAndroid Build Coastguard Worker util.debug('... decref failed %s', e) 878*cda5da8dSAndroid Build Coastguard Worker 879*cda5da8dSAndroid Build Coastguard Worker else: 880*cda5da8dSAndroid Build Coastguard Worker util.debug('DECREF %r -- manager already shutdown', token.id) 881*cda5da8dSAndroid Build Coastguard Worker 882*cda5da8dSAndroid Build Coastguard Worker # check whether we can close this thread's connection because 883*cda5da8dSAndroid Build Coastguard Worker # the process owns no more references to objects for this manager 884*cda5da8dSAndroid Build Coastguard Worker if not idset and hasattr(tls, 'connection'): 885*cda5da8dSAndroid Build Coastguard Worker util.debug('thread %r has no more proxies so closing conn', 886*cda5da8dSAndroid Build Coastguard Worker threading.current_thread().name) 887*cda5da8dSAndroid Build Coastguard Worker tls.connection.close() 888*cda5da8dSAndroid Build Coastguard Worker del tls.connection 889*cda5da8dSAndroid Build Coastguard Worker 890*cda5da8dSAndroid Build Coastguard Worker def _after_fork(self): 891*cda5da8dSAndroid Build Coastguard Worker self._manager = None 892*cda5da8dSAndroid Build Coastguard Worker try: 893*cda5da8dSAndroid Build Coastguard Worker self._incref() 894*cda5da8dSAndroid Build Coastguard Worker except Exception as e: 895*cda5da8dSAndroid Build Coastguard Worker # the proxy may just be for a manager which has shutdown 896*cda5da8dSAndroid Build Coastguard Worker util.info('incref failed: %s' % e) 897*cda5da8dSAndroid Build Coastguard Worker 898*cda5da8dSAndroid Build Coastguard Worker def __reduce__(self): 899*cda5da8dSAndroid Build Coastguard Worker kwds = {} 900*cda5da8dSAndroid Build Coastguard Worker if get_spawning_popen() is not None: 901*cda5da8dSAndroid Build Coastguard Worker kwds['authkey'] = self._authkey 902*cda5da8dSAndroid Build Coastguard Worker 903*cda5da8dSAndroid Build Coastguard Worker if getattr(self, '_isauto', False): 904*cda5da8dSAndroid Build Coastguard Worker kwds['exposed'] = self._exposed_ 905*cda5da8dSAndroid Build Coastguard Worker return (RebuildProxy, 906*cda5da8dSAndroid Build Coastguard Worker (AutoProxy, self._token, self._serializer, kwds)) 907*cda5da8dSAndroid Build Coastguard Worker else: 908*cda5da8dSAndroid Build Coastguard Worker return (RebuildProxy, 909*cda5da8dSAndroid Build Coastguard Worker (type(self), self._token, self._serializer, kwds)) 910*cda5da8dSAndroid Build Coastguard Worker 911*cda5da8dSAndroid Build Coastguard Worker def __deepcopy__(self, memo): 912*cda5da8dSAndroid Build Coastguard Worker return self._getvalue() 913*cda5da8dSAndroid Build Coastguard Worker 914*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 915*cda5da8dSAndroid Build Coastguard Worker return '<%s object, typeid %r at %#x>' % \ 916*cda5da8dSAndroid Build Coastguard Worker (type(self).__name__, self._token.typeid, id(self)) 917*cda5da8dSAndroid Build Coastguard Worker 918*cda5da8dSAndroid Build Coastguard Worker def __str__(self): 919*cda5da8dSAndroid Build Coastguard Worker ''' 920*cda5da8dSAndroid Build Coastguard Worker Return representation of the referent (or a fall-back if that fails) 921*cda5da8dSAndroid Build Coastguard Worker ''' 922*cda5da8dSAndroid Build Coastguard Worker try: 923*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('__repr__') 924*cda5da8dSAndroid Build Coastguard Worker except Exception: 925*cda5da8dSAndroid Build Coastguard Worker return repr(self)[:-1] + "; '__str__()' failed>" 926*cda5da8dSAndroid Build Coastguard Worker 927*cda5da8dSAndroid Build Coastguard Worker# 928*cda5da8dSAndroid Build Coastguard Worker# Function used for unpickling 929*cda5da8dSAndroid Build Coastguard Worker# 930*cda5da8dSAndroid Build Coastguard Worker 931*cda5da8dSAndroid Build Coastguard Workerdef RebuildProxy(func, token, serializer, kwds): 932*cda5da8dSAndroid Build Coastguard Worker ''' 933*cda5da8dSAndroid Build Coastguard Worker Function used for unpickling proxy objects. 934*cda5da8dSAndroid Build Coastguard Worker ''' 935*cda5da8dSAndroid Build Coastguard Worker server = getattr(process.current_process(), '_manager_server', None) 936*cda5da8dSAndroid Build Coastguard Worker if server and server.address == token.address: 937*cda5da8dSAndroid Build Coastguard Worker util.debug('Rebuild a proxy owned by manager, token=%r', token) 938*cda5da8dSAndroid Build Coastguard Worker kwds['manager_owned'] = True 939*cda5da8dSAndroid Build Coastguard Worker if token.id not in server.id_to_local_proxy_obj: 940*cda5da8dSAndroid Build Coastguard Worker server.id_to_local_proxy_obj[token.id] = \ 941*cda5da8dSAndroid Build Coastguard Worker server.id_to_obj[token.id] 942*cda5da8dSAndroid Build Coastguard Worker incref = ( 943*cda5da8dSAndroid Build Coastguard Worker kwds.pop('incref', True) and 944*cda5da8dSAndroid Build Coastguard Worker not getattr(process.current_process(), '_inheriting', False) 945*cda5da8dSAndroid Build Coastguard Worker ) 946*cda5da8dSAndroid Build Coastguard Worker return func(token, serializer, incref=incref, **kwds) 947*cda5da8dSAndroid Build Coastguard Worker 948*cda5da8dSAndroid Build Coastguard Worker# 949*cda5da8dSAndroid Build Coastguard Worker# Functions to create proxies and proxy types 950*cda5da8dSAndroid Build Coastguard Worker# 951*cda5da8dSAndroid Build Coastguard Worker 952*cda5da8dSAndroid Build Coastguard Workerdef MakeProxyType(name, exposed, _cache={}): 953*cda5da8dSAndroid Build Coastguard Worker ''' 954*cda5da8dSAndroid Build Coastguard Worker Return a proxy type whose methods are given by `exposed` 955*cda5da8dSAndroid Build Coastguard Worker ''' 956*cda5da8dSAndroid Build Coastguard Worker exposed = tuple(exposed) 957*cda5da8dSAndroid Build Coastguard Worker try: 958*cda5da8dSAndroid Build Coastguard Worker return _cache[(name, exposed)] 959*cda5da8dSAndroid Build Coastguard Worker except KeyError: 960*cda5da8dSAndroid Build Coastguard Worker pass 961*cda5da8dSAndroid Build Coastguard Worker 962*cda5da8dSAndroid Build Coastguard Worker dic = {} 963*cda5da8dSAndroid Build Coastguard Worker 964*cda5da8dSAndroid Build Coastguard Worker for meth in exposed: 965*cda5da8dSAndroid Build Coastguard Worker exec('''def %s(self, /, *args, **kwds): 966*cda5da8dSAndroid Build Coastguard Worker return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) 967*cda5da8dSAndroid Build Coastguard Worker 968*cda5da8dSAndroid Build Coastguard Worker ProxyType = type(name, (BaseProxy,), dic) 969*cda5da8dSAndroid Build Coastguard Worker ProxyType._exposed_ = exposed 970*cda5da8dSAndroid Build Coastguard Worker _cache[(name, exposed)] = ProxyType 971*cda5da8dSAndroid Build Coastguard Worker return ProxyType 972*cda5da8dSAndroid Build Coastguard Worker 973*cda5da8dSAndroid Build Coastguard Worker 974*cda5da8dSAndroid Build Coastguard Workerdef AutoProxy(token, serializer, manager=None, authkey=None, 975*cda5da8dSAndroid Build Coastguard Worker exposed=None, incref=True, manager_owned=False): 976*cda5da8dSAndroid Build Coastguard Worker ''' 977*cda5da8dSAndroid Build Coastguard Worker Return an auto-proxy for `token` 978*cda5da8dSAndroid Build Coastguard Worker ''' 979*cda5da8dSAndroid Build Coastguard Worker _Client = listener_client[serializer][1] 980*cda5da8dSAndroid Build Coastguard Worker 981*cda5da8dSAndroid Build Coastguard Worker if exposed is None: 982*cda5da8dSAndroid Build Coastguard Worker conn = _Client(token.address, authkey=authkey) 983*cda5da8dSAndroid Build Coastguard Worker try: 984*cda5da8dSAndroid Build Coastguard Worker exposed = dispatch(conn, None, 'get_methods', (token,)) 985*cda5da8dSAndroid Build Coastguard Worker finally: 986*cda5da8dSAndroid Build Coastguard Worker conn.close() 987*cda5da8dSAndroid Build Coastguard Worker 988*cda5da8dSAndroid Build Coastguard Worker if authkey is None and manager is not None: 989*cda5da8dSAndroid Build Coastguard Worker authkey = manager._authkey 990*cda5da8dSAndroid Build Coastguard Worker if authkey is None: 991*cda5da8dSAndroid Build Coastguard Worker authkey = process.current_process().authkey 992*cda5da8dSAndroid Build Coastguard Worker 993*cda5da8dSAndroid Build Coastguard Worker ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 994*cda5da8dSAndroid Build Coastguard Worker proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 995*cda5da8dSAndroid Build Coastguard Worker incref=incref, manager_owned=manager_owned) 996*cda5da8dSAndroid Build Coastguard Worker proxy._isauto = True 997*cda5da8dSAndroid Build Coastguard Worker return proxy 998*cda5da8dSAndroid Build Coastguard Worker 999*cda5da8dSAndroid Build Coastguard Worker# 1000*cda5da8dSAndroid Build Coastguard Worker# Types/callables which we will register with SyncManager 1001*cda5da8dSAndroid Build Coastguard Worker# 1002*cda5da8dSAndroid Build Coastguard Worker 1003*cda5da8dSAndroid Build Coastguard Workerclass Namespace(object): 1004*cda5da8dSAndroid Build Coastguard Worker def __init__(self, /, **kwds): 1005*cda5da8dSAndroid Build Coastguard Worker self.__dict__.update(kwds) 1006*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 1007*cda5da8dSAndroid Build Coastguard Worker items = list(self.__dict__.items()) 1008*cda5da8dSAndroid Build Coastguard Worker temp = [] 1009*cda5da8dSAndroid Build Coastguard Worker for name, value in items: 1010*cda5da8dSAndroid Build Coastguard Worker if not name.startswith('_'): 1011*cda5da8dSAndroid Build Coastguard Worker temp.append('%s=%r' % (name, value)) 1012*cda5da8dSAndroid Build Coastguard Worker temp.sort() 1013*cda5da8dSAndroid Build Coastguard Worker return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) 1014*cda5da8dSAndroid Build Coastguard Worker 1015*cda5da8dSAndroid Build Coastguard Workerclass Value(object): 1016*cda5da8dSAndroid Build Coastguard Worker def __init__(self, typecode, value, lock=True): 1017*cda5da8dSAndroid Build Coastguard Worker self._typecode = typecode 1018*cda5da8dSAndroid Build Coastguard Worker self._value = value 1019*cda5da8dSAndroid Build Coastguard Worker def get(self): 1020*cda5da8dSAndroid Build Coastguard Worker return self._value 1021*cda5da8dSAndroid Build Coastguard Worker def set(self, value): 1022*cda5da8dSAndroid Build Coastguard Worker self._value = value 1023*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 1024*cda5da8dSAndroid Build Coastguard Worker return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 1025*cda5da8dSAndroid Build Coastguard Worker value = property(get, set) 1026*cda5da8dSAndroid Build Coastguard Worker 1027*cda5da8dSAndroid Build Coastguard Workerdef Array(typecode, sequence, lock=True): 1028*cda5da8dSAndroid Build Coastguard Worker return array.array(typecode, sequence) 1029*cda5da8dSAndroid Build Coastguard Worker 1030*cda5da8dSAndroid Build Coastguard Worker# 1031*cda5da8dSAndroid Build Coastguard Worker# Proxy types used by SyncManager 1032*cda5da8dSAndroid Build Coastguard Worker# 1033*cda5da8dSAndroid Build Coastguard Worker 1034*cda5da8dSAndroid Build Coastguard Workerclass IteratorProxy(BaseProxy): 1035*cda5da8dSAndroid Build Coastguard Worker _exposed_ = ('__next__', 'send', 'throw', 'close') 1036*cda5da8dSAndroid Build Coastguard Worker def __iter__(self): 1037*cda5da8dSAndroid Build Coastguard Worker return self 1038*cda5da8dSAndroid Build Coastguard Worker def __next__(self, *args): 1039*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('__next__', args) 1040*cda5da8dSAndroid Build Coastguard Worker def send(self, *args): 1041*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('send', args) 1042*cda5da8dSAndroid Build Coastguard Worker def throw(self, *args): 1043*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('throw', args) 1044*cda5da8dSAndroid Build Coastguard Worker def close(self, *args): 1045*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('close', args) 1046*cda5da8dSAndroid Build Coastguard Worker 1047*cda5da8dSAndroid Build Coastguard Worker 1048*cda5da8dSAndroid Build Coastguard Workerclass AcquirerProxy(BaseProxy): 1049*cda5da8dSAndroid Build Coastguard Worker _exposed_ = ('acquire', 'release') 1050*cda5da8dSAndroid Build Coastguard Worker def acquire(self, blocking=True, timeout=None): 1051*cda5da8dSAndroid Build Coastguard Worker args = (blocking,) if timeout is None else (blocking, timeout) 1052*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('acquire', args) 1053*cda5da8dSAndroid Build Coastguard Worker def release(self): 1054*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('release') 1055*cda5da8dSAndroid Build Coastguard Worker def __enter__(self): 1056*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('acquire') 1057*cda5da8dSAndroid Build Coastguard Worker def __exit__(self, exc_type, exc_val, exc_tb): 1058*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('release') 1059*cda5da8dSAndroid Build Coastguard Worker 1060*cda5da8dSAndroid Build Coastguard Worker 1061*cda5da8dSAndroid Build Coastguard Workerclass ConditionProxy(AcquirerProxy): 1062*cda5da8dSAndroid Build Coastguard Worker _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 1063*cda5da8dSAndroid Build Coastguard Worker def wait(self, timeout=None): 1064*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('wait', (timeout,)) 1065*cda5da8dSAndroid Build Coastguard Worker def notify(self, n=1): 1066*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('notify', (n,)) 1067*cda5da8dSAndroid Build Coastguard Worker def notify_all(self): 1068*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('notify_all') 1069*cda5da8dSAndroid Build Coastguard Worker def wait_for(self, predicate, timeout=None): 1070*cda5da8dSAndroid Build Coastguard Worker result = predicate() 1071*cda5da8dSAndroid Build Coastguard Worker if result: 1072*cda5da8dSAndroid Build Coastguard Worker return result 1073*cda5da8dSAndroid Build Coastguard Worker if timeout is not None: 1074*cda5da8dSAndroid Build Coastguard Worker endtime = time.monotonic() + timeout 1075*cda5da8dSAndroid Build Coastguard Worker else: 1076*cda5da8dSAndroid Build Coastguard Worker endtime = None 1077*cda5da8dSAndroid Build Coastguard Worker waittime = None 1078*cda5da8dSAndroid Build Coastguard Worker while not result: 1079*cda5da8dSAndroid Build Coastguard Worker if endtime is not None: 1080*cda5da8dSAndroid Build Coastguard Worker waittime = endtime - time.monotonic() 1081*cda5da8dSAndroid Build Coastguard Worker if waittime <= 0: 1082*cda5da8dSAndroid Build Coastguard Worker break 1083*cda5da8dSAndroid Build Coastguard Worker self.wait(waittime) 1084*cda5da8dSAndroid Build Coastguard Worker result = predicate() 1085*cda5da8dSAndroid Build Coastguard Worker return result 1086*cda5da8dSAndroid Build Coastguard Worker 1087*cda5da8dSAndroid Build Coastguard Worker 1088*cda5da8dSAndroid Build Coastguard Workerclass EventProxy(BaseProxy): 1089*cda5da8dSAndroid Build Coastguard Worker _exposed_ = ('is_set', 'set', 'clear', 'wait') 1090*cda5da8dSAndroid Build Coastguard Worker def is_set(self): 1091*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('is_set') 1092*cda5da8dSAndroid Build Coastguard Worker def set(self): 1093*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('set') 1094*cda5da8dSAndroid Build Coastguard Worker def clear(self): 1095*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('clear') 1096*cda5da8dSAndroid Build Coastguard Worker def wait(self, timeout=None): 1097*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('wait', (timeout,)) 1098*cda5da8dSAndroid Build Coastguard Worker 1099*cda5da8dSAndroid Build Coastguard Worker 1100*cda5da8dSAndroid Build Coastguard Workerclass BarrierProxy(BaseProxy): 1101*cda5da8dSAndroid Build Coastguard Worker _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') 1102*cda5da8dSAndroid Build Coastguard Worker def wait(self, timeout=None): 1103*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('wait', (timeout,)) 1104*cda5da8dSAndroid Build Coastguard Worker def abort(self): 1105*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('abort') 1106*cda5da8dSAndroid Build Coastguard Worker def reset(self): 1107*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('reset') 1108*cda5da8dSAndroid Build Coastguard Worker @property 1109*cda5da8dSAndroid Build Coastguard Worker def parties(self): 1110*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('__getattribute__', ('parties',)) 1111*cda5da8dSAndroid Build Coastguard Worker @property 1112*cda5da8dSAndroid Build Coastguard Worker def n_waiting(self): 1113*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('__getattribute__', ('n_waiting',)) 1114*cda5da8dSAndroid Build Coastguard Worker @property 1115*cda5da8dSAndroid Build Coastguard Worker def broken(self): 1116*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('__getattribute__', ('broken',)) 1117*cda5da8dSAndroid Build Coastguard Worker 1118*cda5da8dSAndroid Build Coastguard Worker 1119*cda5da8dSAndroid Build Coastguard Workerclass NamespaceProxy(BaseProxy): 1120*cda5da8dSAndroid Build Coastguard Worker _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') 1121*cda5da8dSAndroid Build Coastguard Worker def __getattr__(self, key): 1122*cda5da8dSAndroid Build Coastguard Worker if key[0] == '_': 1123*cda5da8dSAndroid Build Coastguard Worker return object.__getattribute__(self, key) 1124*cda5da8dSAndroid Build Coastguard Worker callmethod = object.__getattribute__(self, '_callmethod') 1125*cda5da8dSAndroid Build Coastguard Worker return callmethod('__getattribute__', (key,)) 1126*cda5da8dSAndroid Build Coastguard Worker def __setattr__(self, key, value): 1127*cda5da8dSAndroid Build Coastguard Worker if key[0] == '_': 1128*cda5da8dSAndroid Build Coastguard Worker return object.__setattr__(self, key, value) 1129*cda5da8dSAndroid Build Coastguard Worker callmethod = object.__getattribute__(self, '_callmethod') 1130*cda5da8dSAndroid Build Coastguard Worker return callmethod('__setattr__', (key, value)) 1131*cda5da8dSAndroid Build Coastguard Worker def __delattr__(self, key): 1132*cda5da8dSAndroid Build Coastguard Worker if key[0] == '_': 1133*cda5da8dSAndroid Build Coastguard Worker return object.__delattr__(self, key) 1134*cda5da8dSAndroid Build Coastguard Worker callmethod = object.__getattribute__(self, '_callmethod') 1135*cda5da8dSAndroid Build Coastguard Worker return callmethod('__delattr__', (key,)) 1136*cda5da8dSAndroid Build Coastguard Worker 1137*cda5da8dSAndroid Build Coastguard Worker 1138*cda5da8dSAndroid Build Coastguard Workerclass ValueProxy(BaseProxy): 1139*cda5da8dSAndroid Build Coastguard Worker _exposed_ = ('get', 'set') 1140*cda5da8dSAndroid Build Coastguard Worker def get(self): 1141*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('get') 1142*cda5da8dSAndroid Build Coastguard Worker def set(self, value): 1143*cda5da8dSAndroid Build Coastguard Worker return self._callmethod('set', (value,)) 1144*cda5da8dSAndroid Build Coastguard Worker value = property(get, set) 1145*cda5da8dSAndroid Build Coastguard Worker 1146*cda5da8dSAndroid Build Coastguard Worker __class_getitem__ = classmethod(types.GenericAlias) 1147*cda5da8dSAndroid Build Coastguard Worker 1148*cda5da8dSAndroid Build Coastguard Worker 1149*cda5da8dSAndroid Build Coastguard WorkerBaseListProxy = MakeProxyType('BaseListProxy', ( 1150*cda5da8dSAndroid Build Coastguard Worker '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', 1151*cda5da8dSAndroid Build Coastguard Worker '__mul__', '__reversed__', '__rmul__', '__setitem__', 1152*cda5da8dSAndroid Build Coastguard Worker 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 1153*cda5da8dSAndroid Build Coastguard Worker 'reverse', 'sort', '__imul__' 1154*cda5da8dSAndroid Build Coastguard Worker )) 1155*cda5da8dSAndroid Build Coastguard Workerclass ListProxy(BaseListProxy): 1156*cda5da8dSAndroid Build Coastguard Worker def __iadd__(self, value): 1157*cda5da8dSAndroid Build Coastguard Worker self._callmethod('extend', (value,)) 1158*cda5da8dSAndroid Build Coastguard Worker return self 1159*cda5da8dSAndroid Build Coastguard Worker def __imul__(self, value): 1160*cda5da8dSAndroid Build Coastguard Worker self._callmethod('__imul__', (value,)) 1161*cda5da8dSAndroid Build Coastguard Worker return self 1162*cda5da8dSAndroid Build Coastguard Worker 1163*cda5da8dSAndroid Build Coastguard Worker 1164*cda5da8dSAndroid Build Coastguard WorkerDictProxy = MakeProxyType('DictProxy', ( 1165*cda5da8dSAndroid Build Coastguard Worker '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', 1166*cda5da8dSAndroid Build Coastguard Worker '__setitem__', 'clear', 'copy', 'get', 'items', 1167*cda5da8dSAndroid Build Coastguard Worker 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' 1168*cda5da8dSAndroid Build Coastguard Worker )) 1169*cda5da8dSAndroid Build Coastguard WorkerDictProxy._method_to_typeid_ = { 1170*cda5da8dSAndroid Build Coastguard Worker '__iter__': 'Iterator', 1171*cda5da8dSAndroid Build Coastguard Worker } 1172*cda5da8dSAndroid Build Coastguard Worker 1173*cda5da8dSAndroid Build Coastguard Worker 1174*cda5da8dSAndroid Build Coastguard WorkerArrayProxy = MakeProxyType('ArrayProxy', ( 1175*cda5da8dSAndroid Build Coastguard Worker '__len__', '__getitem__', '__setitem__' 1176*cda5da8dSAndroid Build Coastguard Worker )) 1177*cda5da8dSAndroid Build Coastguard Worker 1178*cda5da8dSAndroid Build Coastguard Worker 1179*cda5da8dSAndroid Build Coastguard WorkerBasePoolProxy = MakeProxyType('PoolProxy', ( 1180*cda5da8dSAndroid Build Coastguard Worker 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 1181*cda5da8dSAndroid Build Coastguard Worker 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', 1182*cda5da8dSAndroid Build Coastguard Worker )) 1183*cda5da8dSAndroid Build Coastguard WorkerBasePoolProxy._method_to_typeid_ = { 1184*cda5da8dSAndroid Build Coastguard Worker 'apply_async': 'AsyncResult', 1185*cda5da8dSAndroid Build Coastguard Worker 'map_async': 'AsyncResult', 1186*cda5da8dSAndroid Build Coastguard Worker 'starmap_async': 'AsyncResult', 1187*cda5da8dSAndroid Build Coastguard Worker 'imap': 'Iterator', 1188*cda5da8dSAndroid Build Coastguard Worker 'imap_unordered': 'Iterator' 1189*cda5da8dSAndroid Build Coastguard Worker } 1190*cda5da8dSAndroid Build Coastguard Workerclass PoolProxy(BasePoolProxy): 1191*cda5da8dSAndroid Build Coastguard Worker def __enter__(self): 1192*cda5da8dSAndroid Build Coastguard Worker return self 1193*cda5da8dSAndroid Build Coastguard Worker def __exit__(self, exc_type, exc_val, exc_tb): 1194*cda5da8dSAndroid Build Coastguard Worker self.terminate() 1195*cda5da8dSAndroid Build Coastguard Worker 1196*cda5da8dSAndroid Build Coastguard Worker# 1197*cda5da8dSAndroid Build Coastguard Worker# Definition of SyncManager 1198*cda5da8dSAndroid Build Coastguard Worker# 1199*cda5da8dSAndroid Build Coastguard Worker 1200*cda5da8dSAndroid Build Coastguard Workerclass SyncManager(BaseManager): 1201*cda5da8dSAndroid Build Coastguard Worker ''' 1202*cda5da8dSAndroid Build Coastguard Worker Subclass of `BaseManager` which supports a number of shared object types. 1203*cda5da8dSAndroid Build Coastguard Worker 1204*cda5da8dSAndroid Build Coastguard Worker The types registered are those intended for the synchronization 1205*cda5da8dSAndroid Build Coastguard Worker of threads, plus `dict`, `list` and `Namespace`. 1206*cda5da8dSAndroid Build Coastguard Worker 1207*cda5da8dSAndroid Build Coastguard Worker The `multiprocessing.Manager()` function creates started instances of 1208*cda5da8dSAndroid Build Coastguard Worker this class. 1209*cda5da8dSAndroid Build Coastguard Worker ''' 1210*cda5da8dSAndroid Build Coastguard Worker 1211*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Queue', queue.Queue) 1212*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('JoinableQueue', queue.Queue) 1213*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Event', threading.Event, EventProxy) 1214*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Lock', threading.Lock, AcquirerProxy) 1215*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('RLock', threading.RLock, AcquirerProxy) 1216*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) 1217*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, 1218*cda5da8dSAndroid Build Coastguard Worker AcquirerProxy) 1219*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Condition', threading.Condition, ConditionProxy) 1220*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Barrier', threading.Barrier, BarrierProxy) 1221*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Pool', pool.Pool, PoolProxy) 1222*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('list', list, ListProxy) 1223*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('dict', dict, DictProxy) 1224*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Value', Value, ValueProxy) 1225*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Array', Array, ArrayProxy) 1226*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Namespace', Namespace, NamespaceProxy) 1227*cda5da8dSAndroid Build Coastguard Worker 1228*cda5da8dSAndroid Build Coastguard Worker# types returned by methods of PoolProxy 1229*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) 1230*cda5da8dSAndroid Build Coastguard WorkerSyncManager.register('AsyncResult', create_method=False) 1231*cda5da8dSAndroid Build Coastguard Worker 1232*cda5da8dSAndroid Build Coastguard Worker# 1233*cda5da8dSAndroid Build Coastguard Worker# Definition of SharedMemoryManager and SharedMemoryServer 1234*cda5da8dSAndroid Build Coastguard Worker# 1235*cda5da8dSAndroid Build Coastguard Worker 1236*cda5da8dSAndroid Build Coastguard Workerif HAS_SHMEM: 1237*cda5da8dSAndroid Build Coastguard Worker class _SharedMemoryTracker: 1238*cda5da8dSAndroid Build Coastguard Worker "Manages one or more shared memory segments." 1239*cda5da8dSAndroid Build Coastguard Worker 1240*cda5da8dSAndroid Build Coastguard Worker def __init__(self, name, segment_names=[]): 1241*cda5da8dSAndroid Build Coastguard Worker self.shared_memory_context_name = name 1242*cda5da8dSAndroid Build Coastguard Worker self.segment_names = segment_names 1243*cda5da8dSAndroid Build Coastguard Worker 1244*cda5da8dSAndroid Build Coastguard Worker def register_segment(self, segment_name): 1245*cda5da8dSAndroid Build Coastguard Worker "Adds the supplied shared memory block name to tracker." 1246*cda5da8dSAndroid Build Coastguard Worker util.debug(f"Register segment {segment_name!r} in pid {getpid()}") 1247*cda5da8dSAndroid Build Coastguard Worker self.segment_names.append(segment_name) 1248*cda5da8dSAndroid Build Coastguard Worker 1249*cda5da8dSAndroid Build Coastguard Worker def destroy_segment(self, segment_name): 1250*cda5da8dSAndroid Build Coastguard Worker """Calls unlink() on the shared memory block with the supplied name 1251*cda5da8dSAndroid Build Coastguard Worker and removes it from the list of blocks being tracked.""" 1252*cda5da8dSAndroid Build Coastguard Worker util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") 1253*cda5da8dSAndroid Build Coastguard Worker self.segment_names.remove(segment_name) 1254*cda5da8dSAndroid Build Coastguard Worker segment = shared_memory.SharedMemory(segment_name) 1255*cda5da8dSAndroid Build Coastguard Worker segment.close() 1256*cda5da8dSAndroid Build Coastguard Worker segment.unlink() 1257*cda5da8dSAndroid Build Coastguard Worker 1258*cda5da8dSAndroid Build Coastguard Worker def unlink(self): 1259*cda5da8dSAndroid Build Coastguard Worker "Calls destroy_segment() on all tracked shared memory blocks." 1260*cda5da8dSAndroid Build Coastguard Worker for segment_name in self.segment_names[:]: 1261*cda5da8dSAndroid Build Coastguard Worker self.destroy_segment(segment_name) 1262*cda5da8dSAndroid Build Coastguard Worker 1263*cda5da8dSAndroid Build Coastguard Worker def __del__(self): 1264*cda5da8dSAndroid Build Coastguard Worker util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") 1265*cda5da8dSAndroid Build Coastguard Worker self.unlink() 1266*cda5da8dSAndroid Build Coastguard Worker 1267*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 1268*cda5da8dSAndroid Build Coastguard Worker return (self.shared_memory_context_name, self.segment_names) 1269*cda5da8dSAndroid Build Coastguard Worker 1270*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 1271*cda5da8dSAndroid Build Coastguard Worker self.__init__(*state) 1272*cda5da8dSAndroid Build Coastguard Worker 1273*cda5da8dSAndroid Build Coastguard Worker 1274*cda5da8dSAndroid Build Coastguard Worker class SharedMemoryServer(Server): 1275*cda5da8dSAndroid Build Coastguard Worker 1276*cda5da8dSAndroid Build Coastguard Worker public = Server.public + \ 1277*cda5da8dSAndroid Build Coastguard Worker ['track_segment', 'release_segment', 'list_segments'] 1278*cda5da8dSAndroid Build Coastguard Worker 1279*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *args, **kwargs): 1280*cda5da8dSAndroid Build Coastguard Worker Server.__init__(self, *args, **kwargs) 1281*cda5da8dSAndroid Build Coastguard Worker address = self.address 1282*cda5da8dSAndroid Build Coastguard Worker # The address of Linux abstract namespaces can be bytes 1283*cda5da8dSAndroid Build Coastguard Worker if isinstance(address, bytes): 1284*cda5da8dSAndroid Build Coastguard Worker address = os.fsdecode(address) 1285*cda5da8dSAndroid Build Coastguard Worker self.shared_memory_context = \ 1286*cda5da8dSAndroid Build Coastguard Worker _SharedMemoryTracker(f"shm_{address}_{getpid()}") 1287*cda5da8dSAndroid Build Coastguard Worker util.debug(f"SharedMemoryServer started by pid {getpid()}") 1288*cda5da8dSAndroid Build Coastguard Worker 1289*cda5da8dSAndroid Build Coastguard Worker def create(self, c, typeid, /, *args, **kwargs): 1290*cda5da8dSAndroid Build Coastguard Worker """Create a new distributed-shared object (not backed by a shared 1291*cda5da8dSAndroid Build Coastguard Worker memory block) and return its id to be used in a Proxy Object.""" 1292*cda5da8dSAndroid Build Coastguard Worker # Unless set up as a shared proxy, don't make shared_memory_context 1293*cda5da8dSAndroid Build Coastguard Worker # a standard part of kwargs. This makes things easier for supplying 1294*cda5da8dSAndroid Build Coastguard Worker # simple functions. 1295*cda5da8dSAndroid Build Coastguard Worker if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): 1296*cda5da8dSAndroid Build Coastguard Worker kwargs['shared_memory_context'] = self.shared_memory_context 1297*cda5da8dSAndroid Build Coastguard Worker return Server.create(self, c, typeid, *args, **kwargs) 1298*cda5da8dSAndroid Build Coastguard Worker 1299*cda5da8dSAndroid Build Coastguard Worker def shutdown(self, c): 1300*cda5da8dSAndroid Build Coastguard Worker "Call unlink() on all tracked shared memory, terminate the Server." 1301*cda5da8dSAndroid Build Coastguard Worker self.shared_memory_context.unlink() 1302*cda5da8dSAndroid Build Coastguard Worker return Server.shutdown(self, c) 1303*cda5da8dSAndroid Build Coastguard Worker 1304*cda5da8dSAndroid Build Coastguard Worker def track_segment(self, c, segment_name): 1305*cda5da8dSAndroid Build Coastguard Worker "Adds the supplied shared memory block name to Server's tracker." 1306*cda5da8dSAndroid Build Coastguard Worker self.shared_memory_context.register_segment(segment_name) 1307*cda5da8dSAndroid Build Coastguard Worker 1308*cda5da8dSAndroid Build Coastguard Worker def release_segment(self, c, segment_name): 1309*cda5da8dSAndroid Build Coastguard Worker """Calls unlink() on the shared memory block with the supplied name 1310*cda5da8dSAndroid Build Coastguard Worker and removes it from the tracker instance inside the Server.""" 1311*cda5da8dSAndroid Build Coastguard Worker self.shared_memory_context.destroy_segment(segment_name) 1312*cda5da8dSAndroid Build Coastguard Worker 1313*cda5da8dSAndroid Build Coastguard Worker def list_segments(self, c): 1314*cda5da8dSAndroid Build Coastguard Worker """Returns a list of names of shared memory blocks that the Server 1315*cda5da8dSAndroid Build Coastguard Worker is currently tracking.""" 1316*cda5da8dSAndroid Build Coastguard Worker return self.shared_memory_context.segment_names 1317*cda5da8dSAndroid Build Coastguard Worker 1318*cda5da8dSAndroid Build Coastguard Worker 1319*cda5da8dSAndroid Build Coastguard Worker class SharedMemoryManager(BaseManager): 1320*cda5da8dSAndroid Build Coastguard Worker """Like SyncManager but uses SharedMemoryServer instead of Server. 1321*cda5da8dSAndroid Build Coastguard Worker 1322*cda5da8dSAndroid Build Coastguard Worker It provides methods for creating and returning SharedMemory instances 1323*cda5da8dSAndroid Build Coastguard Worker and for creating a list-like object (ShareableList) backed by shared 1324*cda5da8dSAndroid Build Coastguard Worker memory. It also provides methods that create and return Proxy Objects 1325*cda5da8dSAndroid Build Coastguard Worker that support synchronization across processes (i.e. multi-process-safe 1326*cda5da8dSAndroid Build Coastguard Worker locks and semaphores). 1327*cda5da8dSAndroid Build Coastguard Worker """ 1328*cda5da8dSAndroid Build Coastguard Worker 1329*cda5da8dSAndroid Build Coastguard Worker _Server = SharedMemoryServer 1330*cda5da8dSAndroid Build Coastguard Worker 1331*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *args, **kwargs): 1332*cda5da8dSAndroid Build Coastguard Worker if os.name == "posix": 1333*cda5da8dSAndroid Build Coastguard Worker # bpo-36867: Ensure the resource_tracker is running before 1334*cda5da8dSAndroid Build Coastguard Worker # launching the manager process, so that concurrent 1335*cda5da8dSAndroid Build Coastguard Worker # shared_memory manipulation both in the manager and in the 1336*cda5da8dSAndroid Build Coastguard Worker # current process does not create two resource_tracker 1337*cda5da8dSAndroid Build Coastguard Worker # processes. 1338*cda5da8dSAndroid Build Coastguard Worker from . import resource_tracker 1339*cda5da8dSAndroid Build Coastguard Worker resource_tracker.ensure_running() 1340*cda5da8dSAndroid Build Coastguard Worker BaseManager.__init__(self, *args, **kwargs) 1341*cda5da8dSAndroid Build Coastguard Worker util.debug(f"{self.__class__.__name__} created by pid {getpid()}") 1342*cda5da8dSAndroid Build Coastguard Worker 1343*cda5da8dSAndroid Build Coastguard Worker def __del__(self): 1344*cda5da8dSAndroid Build Coastguard Worker util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") 1345*cda5da8dSAndroid Build Coastguard Worker 1346*cda5da8dSAndroid Build Coastguard Worker def get_server(self): 1347*cda5da8dSAndroid Build Coastguard Worker 'Better than monkeypatching for now; merge into Server ultimately' 1348*cda5da8dSAndroid Build Coastguard Worker if self._state.value != State.INITIAL: 1349*cda5da8dSAndroid Build Coastguard Worker if self._state.value == State.STARTED: 1350*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("Already started SharedMemoryServer") 1351*cda5da8dSAndroid Build Coastguard Worker elif self._state.value == State.SHUTDOWN: 1352*cda5da8dSAndroid Build Coastguard Worker raise ProcessError("SharedMemoryManager has shut down") 1353*cda5da8dSAndroid Build Coastguard Worker else: 1354*cda5da8dSAndroid Build Coastguard Worker raise ProcessError( 1355*cda5da8dSAndroid Build Coastguard Worker "Unknown state {!r}".format(self._state.value)) 1356*cda5da8dSAndroid Build Coastguard Worker return self._Server(self._registry, self._address, 1357*cda5da8dSAndroid Build Coastguard Worker self._authkey, self._serializer) 1358*cda5da8dSAndroid Build Coastguard Worker 1359*cda5da8dSAndroid Build Coastguard Worker def SharedMemory(self, size): 1360*cda5da8dSAndroid Build Coastguard Worker """Returns a new SharedMemory instance with the specified size in 1361*cda5da8dSAndroid Build Coastguard Worker bytes, to be tracked by the manager.""" 1362*cda5da8dSAndroid Build Coastguard Worker with self._Client(self._address, authkey=self._authkey) as conn: 1363*cda5da8dSAndroid Build Coastguard Worker sms = shared_memory.SharedMemory(None, create=True, size=size) 1364*cda5da8dSAndroid Build Coastguard Worker try: 1365*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'track_segment', (sms.name,)) 1366*cda5da8dSAndroid Build Coastguard Worker except BaseException as e: 1367*cda5da8dSAndroid Build Coastguard Worker sms.unlink() 1368*cda5da8dSAndroid Build Coastguard Worker raise e 1369*cda5da8dSAndroid Build Coastguard Worker return sms 1370*cda5da8dSAndroid Build Coastguard Worker 1371*cda5da8dSAndroid Build Coastguard Worker def ShareableList(self, sequence): 1372*cda5da8dSAndroid Build Coastguard Worker """Returns a new ShareableList instance populated with the values 1373*cda5da8dSAndroid Build Coastguard Worker from the input sequence, to be tracked by the manager.""" 1374*cda5da8dSAndroid Build Coastguard Worker with self._Client(self._address, authkey=self._authkey) as conn: 1375*cda5da8dSAndroid Build Coastguard Worker sl = shared_memory.ShareableList(sequence) 1376*cda5da8dSAndroid Build Coastguard Worker try: 1377*cda5da8dSAndroid Build Coastguard Worker dispatch(conn, None, 'track_segment', (sl.shm.name,)) 1378*cda5da8dSAndroid Build Coastguard Worker except BaseException as e: 1379*cda5da8dSAndroid Build Coastguard Worker sl.shm.unlink() 1380*cda5da8dSAndroid Build Coastguard Worker raise e 1381*cda5da8dSAndroid Build Coastguard Worker return sl 1382