# Lint as: python2, python3
import sys, socket, errno, logging
from time import time, sleep
from autotest_lib.client.common_lib import error, utils

# default barrier port
_DEFAULT_PORT = 11922

# All control messages exchanged after the introduction ("wait", "ping",
# "pong", "rlse", "abrt", "!tag", "!dup") are this many bytes long, which
# lets a fixed-size recv() pick out individual messages.
_MSG_SIZE = 4


def _encode(message):
    """Return `message` as bytes suitable for socket.send().

    On Python 2 a native `str` already is `bytes` and is returned unchanged;
    on Python 3 text is encoded as UTF-8.
    """
    if isinstance(message, bytes):
        return message
    return message.encode('utf-8')


def _decode(data):
    """Return data received from a socket as a native `str`.

    On Python 2 recv() already returns `str`; on Python 3 the bytes are
    decoded as UTF-8. Undecodable bytes are replaced rather than raising, so
    garbage from a stray peer cannot crash the barrier.
    """
    if isinstance(data, str):
        return data
    return data.decode('utf-8', 'replace')


def _get_host_from_id(hostid):
    """Return the host part of a barrier member id.

    Any trailing local identifier following a '#' is removed. This allows
    multiple members per host, which is particularly helpful in testing.

    @param hostid: Member id of the form 'host' or 'host#tag'.
    @return The 'host' portion of hostid.
    @raises error.BarrierError if hostid has no host part (starts with '#').
    """
    if hostid.startswith('#'):
        raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")
    return hostid.split('#')[0]


class BarrierAbortError(error.BarrierError):
    """Special BarrierError raised when an explicit abort is requested."""


class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        Also appends an iptables INPUT rule accepting TCP traffic to `port`
        so that remote members can connect.

        @param address: The address to listen on.
        @param port: The port to listen on.
        """
        self.address = address
        self.port = port
        # Open the port so that the listening server can accept incoming
        # connections.
        utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
                  port)
        self.socket = self._setup()


    def _setup(self):
        """Create, bind and listen on the listening socket.

        @return The listening socket.
        @raises socket.error if the socket cannot be bound or put into the
                listening state; the partially set-up socket is closed first.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except socket.error:
            # Do not leak the descriptor when bind/listen fails.
            sock.close()
            raise
        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()


class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming, the main node (first in sort order) in the
    set accepts connections from each member of the set. As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag). They are then asked to wait.
    When all members are present the main node checks that each member is
    still responding via a ping/pong exchange. If this is successful then
    everyone has checked in at the barrier. We then tell everyone they may
    continue via a rlse message.

    If the main is not the first to reach the barrier, the clients'
    connection attempts will fail. Clients retry until they either succeed
    in connecting to the main or the overall timeout is exceeded.

    As an example, here is the exchange for a three node barrier called
    'TAG':

      MAIN                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has replied with pong the barrier is
    implicitly deemed satisfied: they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success; the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous that makes each node a server
    and the main a client. The connection process and usage is still the
    same, but it allows barriers between machines where connections can
    only be initiated in one direction. This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own. Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).
        @raises error.BarrierError if both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                        '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._mainid = None  # Host/IP + optional tag of selected main.
        # Whether this member asked for an abort; set by rendezvous*().
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a main).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _update_timeout(self, timeout):
        """Allow `timeout` more seconds from now before the barrier expires.

        @param timeout: Seconds to allow from the current time, or None for
                no timeout.
        """
        if timeout is not None and self._start_time is not None:
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """Return the number of seconds left before the barrier times out.

        @return Remaining seconds, or None when no timeout is configured.
        @raises error.BarrierError if the timeout has already expired.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _main_welcome(self, connection):
        """Handle a member's introduction while acting as the main.

        Reads '<tag> <name>' from the peer. On success replies "wait" and
        records the connection in self._waiting. A wrong tag or a duplicate
        name gets "!tag"/"!dup" and the connection is closed. Malformed
        introductions and handshake timeouts are logged and dropped.

        @param connection: (socket, address) tuple for the peer.
        """
        client, addr = connection
        name = None

        client.settimeout(5)
        try:
            # Get the client's name.
            intro = _decode(client.recv(1024)).strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                                client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach. Confirm that
            # they are coming to the same meeting. Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning("client arriving for the wrong barrier: %s != %s",
                                self._tag, tag)
                client.settimeout(5)
                client.send(_encode("!tag"))
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.settimeout(5)
                client.send(_encode("!dup"))
                client.close()
                return

            # Acknowledge the client.
            client.send(_encode("wait"))

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything valid other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid; record them.
        self._waiting[name] = connection
        self._seen += 1


    def _node_hello(self, connection):
        """Introduce this member to the main while acting as a node.

        Sends '<tag> <hostid>' and expects "wait" back. On success the
        connection is recorded under our own hostid and self._seen is set
        to 1; otherwise the connection is closed.

        @param connection: (socket, address) tuple for the main.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            client.send(_encode(self._tag + " " + self._hostid))

            reply = _decode(client.recv(_MSG_SIZE)).strip("\r\n")
            logging.info("main said: %s", reply)
            # Confirm the main accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to main")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything valid other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.error("main handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("node now waiting: (%s:%d)", addr[0], addr[1])

        # The main accepted us; record the connection.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _main_release(self):
        """Ping every waiting member, then release (or abort) them all.

        @raises error.BarrierError if any member failed to answer the ping.
        @raises BarrierAbortError if we or any member requested an abort.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send(_encode("ping"))
                reply = _decode(client.recv(1024))
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)
            except socket.error as err:
                # A reset/broken connection means the client is gone; it is
                # reported below as a lost client rather than escaping raw.
                logging.warning("ping/pong failed: %s: %s", name, err)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("main lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # Everyone checked in, so commit the release.
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            client.settimeout(5)
            try:
                client.send(_encode(msg))
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error as err:
                # A failed release still counts as a successful barrier.
                logging.warning("release failed: %s: %s", name, err)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close every recorded connection, ignoring errors.

        Members that were not released see the close and know to abort.
        """
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            logging.info("closing client: %s", name)

            try:
                client.close()
            except Exception as err:
                # Best effort: one bad socket must not stop the others
                # from being closed.
                logging.debug("error closing client %s: %s", name, err)


    def _check_complete(self, is_main):
        """Finish the barrier if enough peers have checked in.

        As main, releases everyone once all members were seen. As node,
        waits for the main's verdict once connected to it.

        @param is_main: True if this member is the main.
        @return True if the barrier finished and the caller should stop.
        """
        if is_main:
            # Check if everyone is here.
            logging.info("main seen %d of %d",
                         self._seen, len(self._members))
            if self._seen == len(self._members):
                self._main_release()
                return True
        elif self._seen:
            # The main has accepted our connection.
            logging.info("node connected to main")
            self._node_wait()
            return True
        return False


    def _run_server(self, is_main):
        """Accept incoming connections until the barrier completes.

        @param is_main: True if this member is the main (used by rendezvous),
                False for a node (used by rendezvous_servers).
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_main:
                        self._main_welcome(connection)
                    else:
                        self._node_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if self._check_complete(is_main):
                    break
        finally:
            self._waiting_close()
            # If we created the listen_server at the beginning of this
            # function then close the listening socket here.
            if not self._server:
                server.close()


    def _run_client(self, is_main):
        """Connect out to peers, retrying until the barrier completes.

        Refused or timed-out connections are retried every 10 seconds until
        the barrier timeout expires.

        @param is_main: True if this member is the main (used by
                rendezvous_servers), False for a node (used by rendezvous).
        """
        try:
            while True:
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                remote = None
                try:
                    remote = socket.socket(socket.AF_INET,
                                           socket.SOCK_STREAM)
                    remote.settimeout(30)
                    if is_main:
                        # Connect to the next node not yet seen.
                        host = _get_host_from_id(self._members[self._seen])
                        logging.info("calling node: %s", host)
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._main_welcome(connection)
                    else:
                        # Just connect to the main.
                        host = _get_host_from_id(self._mainid)
                        logging.info("calling main")
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._node_hello(connection)
                except socket.timeout:
                    logging.warning("timeout calling host, retry")
                    # This attempt's socket was never recorded; free it.
                    if remote is not None:
                        remote.close()
                    sleep(10)
                except socket.error as err:
                    if err.errno not in (errno.ECONNREFUSED, errno.ETIMEDOUT):
                        raise
                    if remote is not None:
                        remote.close()
                    sleep(10)

                if self._check_complete(is_main):
                    break
        finally:
            self._waiting_close()


    def _node_wait(self):
        """Follow the main's instructions until it closes the connection.

        Answers "ping" with "pong" (or "abrt" if we requested an abort).

        @raises error.BarrierError unless the last message was "rlse".
        @raises BarrierAbortError if the main sent "abrt".
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(_MSG_SIZE)
            if not reply:
                break

            reply = _decode(reply).strip("\r\n")
            logging.info("main said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(_encode(msg))

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("main abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("main abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("main abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("main abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("main handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """Wait until every member in `hosts` reaches this barrier.

        The hosts are sorted and the first becomes the main, which listens
        for connections from all the others.

        @param hosts: Ids of all members taking part, including this one.
        @param dargs: If called with abort=True, this will raise an
                exception (BarrierAbortError) on all the members.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._mainid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("mainid: %s", self._mainid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the main in this barrier.
        if self._hostid == self._mainid:
            logging.info("selected as main")
            self._run_server(is_main=True)
        else:
            logging.info("selected as node")
            self._run_client(is_main=False)


    def rendezvous_servers(self, mainid, *hosts, **dargs):
        """Like rendezvous(), but the main connects out to every member.

        Each non-main member listens as a server and the main acts as the
        client, for setups where connections can only be initiated from
        the main's side.

        @param mainid: Id of the main member.
        @param hosts: Ids of the other members.
        @param dargs: If called with abort=True, this will raise an
                exception (BarrierAbortError) on all the members.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._mainid = mainid
        self._abort = dargs.get('abort', False)

        logging.info("mainid: %s", self._mainid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the main in this barrier.
        if self._hostid == self._mainid:
            logging.info("selected as main")
            self._run_client(is_main=True)
        else:
            logging.info("selected as node")
            self._run_server(is_main=False)