xref: /aosp_15_r20/external/autotest/client/common_lib/barrier.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Liimport sys, socket, errno, logging
3*9c5db199SXin Lifrom time import time, sleep
4*9c5db199SXin Lifrom autotest_lib.client.common_lib import error, utils
5*9c5db199SXin Li
# Default TCP port for barrier traffic.  Used by listen_server when no port
# is given, and by barrier when neither a port nor a listen_server is given.
_DEFAULT_PORT = 11922
8*9c5db199SXin Li
9*9c5db199SXin Lidef _get_host_from_id(hostid):
10*9c5db199SXin Li    # Remove any trailing local identifier following a #.
11*9c5db199SXin Li    # This allows multiple members per host which is particularly
12*9c5db199SXin Li    # helpful in testing.
13*9c5db199SXin Li    if not hostid.startswith('#'):
14*9c5db199SXin Li        return hostid.split('#')[0]
15*9c5db199SXin Li    else:
16*9c5db199SXin Li        raise error.BarrierError(
17*9c5db199SXin Li            "Invalid Host id: Host Address should be specified")
18*9c5db199SXin Li
19*9c5db199SXin Li
class BarrierAbortError(error.BarrierError):
    """
    Special BarrierError raised when an explicit abort is requested.

    Raised by barrier._main_release() on the main node, and by
    barrier._node_wait() on a node that receives an 'abrt' message, so that
    callers can tell a deliberate abort from other barrier failures.
    """
22*9c5db199SXin Li
23*9c5db199SXin Li
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        Appends an iptables ACCEPT rule for the port and then creates the
        listening socket.

        @param address: The address to listen on.
        @param port: The port to listen on.
        """
        self.address = address
        self.port = port
        # Open the port so that the listening server can accept incoming
        # connections.
        utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
                  port)
        self.socket = self._setup()


    def _setup(self):
        """
        Create, bind and listen on the listening socket.

        @returns The listening socket object.

        @raises socket.error: If the socket cannot be configured, bound or
                put into listening state.  The half-initialised socket is
                closed before the error propagates.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except Exception:
            # Do not leak the descriptor when e.g. the port is already in
            # use; the caller never gets a handle it could close.
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()
66*9c5db199SXin Li
67*9c5db199SXin Li
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the main node (first in sort order) in the
    set accepts connections from each member of the set.  As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag).  They are then asked to wait.
    When all members are present the main node then checks that each
    member is still responding via a ping/pong exchange.  If this is
    successful then everyone has checked in at the barrier.  We then tell
    everyone they may continue via a rlse message.

    Where the main is not the first to reach the barrier the client
    connects will fail.  Client will retry until they either succeed in
    connecting to main or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MAIN                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded to pong the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each node a server
    and the main a client.  The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation.  This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own.  Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier they may
                reset new connections).

        @raises error.BarrierError: If both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                        '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._mainid = None  # Host/IP + optional tag of selected main.
        # Whether this host requests an abort; overwritten by rendezvous*().
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a main).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    @staticmethod
    def _to_wire(msg):
        """
        Convert a protocol message to the bytes socket.send() requires.

        On Python 2 a native str already is bytes and is returned untouched.

        @param msg: Message as bytes or text.

        @returns The message as bytes.
        """
        if isinstance(msg, bytes):
            return msg
        return msg.encode('utf-8')


    @staticmethod
    def _from_wire(data):
        """
        Convert data returned by socket.recv() to a native str.

        This lets the protocol comparisons against plain string literals
        ("wait", "ping", ...) work on both Python 2 and Python 3.

        @param data: Data as returned by recv().

        @returns The data as a native str.
        """
        if isinstance(data, str):
            return data
        return data.decode('utf-8', 'replace')


    @staticmethod
    def _close_quietly(sock):
        """
        Close a socket, logging instead of raising on failure.

        @param sock: Socket to close, or None (ignored).
        """
        if sock is None:
            return
        try:
            sock.close()
        except Exception as err:
            logging.debug("ignoring error while closing socket: %s", err)


    def _update_timeout(self, timeout):
        """
        Re-arm the barrier timeout so it expires `timeout` seconds from now.

        @param timeout: Seconds to allow from now on, or None for no timeout.
        """
        if timeout is not None and self._start_time is not None:
            # _remaining() measures from _start_time, so add the time that
            # has already elapsed.
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """
        Return the seconds left before the barrier times out.

        @returns Seconds remaining, or None if no timeout is set.

        @raises error.BarrierError: If the timeout has already expired.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _main_welcome(self, connection):
        """
        Main side of the handshake: read "<tag> <name>" and reply "wait".

        On success the connection is recorded in self._waiting under the
        client's name and self._seen is incremented.  Invalid, mismatching
        or duplicate clients are rejected and their socket closed.

        @param connection: (socket, address) tuple for the peer.
        """
        client, addr = connection
        name = None

        client.settimeout(5)
        try:
            # Get the clients name.
            intro = self._from_wire(client.recv(1024)).strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                             client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach.  Confirm that
            # they are coming to the same meeting.  Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning("client arriving for the wrong barrier: %s != %s",
                             self._tag, tag)
                client.send(self._to_wire("!tag"))
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send(self._to_wire("!dup"))
                client.close()
                return

            # Acknowledge the client
            client.send(self._to_wire("wait"))

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything valid other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                         addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[name] = connection
        self._seen += 1


    def _node_hello(self, connection):
        """
        Node side of the handshake: send "<tag> <hostid>", expect "wait".

        On success the connection is recorded in self._waiting under our
        own hostid and self._seen is set to 1.  Otherwise the socket is
        closed and nothing is recorded.

        @param connection: (socket, address) tuple for the main.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            client.send(self._to_wire(self._tag + " " + self._hostid))

            reply = self._from_wire(client.recv(4)).strip("\r\n")
            logging.info("main said: %s", reply)
            # Confirm the main accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to main")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything valid other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.error("main handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("node now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _main_release(self):
        """
        Ping every waiting client, then release (or abort) all of them.

        @raises error.BarrierError: If any client fails to answer the ping.
        @raises BarrierAbortError: If this host or any client asked to abort.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send(self._to_wire("ping"))
                reply = self._from_wire(client.recv(1024))
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)
            except socket.error as err:
                # A reset/closed connection means the client is gone, same
                # as not answering at all.
                logging.warning("ping/pong failed: %s (%s)", name, err)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("main lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # If every ones checks in then commit the release.
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            client.settimeout(5)
            try:
                client.send(self._to_wire(msg))
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error as err:
                # The barrier was already met once everyone answered the
                # ping; a failed release send does not undo that.
                logging.warning("release failed: %s (%s)", name, err)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close the sockets of all recorded peers, ignoring close errors."""
        # Either way, close out all the clients.  If we have
        # not released them then they know to abort.
        for name in self._waiting:
            (client, addr) = self._waiting[name]

            logging.info("closing client: %s", name)
            self._close_quietly(client)


    def _run_server(self, is_main):
        """
        Accept connections until the barrier is met or the timeout expires.

        @param is_main: True to act as main (welcome every member and then
                release them); False to act as a node waiting for the main
                to call in.

        @raises error.BarrierError: On timeout or lost client.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_main:
                        self._main_welcome(connection)
                    else:
                        self._node_hello(connection)
                except socket.timeout:
                    # Loop round; _remaining() raises once time is up.
                    logging.warning("timeout waiting for remaining clients")

                if is_main:
                    # Check if everyone is here.
                    logging.info("main seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._main_release()
                        break
                else:
                    # Check if main connected.
                    if self._seen:
                        logging.info("node connected to main")
                        self._node_wait()
                        break
        finally:
            self._waiting_close()
            # if we created the listening_server in the beginning of this
            # function then close the listening socket here
            if not self._server:
                server.close()


    def _run_client(self, is_main):
        """
        Connect out to the peer(s), retrying until the barrier is met.

        @param is_main: True to act as main (call each member in turn and
                then release them); False to act as a node calling the main.

        @raises error.BarrierError: On timeout or lost client.
        @raises socket.error: On connection errors other than
                ECONNREFUSED/ETIMEDOUT.
        """
        try:
            while True:
                # Raises BarrierError once the barrier timeout has expired.
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                remote = None
                try:
                    remote = socket.socket(socket.AF_INET,
                                           socket.SOCK_STREAM)
                    remote.settimeout(30)
                    if is_main:
                        # Connect to all node.
                        host = _get_host_from_id(self._members[self._seen])
                        logging.info("calling node: %s", host)
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._main_welcome(connection)
                    else:
                        # Just connect to the main.
                        host = _get_host_from_id(self._mainid)
                        logging.info("calling main")
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._node_hello(connection)
                except socket.timeout:
                    logging.warning("timeout calling host, retry")
                    # The failed socket is never recorded in _waiting, so
                    # nobody else would close it.
                    self._close_quietly(remote)
                    sleep(10)
                except socket.error as err:
                    self._close_quietly(remote)
                    # The peer may simply not be listening yet; anything
                    # else is a real error.
                    if err.errno not in (errno.ECONNREFUSED,
                                         errno.ETIMEDOUT):
                        raise
                    sleep(10)

                if is_main:
                    # Check if everyone is here.
                    logging.info("main seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._main_release()
                        break
                else:
                    # Check if main connected.
                    if self._seen:
                        logging.info("node connected to main")
                        self._node_wait()
                        break
        finally:
            self._waiting_close()


    def _node_wait(self):
        """
        Node side of the barrier: answer pings until released or aborted.

        Reads 4-byte control messages from the main until it closes the
        connection, then acts on the last message seen.

        @raises error.BarrierError: If the main closed the connection
                without releasing us.
        @raises BarrierAbortError: If the last message was 'abrt'.
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = self._from_wire(reply).strip("\r\n")
            logging.info("main said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(self._to_wire(msg))

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("main abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("main abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("main abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("main abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("main handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """
        Wait at the barrier with the main acting as the server.

        The first host in sort order is selected as main; it listens while
        every other member connects to it.

        @param hosts: Ids of all members of the barrier.
        @param dargs: Optional abort=True; if called with abort=True, this
                will raise an exception on all the clients.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._mainid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("mainid: %s", self._mainid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the main in this barrier.
        if self._hostid == self._mainid:
            logging.info("selected as main")
            self._run_server(is_main=True)
        else:
            logging.info("selected as node")
            self._run_client(is_main=False)


    def rendezvous_servers(self, mainid, *hosts, **dargs):
        """
        Wait at the barrier with every node acting as a server.

        The main connects out to each member instead of listening, which
        allows barriers between machines that only have a one-way
        connection initiation.

        @param mainid: Id of the member acting as main.
        @param hosts: Ids of the other members of the barrier.
        @param dargs: Optional abort=True; if called with abort=True, this
                will raise an exception on all the clients.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._mainid = mainid
        self._abort = dargs.get('abort', False)

        logging.info("mainid: %s", self._mainid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the main in this barrier.
        if self._hostid == self._mainid:
            logging.info("selected as main")
            self._run_client(is_main=True)
        else:
            logging.info("selected as node")
            self._run_server(is_main=False)
547*9c5db199SXin Li            self._run_server(is_main=False)
548