# Lint as: python2, python3
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper test to run verification on a labstation."""

import json
import logging
import os
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.server import test
from autotest_lib.server import utils as server_utils
from autotest_lib.server import site_utils
from autotest_lib.server.hosts import servo_host as _servo_host
from autotest_lib.server.hosts import servo_constants
from autotest_lib.server.hosts import factory
from autotest_lib.server.hosts import host_info


class servo_LabstationVerification(test.test):
    """Wrapper test to run verifications on a labstation image.

    This test verifies basic servod behavior on the host supplied to it e.g.
    that servod can start etc, before inferring the DUT attached to the servo
    device, and running more comprehensive servod tests by using a full
    cros_host and servo_host setup.
    """
    version = 1

    # Mask of the Universal/Local bit inside the first byte of a mac address.
    UL_BIT_MASK = 0x2

    # Regex to match ipv4 byte.
    IPV4_RE_BLOCK = r'(25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])'

    # Full regex to match an ipv4 with optional subnet mask.
    RE_IPV4 = re.compile(r'^(%(block)s\.){3}(%(block)s)(/\d+)?$' %
                         {'block': IPV4_RE_BLOCK})

    # Timeout in seconds to wait after cold_reset before attempting to ping
    # again. This includes a potential fw screen (30s), and some buffer
    # for the network.
    RESET_TIMEOUT_S = 60

    def get_servo_mac(self, servo_proxy):
        """Given a servo's serial retrieve ethernet port mac address.

        The serial is read from servod and then looked up in
        'serial_to_mac_map.json', which lives next to this file.

        @param servo_proxy: proxy to talk to servod

        @returns: mac address of the ethernet port as a string
        @raises: error.TestError: if mac address cannot be inferred
        """
        # TODO(coconutruben): once mac address retrieval through v4 is
        # implemented remove these lines of code, and replace with
        # servo_v4_eth_mac.
        try:
            serial = servo_proxy.get('support.serialname')
            if serial == 'unknown':
                serial = servo_proxy.get('serialname')
        except error.TestFail as e:
            # Compare against the message text: exception objects do not
            # support the |in| operator on python3.
            if 'No control named' not in str(e):
                raise
            serial = servo_proxy.get('serialname')
        ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                 'serial_to_mac_map.json')
        with open(ctrl_path, 'r') as f:
            serial_mac_map = json.load(f)
        if serial not in serial_mac_map:
            raise error.TestError('Unable to retrieve mac address for '
                                  'serial %s' % serial)
        return str(serial_mac_map[serial])

    def _flip_UL_bit(self, byte):
        """Helper to flip the Universal/Local bit in a given byte.

        For some IPv6's extended unique identifier (EUI) 64 calculation
        part of the logic is to flip the U/L bit on the first byte.

        Note: it is the caller's responsibility to ensure that |byte| is
        only one byte. This function will just xor |byte| with UL_BIT_MASK
        and return that.

        @param byte: the byte to flip, as an int

        @returns: |byte| with its U/L bit flipped.
        """
        return byte ^ self.UL_BIT_MASK

    def _from_mac_to_ipv6_eui_64(self, mac):
        """Convert a MAC address (IEEE EUI48) to a IEEE EUI64 node component.

        This follows guidelines to convert a mac address to an IPv6 node
        component by
        - splitting the mac into two parts
        - inserting 0xfffe in between the two parts
        - flipping the U/L bit on the first byte

        @param mac: string containing the mac address, ':' separated

        @returns: string containing the IEEE EUI64 node component to |mac|
        """
        mac_bytes = [b.lower() for b in mac.split(':')]
        # Flip the U/L bit: parse the first byte from hex into an int, flip
        # it, and format it back as a zero padded two digit hex byte.
        mac_bytes[0] = '%02x' % self._flip_UL_bit(int(mac_bytes[0], 16))
        eui_bytes = mac_bytes[:3] + ['ff', 'fe'] + mac_bytes[-3:]
        # IPv6 has two bytes between each ':'.
        groups = [eui_bytes[i] + eui_bytes[i + 1]
                  for i in range(0, len(eui_bytes), 2)]
        # Lastly, remove the leading 0s to have a well formatted concise
        # IPv6. A group consisting only of 0s has to stay a single '0',
        # otherwise the address would contain an empty group.
        return ':'.join(g.lstrip('0') or '0' for g in groups)

    def _mac_to_ipv6_addr(self, mac, ipv6_network_component):
        """Helper to generate an IPv6 address given network component and mac.

        @param mac: the mac address of the target network interface
        @param ipv6_network_component: prefix + subnet id portion of IPv6 [:64]

        @returns: an IPv6 address that could be used to target the network
                  interface at |mac| if it's on the same network as the network
                  component indicates
        """
        # Do not add an extra/miss a ':' when glueing both parts together.
        glue = '' if ipv6_network_component.endswith(':') else ':'
        return '%s%s%s' % (ipv6_network_component, glue,
                           self._from_mac_to_ipv6_eui_64(mac))

    def _from_ipv6_to_mac_address(self, ipv6):
        """Given an IPv6 address retrieve the mac address.

        Assuming the address at |ipv6| followed the conversion standard laid
        out at _from_mac_to_ipv6_eui_64() above, this helper does the inverse.

        @param ipv6: full IPv6 address to extract the mac address from

        @returns: mac address extracted from node component as a string
        """
        # The node component i.e. the one holding the mac info is the last
        # 64 bits, i.e. the last four ':' separated groups.
        components = ipv6.split(':')[-4:]
        # This is reversing the EUI 64 logic.
        mac_bytes = []
        for component in components:
            # Expand the components fully again.
            full_component = component.rjust(4, '0')
            # Mac addresses use one byte components as opposed to the two byte
            # ones for IPv6 - split them up.
            mac_bytes.extend([full_component[:2], full_component[2:]])
        # First, flip the U/L bit back. The bytes are hex strings, so the
        # first one is parsed into an int for the flip and formatted back.
        mac_bytes[0] = '%02x' % self._flip_UL_bit(int(mac_bytes[0], 16))
        # Second, remove the 0xFFFE bytes inserted in the middle again.
        mac_bytes = mac_bytes[:3] + mac_bytes[-3:]
        return ':'.join([c.lower() for c in mac_bytes])

    def _build_ssh_cmd(self, hostname, cmd):
        """Build the ssh command to run |cmd| via bash on |hostname|.

        @param hostname: hostname/ip where to run the cmd on
        @param cmd: cmd on hostname to run

        @returns: ssh command to run
        """
        ssh_cmd = [r'ssh', '-q', '-o', 'StrictHostKeyChecking=no',
                   r'-o', 'UserKnownHostsFile=/dev/null',
                   r'root@%s' % hostname,
                   r'"%s"' % cmd]
        return ' '.join(ssh_cmd)

    def _ip_info_from_host(self, host, ip, info, host_name):
        """Retrieve some |info| related to |ip| from host on |ip|.

        @param host: object that implements 'run', where the command
                     will be executed from
        @param ip: ip address to run on and to filter for
        @param info: one of 'ipv4' or 'dev'
        @param host_name: executing host's name, for error message

        @returns: ipv4 associated on the same nic as |ip| if |info|== 'ipv4'
                  nic dev name associated with |ip| if |info|== 'dev'

        @raises error.TestFail: if running the command fails or its output
                                does not contain the requested info
        @raises error.TestFail: info not in ['ipv4', 'dev']
        """
        if info not in ['ipv4', 'dev']:
            raise error.TestFail('Cannot retrieve info %r' % info)
        ip_stub = r"ip --brief addr | grep %s" % ip
        cmd = self._build_ssh_cmd(ip, ip_stub)
        logging.info('command to find %s on %s: %s', info, host_name, cmd)
        # The expected output here is of the form:
        # [net device] [UP/DOWN] [ipv4]/[subnet mask] [ipv6]/[subnet mask]+
        try:
            output = host.run(cmd).stdout.strip()
        except (error.AutoservRunError, error.CmdError) as e:
            logging.error('%s', e)
            raise error.TestFail('Failed to retrieve %s on %s' % (info, ip))
        logging.debug('ip raw output: %s', output)
        components = output.split()
        if info == 'dev':
            if not components:
                # Guard against indexing into an empty output.
                raise error.TestFail('Failed to retrieve %s on %s' %
                                     (info, ip))
            ret = components[0]
            logging.info('dev found: %s', ret)
            return ret
        # To be safe, get all IPs, and subsequently report the first ipv4
        # found.
        raw_ips = components[2:]
        for raw_ip in raw_ips:
            if self.RE_IPV4.match(raw_ip):
                ret = raw_ip.split('/')[0]
                logging.info('ipv4 found: %s', ret)
                return ret
        raise error.TestFail('No ipv4 address found in ip command: %s' %
                             ', '.join(raw_ips))

    def get_dut_on_servo_ip(self, servo_host_proxy):
        """Retrieve the IPv4 IP of the DUT attached to a servo.

        Note: this will reboot the DUT if it fails initially to get the IP
        Note: for this to work, servo host and dut have to be on the same subnet

        @param servo_host_proxy: proxy to talk to the servo host

        @returns: IPv4 address of DUT attached to servo on |servo_host_proxy|

        @raises error.TestError: if the servo's mac address is unknown
        @raises error.TestFail: if the ip cannot be inferred
        """
        # Note: throughout this method, sh refers to servo host, dh to DUT host.
        # Figure out servo hosts IPv6 address that's based on its mac address.
        servo_proxy = servo_host_proxy._servo
        sh_ip = server_utils.get_ip_address(servo_host_proxy.hostname)
        sh_nic_dev = self._ip_info_from_host(servo_host_proxy, sh_ip, 'dev',
                                             'servo host')
        addr_cmd = 'cat /sys/class/net/%s/address' % sh_nic_dev
        sh_dev_addr = servo_host_proxy.run(addr_cmd).stdout.strip()
        logging.debug('Inferred Labstation MAC to be: %s', sh_dev_addr)
        sh_dev_ipv6_stub = self._from_mac_to_ipv6_eui_64(sh_dev_addr)
        # This will get us the IPv6 address that uses the mac address as node id
        cmd = (r'ifconfig %s | grep -oE "([0-9a-f]{0,4}:){4}%s"' %
               (sh_nic_dev, sh_dev_ipv6_stub))
        servo_host_ipv6 = servo_host_proxy.run(cmd).stdout.strip()
        logging.debug('Inferred Labstation IPv6 to be: %s', servo_host_ipv6)
        # Figure out DUTs expected IPv6 address
        # The network component should be shared between the DUT and the servo
        # host as long as they're on the same subnet.
        network_component = ':'.join(servo_host_ipv6.split(':')[:4])
        dut_ipv6 = self._mac_to_ipv6_addr(self.get_servo_mac(servo_proxy),
                                          network_component)
        logging.info('Inferred DUT IPv6 to be: %s', dut_ipv6)
        # Check that the DUT is reachable on the inferred address first.
        try:
            server_utils.run('ping -6 -c 1 -w 35 %s' % dut_ipv6)
        except error.CmdError:
            # If the DUT cannot be pinged, then try to reset it and give it
            # time to come back up.
            logging.info('Failed to ping DUT on ipv6: %s. Cold resetting',
                         dut_ipv6)
            servo_proxy.get_power_state_controller().reset()
            time.sleep(self.RESET_TIMEOUT_S)
        try:
            # Pass |server_utils| here as it implements the same interface
            # as a host to run things locally i.e. on the autoserv runner.
            return self._ip_info_from_host(server_utils, dut_ipv6, 'ipv4',
                                           'autoserv')
        except error.TestFail:
            logging.info('Failed to retrieve the DUT ipv4 directly. '
                         'Going to attempt to tunnel request through '
                         'labstation and forgive the error for now.')
        # Lastly, attempt to run the command from the labstation instead
        # to guard against networking issues.
        return self._ip_info_from_host(servo_host_proxy, dut_ipv6, 'ipv4',
                                       'servo host')

    def _set_dut_stable_version(self, dut_host, stable_version=None):
        """Helper method to set stable_version in DUT host.

        @param dut_host: CrosHost object representing the DUT.
        @param stable_version: version to store under the 'cros' key; if
                               not set, |self.cros_version| is used.
        """
        if not stable_version:
            stable_version = self.cros_version
        logging.info('Setting stable_version to %s for DUT %s.',
                     stable_version, dut_host.hostname)
        info = dut_host.host_info_store.get()
        info.stable_versions['cros'] = stable_version
        dut_host.host_info_store.commit(info)

    def _get_dut_info_from_config(self):
        """Get DUT info from json config file.

        The config is 'labstation_to_dut_map.json' next to this file, keyed
        by labstation hostname.

        @returns a list of dicts that each dict represents a dut.
        """
        ctrl_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                 'labstation_to_dut_map.json')
        with open(ctrl_path, 'r') as f:
            data = json.load(f, object_hook=self._byteify)
        # Create a default dut dict in case the servohost is not in the
        # config map; this normally happens in local testing.
        default_dut = {
                'hostname': None,
                'servo_port': '9999',
                'servo_serial': None
        }
        return data.get(self.labstation_host.hostname, [default_dut])

    def _byteify(self, data, ignore_dicts=False):
        """Helper method to convert unicode to string.

        @param data: value produced by the json decoder.
        @param ignore_dicts: if True, dicts are returned untouched. Used on
                             recursive calls, as json's object_hook already
                             ran on every nested dict.

        @returns: |data| with python2 unicode strings (also inside lists
                  and dicts) encoded as utf-8 str; other values unchanged.
        """
        # In python2 we need to convert unicode into str, however in python3
        # unicode renamed to str so json.load will load data into str by
        # default so run encode with a str in python3 will actually convert
        # the data into bytes which we don't want. To make the test compatible
        # to both python2 and python3, only values that are neither dict,
        # list nor str are encoded.
        # TODO(xianuowang@): remove this method once we fully
        # migrated to python3.
        if isinstance(data, list):
            return [self._byteify(item, ignore_dicts=True) for item in data]
        if isinstance(data, dict):
            if ignore_dicts:
                return data
            return {
                    self._byteify(key, ignore_dicts=True):
                    self._byteify(value, ignore_dicts=True)
                    for key, value in data.items()
            }
        # Json numbers, booleans and null have no encode() and must be
        # passed through as they are.
        if isinstance(data, str) or not hasattr(data, 'encode'):
            return data
        return data.encode('utf-8')

    def _setup_servod(self):
        """Setup all servod instances under servohost for later testing.

        Appends one servo host per entry of |self.dut_list| to
        |self.servo_hosts|, in the same order.

        @raises error.TestFail: if the servod validation command fails for
                                one of the ports.
        """
        for dut in self.dut_list:
            # Use board: nami as default for local testing.
            board = dut.get('board', 'nami')
            port = dut.get('servo_port')
            serial = dut.get('servo_serial')
            servo_args = {
                    servo_constants.SERVO_HOST_ATTR:
                    self.labstation_host.hostname,
                    servo_constants.SERVO_PORT_ATTR: port,
                    servo_constants.SERVO_SERIAL_ATTR: serial,
                    servo_constants.SERVO_BOARD_ATTR: board,
                    servo_constants.ADDITIONAL_SERVOD_ARGS: 'DUAL_V4=1',
                    'is_in_lab': False,
            }

            logging.info('Setting up servod for port %s', port)
            # We need try_lab_servo option here, so servo firmware will get
            # updated before run tests.
            servo_host, _ = _servo_host.create_servo_host(None,
                                                          servo_args,
                                                          try_lab_servo=True)
            try:
                validate_cmd = 'servodutil show -p %s' % port
                servo_host.run_grep(validate_cmd,
                        stdout_err_regexp='No servod scratch entry found.')
            except error.AutoservRunError:
                raise error.TestFail('Servod of port %s did not come up on'
                                     ' labstation.' % port)

            self.servo_hosts.append(servo_host)

    def setup_hosts(self):
        """Prepare all cros and servo hosts that need to run."""
        # Servod came up successfully at this point - build a ServoHost and
        # CrosHost for later testing to verify servo functionality.

        for dut_info, servo_host in zip(self.dut_list, self.servo_hosts):
            dut_hostname = dut_info.get('hostname')
            if not dut_hostname:
                # TODO(coconutruben@): remove this statement once the inferring
                # is the default.
                logging.info('hostname not specified for DUT, through '
                             'static config or command-line. Will attempt '
                             'to infer through hardware address.')
                dut_hostname = self.get_dut_on_servo_ip(servo_host)
            labels = []
            if dut_info.get('board'):
                labels.append('board:%s' % dut_info.get('board'))
            if dut_info.get('model'):
                labels.append('model:%s' % dut_info.get('model'))
            info = host_info.HostInfo(labels=labels)
            host_info_store = host_info.InMemoryHostInfoStore(info=info)
            machine = {
                    'hostname': dut_hostname,
                    'host_info_store': host_info_store,
                    'afe_host': site_utils.EmptyAFEHost()
            }
            dut_host = factory.create_host(machine)
            dut_host.set_servo_host(servo_host)

            # Copy labstation's stable_version to dut_host for later test
            # consume.
            # TODO(xianuowang@): remove this logic once we figured out how to
            # propagate DUT's stable_version to the test.
            stable_version_from_config = dut_info.get('stable_version')
            self._set_dut_stable_version(dut_host, stable_version_from_config)
            # Store |dut_host| in |machine_dict| so that parallel running can
            # find the host.
            self.machine_dict[dut_host.hostname] = dut_host

    def initialize(self, host, config=None, local=False):
        """Setup servod on |host| to run subsequent tests.

        @param host: LabstationHost object representing the servohost.
        @param config: the args argument from test_that in a dict.
        @param local: whether a test image is already on the usb stick.
        """
        # Cache whether this is a local run or not.
        self.local = local
        # This list hosts the servo_hosts, in the same order as the |dut_list|
        # below.
        self.servo_hosts = []
        # This dict houses a mapping of |dut| hostnames to initialized cros_host
        # objects for the tests to run.
        self.machine_dict = {}
        # Save the host.
        self.labstation_host = host
        # Make sure recovery is quick in case of failure.
        self.job.fast = True
        # Get list of duts under the servohost.
        self.dut_list = self._get_dut_info_from_config()
        # Setup servod for all duts.
        self._setup_servod()
        # We need a cros build number for testing download image to usb and
        # use servo to reimage DUT purpose. So copying labstation's
        # stable_version here since we don't really care about which build
        # to install on the DUT.
        self.cros_version = (
                self.labstation_host.host_info_store.get().cros_stable_version)

        if config:
            if 'dut_ip' in config:
                # Retrieve DUT ip from args if caller specified it.
                # |dut_ip| is special in that it can be used for (quick) setup
                # testing if the setup is not in the configuration file.
                # This has two implications:
                # - the user can only test one dut/servo pair
                # - the config has to be empty.
                # TODO(coconutruben): remove this logic for a more holistic
                # command-line overwrite solution.
                if (len(self.dut_list) == 1 and
                    not self.dut_list[0].get('hostname')):
                    self.dut_list[0]['hostname'] = config['dut_ip']
                    logging.info('Setting the hostname of the only dut to %s.',
                                 self.dut_list[0]['hostname'])
                else:
                    logging.info('dut_ip %s will be ignored. The target '
                                 'labstation is to be part of static config.',
                                 config['dut_ip'])
            if 'cros_version' in config:
                # We allow user to override a cros image build.
                self.cros_version = config['cros_version']
        # Lastly, setup the hosts so that testing can occur in parallel.
        self.setup_hosts()

    def _run_on_machine(self, machine):
        """Thin wrapper to run 'servo_Verification' on all machines.

        @param machine: hostname of the dut to run 'servo_Verification' against.

        @raises error.TestFail: 'servo_Verification' fails
        @raises error.TestFail: |machine| unknown (not in |self.machine_dict|)
        """
        dut_host = self.machine_dict.get(machine, None)
        if dut_host is None:
            raise error.TestFail('dut machine %r not known to suite. Known '
                                 'machines: %r' %
                                 (machine, ', '.join(self.machine_dict)))
        logging.info('About to run on machine %s', machine)
        if not self.job.run_test('servo_Verification', host=dut_host,
                                 local=self.local):
            raise error.TestFail('At least one test failed.')

    def run_once(self):
        """Run through all hosts in |self.machine_dict|."""
        self.job.parallel_simple(self._run_on_machine,
                                 list(self.machine_dict))
        # TODO(coconutruben): at this point, you can print a report what kind of
        # servod setups failed and which succeeded. Build that out so that
        # debugging failures is cleaner given multiple setups.

    def cleanup(self):
        """Clean up by calling close for dut host, which will also take care
        of servo cleanup.
        """
        for dut in self.machine_dict.values():
            dut.close()