1# Lint as: python2, python3
2# Copyright 2019 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Wrapper test to run verification on a labstation."""
7
8import json
9import logging
10import os
11import re
12import time
13
14from autotest_lib.client.common_lib import error
15from autotest_lib.server import test
16from autotest_lib.server import utils as server_utils
17from autotest_lib.server import site_utils
18from autotest_lib.server.hosts import servo_host as _servo_host
19from autotest_lib.server.hosts import servo_constants
20from autotest_lib.server.hosts import factory
21from autotest_lib.server.hosts import host_info
22
23
24class servo_LabstationVerification(test.test):
25    """Wrapper test to run verifications on a labstation image.
26
27    This test verifies basic servod behavior on the host supplied to it e.g.
28    that servod can start etc, before inferring the DUT attached to the servo
29    device, and running more comprehensive servod tests by using a full
30    cros_host and servo_host setup.
31    """
32    version = 1
33
34    UL_BIT_MASK = 0x2
35
36    # Regex to match ipv4 byte.
37    IPV4_RE_BLOCK = r'(25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])'
38
39    # Full regex to match an ipv4 with optional subnet mask.
40    RE_IPV4 = re.compile(r'^(%(block)s\.){3}(%(block)s)(/\d+)?$' %
41                         {'block':IPV4_RE_BLOCK})
42
43    # Timeout in seconds to wait after cold_reset before attempting to ping
44    # again. This includes a potential fw screen (30s), and some buffer
45    # for the network.
46    RESET_TIMEOUT_S = 60
47
48    def get_servo_mac(self, servo_proxy):
49        """Given a servo's serial retrieve ethernet port mac address.
50
51        @param servo_proxy: proxy to talk to servod
52
53        @returns: mac address of the ethernet port as a string
54        @raises: error.TestError: if mac address cannot be inferred
55        """
56        # TODO(coconutruben): once mac address retrieval through v4 is
57        # implemented remove these lines of code, and replace with
58        # servo_v4_eth_mac.
59        try:
60            serial = servo_proxy.get('support.serialname')
61            if serial == 'unknown':
62                serial = servo_proxy.get('serialname')
63        except error.TestFail as e:
64            if 'No control named' in e:
65                serial = servo_proxy.get('serialname')
66            else:
67                raise e
68        ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
69                                 'serial_to_mac_map.json')
70        with open(ctrl_path, 'r') as f:
71            serial_mac_map = json.load(f)
72        if not serial in serial_mac_map:
73            raise error.TestError('Unable to retrieve mac address for '
74                                  'serial %s' % serial)
75        return str(serial_mac_map[serial])
76
77    def _flip_UL_bit(self, byte):
78        """Helper to flip the Universal/Local bit in a given byte.
79
80        For some IPv6's extended unique identifier (EUI) 64 calculation
81        part of the logic is to flip the U/L bit on the first byte.
82
83        Note: it is the callers responsibility to ensure that |byte| is
84        only one byte. This function will just flip the 7th bit of whatever
85        is supplied and return that.
86
87        @param byte: the byte to flip
88
89        @returns: |byte| with it's U/L bit flipped.
90        """
91        return byte ^ self.UL_BIT_MASK
92
93    def _from_mac_to_ipv6_eui_64(self, mac):
94        """Convert a MAC address (IEEE EUI48) to a IEEE EUI64 node component.
95
96        This follows guidelines to convert a mac address to an IPv6 node
97        component by
98        - splitting the mac into two parts
99        - inserting 0xfffe in between the two parts
100        - flipping the U/L bit on the first byte
101
102        @param mac: string containing the mac address
103
104        @returns: string containing the IEEE EUI64 node component to |mac|
105        """
106        mac_bytes = [b.lower() for b in mac.split(':')]
107        # First, flip the 7th bit again. This converts the string coming from
108        # the mac (as it's a hex) into an int, flips it, before casting it back
109        # to a hex as is expected for the mac address.
110        mac_bytes[0] = hex(self._flip_UL_bit(int(mac_bytes[0],16)))[2:]
111        mac_bytes = (mac_bytes[:3] + ['ff', 'fe'] + mac_bytes[-3:])
112        ipv6_components = []
113        while mac_bytes:
114            # IPv6 has two bytes between :
115            ipv6_components.append('%s%s' % (mac_bytes.pop(0),
116                                             mac_bytes.pop(0)))
117        # Lastly, remove the leading 0s to have a well formatted concise IPv6.
118        return ':'.join([c.lstrip('0') for c in ipv6_components])
119
120    def _mac_to_ipv6_addr(self, mac, ipv6_network_component):
121        """Helper to generate an IPv6 address given network component and mac.
122
123        @param mac: the mac address of the target network interface
124        @param ipv6_network_component: prefix + subnet id portion of IPv6 [:64]
125
126        @returns: an IPv6 address that could be used to target the network
127                  interface at |mac| if it's on the same network as the network
128                  component indicates
129        """
130        # Do not add an extra/miss a ':' when glueing both parts together.
131        glue = '' if ipv6_network_component[-1] == ':' else ':'
132        return '%s%s%s' % (ipv6_network_component, glue,
133                           self._from_mac_to_ipv6_eui_64(mac))
134
135    def _from_ipv6_to_mac_address(self, ipv6):
136        """Given an IPv6 address retrieve the mac address.
137
138        Assuming the address at |ipv6| followed the conversion standard layed
139        out at _from_mac_to_ipv6_eui_64() above, this helper does the inverse.
140
141        @param ipv6: full IPv6 address to extract the mac address from
142
143        @returns: mac address extracted from node component as a string
144        """
145        # The node component i.e. the one holding the mac info is the 64 bits.
146        components = ipv6.split(':')[-4:]
147        # This is reversing the EUI 64 logic.
148        mac_bytes = []
149        for component in components:
150            # Expand the components fully again.
151            full_component = component.rjust(4,'0')
152            # Mac addresses use one byte components as opposed to the two byte
153            # ones for IPv6 - split them up.
154            mac_bytes.extend([full_component[:2], full_component[2:]])
155        # First, flip the 7th bit again.
156        mac_bytes[0] = self._flip_UL_bit(mac_bytes[0])
157        # Second, remove the 0xFFFE bytes inserted in the middle again.
158        mac_bytes = mac_bytes[:3] + mac_bytes[-3:]
159        return ':'.join([c.lower() for c in mac_bytes])
160
161    def _build_ssh_cmd(self, hostname, cmd):
162        """Build the ssh command to run |cmd| via bash on |hostname|.
163
164        @param hostname: hostname/ip where to run the cmd on
165        @param cmd: cmd on hostname to run
166
167        @returns: ssh command to run
168        """
169        ssh_cmd = [r'ssh', '-q', '-o', 'StrictHostKeyChecking=no',
170                   r'-o', 'UserKnownHostsFile=/dev/null',
171                   r'root@%s' % hostname,
172                   r'"%s"' % cmd]
173        return ' '.join(ssh_cmd)
174
175    def _ip_info_from_host(self, host, ip, info, host_name):
176        """Retrieve some |info| related to |ip| from host on |ip|.
177
178        @param host: object that implements 'run', where the command
179                     will be executed form
180        @param ip: ip address to run on and to filter for
181        @param info: one of 'ipv4' or 'dev'
182        @param host_name: executing host's name, for error message
183
184        @returns: ipv4 associated on the same nic as |ip| if |info|== 'ipv4'
185                  nic dev name associated with |ip| if |info|== 'dev'
186
187        @raises error.TestError: if output of 'ip --brief addr' is unexpected
188        @raises error.TestError: info not in ['ipv4', 'dev']
189        """
190        if info not in ['ipv4', 'dev']:
191            raise error.TestFail('Cannot retrieve info %r', info)
192        ip_stub = r"ip --brief addr | grep %s" % ip
193        cmd = self._build_ssh_cmd(ip, ip_stub)
194        logging.info('command to find %s on %s: %s', info, host_name, cmd)
195        # The expected output here is of the form:
196        # [net device] [UP/DOWN] [ipv4]/[subnet mask] [ipv6]/[subnet mask]+
197        try:
198            output = host.run(cmd).stdout.strip()
199        except (error.AutoservRunError, error.CmdError) as e:
200            logging.error(str(e))
201            raise error.TestFail('Failed to retrieve %s on %s' % (info, ip))
202        logging.debug('ip raw output: %s', output)
203        components = output.split()
204        if info == 'ipv4':
205            # To be safe, get all IPs, and subsequently report the first ipv4
206            # found.
207            raw_ips = components[2:]
208            for raw_ip in raw_ips:
209                if re.match(self.RE_IPV4, raw_ip):
210                    ret = raw_ip.split('/')[0]
211                    logging.info('ipv4 found: %s', ret)
212                    break
213            else:
214                raise error.TestFail('No ipv4 address found in ip command: %s' %
215                                     ', '.join(raw_ips))
216        if info == 'dev':
217            ret = components[0]
218            logging.info('dev found: %s', ret)
219        return ret
220
221    def get_dut_on_servo_ip(self, servo_host_proxy):
222        """Retrieve the IPv4 IP of the DUT attached to a servo.
223
224        Note: this will reboot the DUT if it fails initially to get the IP
225        Note: for this to work, servo host and dut have to be on the same subnet
226
227        @param servo_host_proxy: proxy to talk to the servo host
228
229        @returns: IPv4 address of DUT attached to servo on |servo_host_proxy|
230
231        @raises error.TestError: if the ip cannot be inferred
232        """
233        # Note: throughout this method, sh refers to servo host, dh to DUT host.
234        # Figure out servo hosts IPv6 address that's based on its mac address.
235        servo_proxy = servo_host_proxy._servo
236        sh_ip = server_utils.get_ip_address(servo_host_proxy.hostname)
237        sh_nic_dev = self._ip_info_from_host(servo_host_proxy, sh_ip, 'dev',
238                                             'servo host')
239        addr_cmd ='cat /sys/class/net/%s/address' % sh_nic_dev
240        sh_dev_addr = servo_host_proxy.run(addr_cmd).stdout.strip()
241        logging.debug('Inferred Labstation MAC to be: %s', sh_dev_addr)
242        sh_dev_ipv6_stub = self._from_mac_to_ipv6_eui_64(sh_dev_addr)
243        # This will get us the IPv6 address that uses the mac address as node id
244        cmd = (r'ifconfig %s | grep -oE "([0-9a-f]{0,4}:){4}%s"' %
245               (sh_nic_dev, sh_dev_ipv6_stub))
246        servo_host_ipv6 = servo_host_proxy.run(cmd).stdout.strip()
247        logging.debug('Inferred Labstation IPv6 to be: %s', servo_host_ipv6)
248        # Figure out DUTs expected IPv6 address
249        # The network component should be shared between the DUT and the servo
250        # host as long as they're on the same subnet.
251        network_component = ':'.join(servo_host_ipv6.split(':')[:4])
252        dut_ipv6 = self._mac_to_ipv6_addr(self.get_servo_mac(servo_proxy),
253                                          network_component)
254        logging.info('Inferred DUT IPv6 to be: %s', dut_ipv6)
255        # Dynamically generate the correct shell-script to retrieve the ipv4.
256        try:
257            server_utils.run('ping -6 -c 1 -w 35 %s' % dut_ipv6)
258        except error.CmdError:
259            # If the DUT cannot be pinged, then try to reset it and try to
260            # ping again.
261            logging.info('Failed to ping DUT on ipv6: %s. Cold resetting',
262                         dut_ipv6)
263            servo_proxy.get_power_state_controller().reset()
264            time.sleep(self.RESET_TIMEOUT_S)
265        dut_ipv4 = None
266        try:
267            # Pass |server_utils| here as it implements the same interface
268            # as a host to run things locally i.e. on the autoserv runner.
269            dut_ipv4 = self._ip_info_from_host(server_utils, dut_ipv6, 'ipv4',
270                                               'autoserv')
271            return dut_ipv4
272        except error.TestFail:
273            logging.info('Failed to retrieve the DUT ipv4 directly. '
274                         'Going to attempt to tunnel request through '
275                         'labstation and forgive the error for now.')
276        # Lastly, attempt to run the command from the labstation instead
277        # to guard against networking issues.
278        dut_ipv4 = self._ip_info_from_host(servo_host_proxy, dut_ipv6, 'ipv4',
279                                           'autoserv')
280        return dut_ipv4
281
282    def _set_dut_stable_version(self, dut_host, stable_version=None):
283        """Helper method to set stable_version in DUT host.
284
285        @param dut_host: CrosHost object representing the DUT.
286        """
287        if not stable_version:
288            stable_version = self.cros_version
289        logging.info('Setting stable_version to %s for DUT %s.',
290                     stable_version, dut_host.hostname)
291        info = dut_host.host_info_store.get()
292        info.stable_versions['cros'] = stable_version
293        dut_host.host_info_store.commit(info)
294
295    def _get_dut_info_from_config(self):
296        """Get DUT info from json config file.
297
298        @returns a list of dicts that each dict represents a dut.
299        """
300        ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
301                                 'labstation_to_dut_map.json')
302        with open(ctrl_path, 'r') as f:
303            data = json.load(f, object_hook=self._byteify)
304            # create a default dut dict in case the servohost is not in config
305            # map, this is normally happened in local testing.
306            default_dut = {
307                'hostname': None,
308                'servo_port': '9999',
309                'servo_serial': None
310            }
311            return data.get(self.labstation_host.hostname, [default_dut])
312
313    def _byteify(self, data, ignore_dicts=False):
314        """Helper method to convert unicode to string.
315        """
316        # In python2 we need to convert unicode into str, however in python3
317        # unicode renamed to str so json.load will load data into str by
318        # default so run encode with a str in python3 will actually convert
319        # the data into bytes which we don't want. To make the test compatible
320        # to both python2 and python3, we check data type != dict, list, str
321        # for now. # TODO(xianuowang@): remove this method once we fully
322        # migrated to python3.
323        if type(data) not in {dict, list, str}:
324            return data.encode('utf-8')
325        if isinstance(data, list):
326            return [self._byteify(item, ignore_dicts=True) for item in data]
327        if isinstance(data, dict) and not ignore_dicts:
328            return {
329                self._byteify(key, ignore_dicts=True):
330                    self._byteify(value, ignore_dicts=True)
331                for key, value in list(data.items())
332            }
333        return data
334
335    def _setup_servod(self):
336        """Setup all servod instances under servohost for later testing.
337        """
338        for dut in self.dut_list:
339            # Use board: nami as default for local testing.
340            board = dut.get('board', 'nami')
341            port = dut.get('servo_port')
342            serial = dut.get('servo_serial')
343            servo_args = {
344                    servo_constants.SERVO_HOST_ATTR:
345                    self.labstation_host.hostname,
346                    servo_constants.SERVO_PORT_ATTR: port,
347                    servo_constants.SERVO_SERIAL_ATTR: serial,
348                    servo_constants.SERVO_BOARD_ATTR: board,
349                    servo_constants.ADDITIONAL_SERVOD_ARGS: 'DUAL_V4=1',
350                    'is_in_lab': False,
351            }
352
353            logging.info('Setting up servod for port %s', port)
354            # We need try_lab_servo option here, so servo firmware will get
355            # updated before run tests.
356            servo_host, _ = _servo_host.create_servo_host(None,
357                                                          servo_args,
358                                                          try_lab_servo=True)
359            try:
360                validate_cmd = 'servodutil show -p %s' % port
361                servo_host.run_grep(validate_cmd,
362                    stdout_err_regexp='No servod scratch entry found.')
363            except error.AutoservRunError:
364                raise error.TestFail('Servod of port %s did not come up on'
365                                     ' labstation.' % port)
366
367            self.servo_hosts.append(servo_host)
368
369    def setup_hosts(self):
370        """Prepare all cros and servo hosts that need to run."""
371        # Servod came up successfully at this point - build a ServoHost and
372        # CrosHost for later testing to verfiy servo functionality.
373
374        for dut_info, servo_host in zip(self.dut_list, self.servo_hosts):
375            dut_hostname = dut_info.get('hostname')
376            if not dut_hostname:
377                # TODO(coconutruben@): remove this statement once the inferring
378                # is the default.
379                logging.info('hostname not specified for DUT, through '
380                             'static config or command-line. Will attempt '
381                             'to infer through hardware address.')
382                dut_hostname = self.get_dut_on_servo_ip(servo_host)
383            labels = []
384            if dut_info.get('board'):
385                labels.append('board:%s' % dut_info.get('board'))
386            if dut_info.get('model'):
387                labels.append('model:%s' % dut_info.get('model'))
388            info = host_info.HostInfo(labels=labels)
389            host_info_store = host_info.InMemoryHostInfoStore(info=info)
390            machine = {
391                    'hostname': dut_hostname,
392                    'host_info_store': host_info_store,
393                    'afe_host': site_utils.EmptyAFEHost()
394            }
395            dut_host = factory.create_host(machine)
396            dut_host.set_servo_host(servo_host)
397
398            # Copy labstation's stable_version to dut_host for later test
399            # consume.
400            # TODO(xianuowang@): remove this logic once we figured out how to
401            # propagate DUT's stable_version to the test.
402            stable_version_from_config = dut_info.get('stable_version')
403            self._set_dut_stable_version(dut_host, stable_version_from_config)
404            # Store |dut_host| in |machine_dict| so that parallel running can
405            # find the host.
406            self.machine_dict[dut_host.hostname] = dut_host
407
408    def initialize(self, host, config=None, local=False):
409        """Setup servod on |host| to run subsequent tests.
410
411        @param host: LabstationHost object representing the servohost.
412        @param config: the args argument from test_that in a dict.
413        @param local: whether a test image is already on the usb stick.
414        """
415        # Cache whether this is a local run or not.
416        self.local = local
417        # This list hosts the servo_hosts, in the same order as the |dut_list|
418        # below.
419        self.servo_hosts = []
420        # This dict houses a mapping of |dut| hostnames to initialized cros_host
421        # objects for the tests to run.
422        self.machine_dict = {}
423        # Save the host.
424        self.labstation_host = host
425        # Make sure recovery is quick in case of failure.
426        self.job.fast = True
427        # Get list of duts under the servohost.
428        self.dut_list = self._get_dut_info_from_config()
429        # Setup servod for all duts.
430        self._setup_servod()
431        # We need a cros build number for testing download image to usb and
432        # use servo to reimage DUT purpose. So copying labstation's
433        # stable_version here since we don't really care about which build
434        # to install on the DUT.
435        self.cros_version = (
436            self.labstation_host.host_info_store.get().cros_stable_version)
437
438        if config:
439            if 'dut_ip' in config:
440                # Retrieve DUT ip from args if caller specified it.
441                # |dut_ip| is special in that it can be used for (quick) setup
442                # testing if the setup is not in the configuration file.
443                # This has two implications:
444                # - the user can only test one dut/servo pair
445                # - the config has to be empty.
446                # TODO(coconutruben): remove this logic for a more holistic
447                # command-line overwrite solution.
448                if len(self.dut_list) == 1 and not self.dut_list[0]['hostname']:
449                    self.dut_list[0]['hostname'] = config['dut_ip']
450                    logging.info('Setting the hostname of the only dut to %s.',
451                                 self.dut_list[0]['hostname'])
452                else:
453                    logging.info('dut_ip %s will be ignored. The target '
454                                 'labstation is to be part of static config.')
455            if 'cros_version' in config:
456                # We allow user to override a cros image build.
457                self.cros_version = config['cros_version']
458        # Lastly, setup the hosts so that testing can occur in parallel.
459        self.setup_hosts()
460
461    def _run_on_machine(self, machine):
462        """Thin wrapper to run 'servo_Verification' on all machines.
463
464        @param machine: hostname of the dut to run 'servo_Verification' against.
465
466        @raises error.TestFail: 'servo_Verification' fails
467        @raises error.TestFail: |machine| unknown (not in |self.machine_dict|)
468        """
469        dut_host = self.machine_dict.get(machine, None)
470        if dut_host is None:
471            raise error.TestFail('dut machine %r not known to suite. Known '
472                                 'machines: %r', machine,
473                                 ', '.join(list(self.machine_dict.keys())))
474        logging.info('About to run on machine %s', machine)
475        if not self.job.run_test('servo_Verification', host=dut_host,
476                                 local=self.local):
477            raise error.TestFail('At least one test failed.')
478
479    def run_once(self):
480        """Run through all hosts in |self.machine_dict|."""
481        self.job.parallel_simple(self._run_on_machine,
482                                 list(self.machine_dict.keys()))
483        # TODO(coconutruben): at this point, you can print a report what kind of
484        # servod setups failed and which succeeded. Build that out so that
485        # debugging failures is cleaner given multiple setups.
486
487    def cleanup(self):
488        """Clean up by calling close for dut host, which will also take care
489        of servo cleanup.
490        """
491        for _, dut in list(self.machine_dict.items()):
492            dut.close()
493