xref: /aosp_15_r20/external/autotest/server/cros/faft/fingerprint_test.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2018 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging
7import os
8import time
9
10import six
11
12from autotest_lib.server import test
13from autotest_lib.server.cros import filesystem_util
14from autotest_lib.client.common_lib import error, utils
15
16
17class FingerprintTest(test.test):
18    """Base class that sets up helpers for fingerprint tests."""
19    version = 1
20
21    # Location of firmware from the build on the DUT
22    _FINGERPRINT_BUILD_FW_DIR = '/opt/google/biod/fw'
23
24    _DISABLE_FP_UPDATER_FILE = '.disable_fp_updater'
25
26    _UPSTART_DIR = '/etc/init'
27    _BIOD_UPSTART_JOB_FILE = 'biod.conf'
28    _STATEFUL_PARTITION_DIR = '/mnt/stateful_partition'
29
30    _GENIMAGES_SCRIPT_NAME = 'gen_test_images.sh'
31    _GENIMAGES_OUTPUT_DIR_NAME = 'images'
32
33    _TEST_IMAGE_FORMAT_MAP = {
34        'TEST_IMAGE_ORIGINAL': '%s.bin',
35        'TEST_IMAGE_DEV': '%s.dev',
36        'TEST_IMAGE_CORRUPT_FIRST_BYTE': '%s_corrupt_first_byte.bin',
37        'TEST_IMAGE_CORRUPT_LAST_BYTE': '%s_corrupt_last_byte.bin',
38        'TEST_IMAGE_DEV_RB_ZERO': '%s.dev.rb0',
39        'TEST_IMAGE_DEV_RB_ONE': '%s.dev.rb1',
40        'TEST_IMAGE_DEV_RB_NINE': '%s.dev.rb9'
41    }
42
43    _ROLLBACK_ZERO_BLOCK_ID = '0'
44    _ROLLBACK_INITIAL_BLOCK_ID = '1'
45    _ROLLBACK_INITIAL_MIN_VERSION = '0'
46    _ROLLBACK_INITIAL_RW_VERSION = '0'
47
48    _SERVER_GENERATED_FW_DIR_NAME = 'generated_fw'
49
50    _DUT_TMP_PATH_BASE = '/tmp/fp_test'
51
52    # Name of key in "futility show" output corresponds to the signing key ID
53    _FUTILITY_KEY_ID_KEY_NAME = 'ID'
54
55    # Types of firmware
56    _FIRMWARE_TYPE_RO = 'RO'
57    _FIRMWARE_TYPE_RW = 'RW'
58
59    # Types of signing keys
60    _KEY_TYPE_DEV = 'dev'
61    _KEY_TYPE_PRE_MP = 'premp'
62    _KEY_TYPE_MP = 'mp'
63
64    # EC board names for FPMCUs
65    _FP_BOARD_NAME_BLOONCHIPPER = 'bloonchipper'
66    _FP_BOARD_NAME_DARTMONKEY = 'dartmonkey'
67    _FP_BOARD_NAME_NOCTURNE = 'nocturne_fp'
68    _FP_BOARD_NAME_NAMI = 'nami_fp'
69
70    # Map from signing key ID to type of signing key
71    _KEY_ID_MAP_ = {
72        # bloonchipper
73        '61382804da86b4156d666cc9a976088f8b647d44': _KEY_TYPE_DEV,
74        '07b1af57220c196e363e68d73a5966047c77011e': _KEY_TYPE_PRE_MP,
75        '1c590ef36399f6a2b2ef87079c135b69ef89eb60': _KEY_TYPE_MP,
76
77        # dartmonkey
78        '257a0aa3ac9e81aa4bc3aabdb6d3d079117c5799': _KEY_TYPE_MP,
79
80        # nocturne
81        '8a8fc039a9463271995392f079b83ce33832d07d': _KEY_TYPE_DEV,
82        '6f38c866182bd9bf7a4462c06ac04fa6a0074351': _KEY_TYPE_MP,
83        'f6f7d96c48bd154dbae7e3fe3a3b4c6268a10934': _KEY_TYPE_PRE_MP,
84
85        # nami
86        '754aea623d69975a22998f7b97315dd53115d723': _KEY_TYPE_PRE_MP,
87        '35486c0090ca390408f1fbbf2a182966084fe2f8': _KEY_TYPE_MP
88
89    }
90
91    # RO versions that are flashed in the factory
92    # (for eternity for a given board)
93    _GOLDEN_RO_FIRMWARE_VERSION_MAP = {
94            _FP_BOARD_NAME_BLOONCHIPPER: {
95                    'hatch': 'bloonchipper_v2.0.4277-9f652bb3',
96                    'zork': 'bloonchipper_v2.0.5938-197506c1',
97                    'volteer': 'bloonchipper_v2.0.5938-197506c1',
98                    'brya': 'bloonchipper_v2.0.5938-197506c1',
99                    'guybrush': 'bloonchipper_v2.0.5938-197506c1',
100            },
101            _FP_BOARD_NAME_DARTMONKEY: 'dartmonkey_v2.0.2887-311310808',
102            _FP_BOARD_NAME_NOCTURNE: 'nocturne_fp_v2.2.64-58cf5974e',
103            _FP_BOARD_NAME_NAMI: 'nami_fp_v2.2.144-7a08e07eb',
104    }
105
106    _FIRMWARE_VERSION_SHA256SUM = 'sha256sum'
107    _FIRMWARE_VERSION_RO_VERSION = 'ro_version'
108    _FIRMWARE_VERSION_RW_VERSION = 'rw_version'
109    _FIRMWARE_VERSION_KEY_ID = 'key_id'
110
111    # Map of attributes for a given board's various firmware file releases
112    #
113    # Two purposes:
114    #   1) Documents the exact versions and keys used for a given firmware file.
115    #   2) Used to verify that files that end up in the build (and therefore
116    #      what we release) is exactly what we expect.
117    _FIRMWARE_VERSION_MAP = {
118        _FP_BOARD_NAME_BLOONCHIPPER: {
119            'bloonchipper_v2.0.4277-9f652bb3-RO_v2.0.13589-727a419-RW.bin': {
120                _FIRMWARE_VERSION_SHA256SUM: 'b500a08d1c4f49ac1455214f1957f178288a2f4b36b40e7cd49acad1d0896ccc',
121                _FIRMWARE_VERSION_RO_VERSION: 'bloonchipper_v2.0.4277-9f652bb3',
122                _FIRMWARE_VERSION_RW_VERSION: 'bloonchipper_v2.0.13589-727a419',
123                _FIRMWARE_VERSION_KEY_ID: '1c590ef36399f6a2b2ef87079c135b69ef89eb60',
124            },
125            'bloonchipper_v2.0.5938-197506c1-RO_v2.0.13589-727a419-RW.bin': {
126                _FIRMWARE_VERSION_SHA256SUM: 'dfa1a9e409893441c990edde86dbe6d0e301c03b7a9e604ec6af5fc1691ef1be',
127                _FIRMWARE_VERSION_RO_VERSION: 'bloonchipper_v2.0.5938-197506c1',
128                _FIRMWARE_VERSION_RW_VERSION: 'bloonchipper_v2.0.13589-727a419',
129                _FIRMWARE_VERSION_KEY_ID: '1c590ef36399f6a2b2ef87079c135b69ef89eb60',
130            },
131        },
132        _FP_BOARD_NAME_NOCTURNE: {
133            'nocturne_fp_v2.2.64-58cf5974e-RO_v2.0.13584-6fcfe697-RW.bin': {
134                _FIRMWARE_VERSION_SHA256SUM: '8ebc978bf18fc1629a8ab9b33ac91817d850ce5ca9c55dc69c99b0acfb540948',
135                _FIRMWARE_VERSION_RO_VERSION: 'nocturne_fp_v2.2.64-58cf5974e',
136                _FIRMWARE_VERSION_RW_VERSION: 'nocturne_fp_v2.0.13584-6fcfe697',
137                _FIRMWARE_VERSION_KEY_ID: '6f38c866182bd9bf7a4462c06ac04fa6a0074351',
138            },
139        },
140        _FP_BOARD_NAME_NAMI: {
141            'nami_fp_v2.2.144-7a08e07eb-RO_v2.0.13584-6fcfe69780-RW.bin': {
142                _FIRMWARE_VERSION_SHA256SUM: 'e198db08020ac71a11a53d641d6ada750061fb3f3faa2728aab7835266ed9e7b',
143                _FIRMWARE_VERSION_RO_VERSION: 'nami_fp_v2.2.144-7a08e07eb',
144                _FIRMWARE_VERSION_RW_VERSION: 'nami_fp_v2.0.13584-6fcfe69780',
145                _FIRMWARE_VERSION_KEY_ID: '35486c0090ca390408f1fbbf2a182966084fe2f8',
146            },
147        },
148        _FP_BOARD_NAME_DARTMONKEY: {
149            'dartmonkey_v2.0.2887-311310808-RO_v2.0.13584-6fcfe6978-RW.bin': {
150                _FIRMWARE_VERSION_SHA256SUM: '8fa168c19d886b5fe8e852bba7d3b04cd0cd2344d377d9b3d278a45d76b206a1',
151                _FIRMWARE_VERSION_RO_VERSION: 'dartmonkey_v2.0.2887-311310808',
152                _FIRMWARE_VERSION_RW_VERSION: 'dartmonkey_v2.0.13584-6fcfe6978',
153                _FIRMWARE_VERSION_KEY_ID: '257a0aa3ac9e81aa4bc3aabdb6d3d079117c5799',
154            }
155        }
156    }
157
158    _BIOD_UPSTART_JOB_NAME = 'biod'
159    _POWERD_UPSTART_JOB_NAME = 'powerd'
160    # TODO(crbug.com/925545)
161    _TIMBERSLIDE_UPSTART_JOB_NAME = \
162        'timberslide LOG_PATH=/sys/kernel/debug/cros_fp/console_log'
163
164    _INIT_ENTROPY_CMD = 'bio_wash --factory_init'
165
166    _CROS_FP_ARG = '--name=cros_fp'
167    _CROS_CONFIG_FINGERPRINT_PATH = '/fingerprint'
168    _ECTOOL_RO_VERSION = 'RO version'
169    _ECTOOL_RW_VERSION = 'RW version'
170    _ECTOOL_FIRMWARE_COPY = 'Firmware copy'
171    _ECTOOL_ROLLBACK_BLOCK_ID = 'Rollback block id'
172    _ECTOOL_ROLLBACK_MIN_VERSION = 'Rollback min version'
173    _ECTOOL_ROLLBACK_RW_VERSION = 'RW rollback version'
174
175    @staticmethod
176    def _parse_colon_delimited_output(ectool_output):
177        """
178        Converts ectool's (or any other tool with similar output) colon
179        delimited output into python dict. Ignores any lines that do not have
180        colons.
181
182        Example:
183        RO version:    nocturne_fp_v2.2.64-58cf5974e
184        RW version:    nocturne_fp_v2.2.110-b936c0a3c
185
186        becomes:
187        {
188          'RO version': 'nocturne_fp_v2.2.64-58cf5974e',
189          'RW version': 'nocturne_fp_v2.2.110-b936c0a3c'
190        }
191        """
192        ret = {}
193        try:
194            for line in ectool_output.strip().split('\n'):
195                splits = line.split(':', 1)
196                if len(splits) != 2:
197                    continue
198                key = splits[0].strip()
199                val = splits[1].strip()
200                ret[key] = val
201        except:
202            raise error.TestFail('Unable to parse ectool output: %s'
203                                 % ectool_output)
204        return ret
205
206    def initialize(self, host):
207        """Perform minimal initialization, to avoid AttributeError in cleanup"""
208        self.host = host
209        self.servo = host.servo
210
211        self._validate_compatible_servo_version()
212
213        self.servo.initialize_dut()
214
215        self.fp_board = self.get_fp_board()
216        self._build_fw_file = self.get_build_fw_file()
217
218    def setup_test(self, test_dir, use_dev_signed_fw=False,
219                   enable_hardware_write_protect=True,
220                   enable_software_write_protect=True,
221                   force_firmware_flashing=False, init_entropy=True):
222        """Perform more complete initialization, including copying test files"""
223        logging.info('HW write protect enabled: %s',
224                     self.is_hardware_write_protect_enabled())
225
226        # TODO(crbug.com/925545): stop timberslide so /var/log/cros_fp.log
227        # continues to update after flashing.
228        self._timberslide_running = self.host.upstart_status(
229            self._TIMBERSLIDE_UPSTART_JOB_NAME)
230        if self._timberslide_running:
231            logging.info('Stopping %s', self._TIMBERSLIDE_UPSTART_JOB_NAME)
232            self.host.upstart_stop(self._TIMBERSLIDE_UPSTART_JOB_NAME)
233
234        self._biod_running = self.host.upstart_status(
235            self._BIOD_UPSTART_JOB_NAME)
236        if self._biod_running:
237            logging.info('Stopping %s', self._BIOD_UPSTART_JOB_NAME)
238            self.host.upstart_stop(self._BIOD_UPSTART_JOB_NAME)
239
240        # TODO(b/183123775): Remove when bug is fixed.
241        #  Disabling powerd to prevent the display from turning off, which kills
242        #  USB on some platforms.
243        self._powerd_running = self.host.upstart_status(
244            self._POWERD_UPSTART_JOB_NAME)
245        if self._powerd_running:
246            logging.info('Stopping %s', self._POWERD_UPSTART_JOB_NAME)
247            self.host.upstart_stop(self._POWERD_UPSTART_JOB_NAME)
248
249        # On some platforms an AP reboot is needed after flashing firmware to
250        # rebind the driver.
251        self._dut_needs_reboot = self.is_uart_device()
252
253        if filesystem_util.is_rootfs_writable(self.host):
254            if self._dut_needs_reboot:
255                logging.warning('rootfs is writable')
256            else:
257                raise error.TestFail('rootfs is writable')
258
259        if not self.biod_upstart_job_enabled():
260            raise error.TestFail(
261                    'Biod upstart job is disabled at the beginning of test')
262        if not self.fp_updater_is_enabled():
263            raise error.TestFail(
264                    'Fingerprint firmware updater is disabled at the beginning of test'
265            )
266
267        # Disable biod and updater so that they won't interfere after reboot.
268        if self._dut_needs_reboot:
269            self.disable_biod_upstart_job()
270            self.disable_fp_updater()
271
272        # create tmp working directory on device (automatically cleaned up)
273        self._dut_working_dir = self.host.get_tmp_dir(
274            parent=self._DUT_TMP_PATH_BASE)
275        logging.info('Created dut_working_dir: %s', self._dut_working_dir)
276        self.copy_files_to_dut(test_dir, self._dut_working_dir)
277
278        self.validate_build_fw_file()
279
280        gen_script = os.path.abspath(os.path.join(self.autodir,
281                                                  'server', 'cros', 'faft',
282                                                  self._GENIMAGES_SCRIPT_NAME))
283        self._dut_firmware_test_images_dir = \
284            self._generate_test_firmware_images(gen_script,
285                                                self._build_fw_file,
286                                                self._dut_working_dir)
287        logging.info('dut_firmware_test_images_dir: %s',
288                     self._dut_firmware_test_images_dir)
289
290        self._initialize_test_firmware_image_attrs(
291            self._dut_firmware_test_images_dir)
292
293        self._initialize_running_fw_version(use_dev_signed_fw,
294                                            force_firmware_flashing)
295
296        if init_entropy:
297            self._initialize_fw_entropy()
298
299        self._initialize_hw_and_sw_write_protect(enable_hardware_write_protect,
300                                                 enable_software_write_protect)
301
302    def cleanup(self):
303        """Restores original state."""
304        if hasattr(self, '_need_fw_restore') and self._need_fw_restore:
305            # Once the tests complete we need to make sure we're running the
306            # original firmware (not dev version) and potentially reset rollback.
307            self._initialize_running_fw_version(use_dev_signed_fw=False,
308                                                force_firmware_flashing=False)
309            self._initialize_fw_entropy()
310
311        # Re-enable biod and updater after flashing and initializing entropy so
312        # that they don't interfere if there was a reboot.
313        if hasattr(self, '_dut_needs_reboot') and self._dut_needs_reboot:
314            if not self.biod_upstart_job_enabled():
315                self.enable_biod_upstart_job()
316            if not self.fp_updater_is_enabled():
317                self.enable_fp_updater()
318        self._initialize_hw_and_sw_write_protect(
319            enable_hardware_write_protect=True,
320            enable_software_write_protect=True)
321        # TODO(b/183123775)
322        if hasattr(self, '_powerd_running') and self._powerd_running:
323            logging.info('Restarting powerd')
324            self.host.upstart_restart(self._POWERD_UPSTART_JOB_NAME)
325        if hasattr(self, '_biod_running') and self._biod_running:
326            logging.info('Restarting biod')
327            self.host.upstart_restart(self._BIOD_UPSTART_JOB_NAME)
328        # TODO(crbug.com/925545)
329        if hasattr(self, '_timberslide_running') and self._timberslide_running:
330            logging.info('Restarting timberslide')
331            self.host.upstart_restart(self._TIMBERSLIDE_UPSTART_JOB_NAME)
332
333        super(FingerprintTest, self).cleanup()
334
335    def after_run_once(self):
336        """Logs which iteration just ran."""
337        logging.info('successfully ran iteration %d', self.iteration)
338
339    def _validate_compatible_servo_version(self):
340        """Asserts if a compatible servo version is not attached."""
341        servo_version = self.servo.get_servo_version()
342        logging.info('servo version: %s', servo_version)
343
344    def _generate_test_firmware_images(self, gen_script, build_fw_file,
345                                       dut_working_dir):
346        """
347        Copies the fingerprint firmware from the DUT to the server running
348        the tests, which runs a script to generate various test versions of
349        the firmware.
350
351        @return full path to location of test images on DUT
352        """
353        # create subdirectory under existing tmp dir
354        server_tmp_dir = os.path.join(self.tmpdir,
355                                      self._SERVER_GENERATED_FW_DIR_NAME)
356        os.mkdir(server_tmp_dir)
357        logging.info('server_tmp_dir: %s', server_tmp_dir)
358
359        # Copy firmware from device to server
360        self.get_files_from_dut(build_fw_file, server_tmp_dir)
361
362        # Run the test image generation script on server
363        pushd = os.getcwd()
364        os.chdir(server_tmp_dir)
365        cmd = ' '.join([gen_script,
366                        self.get_fp_board(),
367                        os.path.basename(build_fw_file)])
368        result = self.run_server_cmd(cmd)
369        if result.exit_status != 0:
370            raise error.TestFail('Failed to run test image generation script')
371
372        os.chdir(pushd)
373
374        # Copy resulting files to DUT tmp dir
375        server_generated_images_dir = \
376            os.path.join(server_tmp_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
377        self.copy_files_to_dut(server_generated_images_dir, dut_working_dir)
378
379        return os.path.join(dut_working_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
380
381    def _initialize_test_firmware_image_attrs(self, dut_fw_test_images_dir):
382        """Sets attributes with full path to test images on DUT.
383
384        Example: self.TEST_IMAGE_DEV = /some/path/images/nocturne_fp.dev
385        """
386        for key, val in six.iteritems(self._TEST_IMAGE_FORMAT_MAP):
387            full_path = os.path.join(dut_fw_test_images_dir,
388                                     val % self.get_fp_board())
389            setattr(self, key, full_path)
390
391    def _initialize_running_fw_version(self, use_dev_signed_fw,
392                                       force_firmware_flashing):
393        """
394        Ensures that the running firmware version matches build version
395        and factory rollback settings; flashes to correct version if either
396        fails to match is requested to force flashing.
397
398        RO firmware: original version released at factory
399        RW firmware: firmware from current build
400        """
401        build_rw_firmware_version = \
402            self.get_build_rw_firmware_version(use_dev_signed_fw)
403        golden_ro_firmware_version = \
404            self.get_golden_ro_firmware_version(use_dev_signed_fw)
405        logging.info('Build RW firmware version: %s', build_rw_firmware_version)
406        logging.info('Golden RO firmware version: %s',
407                     golden_ro_firmware_version)
408
409        running_rw_firmware = self.ensure_running_rw_firmware()
410
411        fw_versions_match = self.running_fw_version_matches_given_version(
412            build_rw_firmware_version, golden_ro_firmware_version)
413
414        if not running_rw_firmware or not fw_versions_match \
415            or not self.is_rollback_set_to_initial_val() \
416            or force_firmware_flashing:
417            fw_file = self._build_fw_file
418            if use_dev_signed_fw:
419                fw_file = self.TEST_IMAGE_DEV
420            self.flash_rw_ro_firmware(fw_file)
421            if not self.running_fw_version_matches_given_version(
422                build_rw_firmware_version, golden_ro_firmware_version):
423                raise error.TestFail(
424                    'Running firmware version does not match expected version')
425
426    def _initialize_fw_entropy(self):
427        """Sets the entropy (key) in FPMCU flash (if not set)."""
428        result = self.run_cmd(self._INIT_ENTROPY_CMD)
429        if result.exit_status != 0:
430            raise error.TestFail('Unable to initialize entropy')
431
432    def _initialize_hw_and_sw_write_protect(self, enable_hardware_write_protect,
433                                            enable_software_write_protect):
434        """Enables/disables hardware/software write protect."""
435        # sw: 0, hw: 0 => initial_hw(0) -> sw(0) -> hw(0)
436        # sw: 0, hw: 1 => initial_hw(0) -> sw(0) -> hw(1)
437        # sw: 1, hw: 0 => initial_hw(1) -> sw(1) -> hw(0)
438        # sw: 1, hw: 1 => initial_hw(1) -> sw(1) -> hw(1)
439        hardware_write_protect_initial_enabled = True
440        if not enable_software_write_protect:
441            hardware_write_protect_initial_enabled = False
442
443        self.set_hardware_write_protect(hardware_write_protect_initial_enabled)
444
445        self.set_software_write_protect(enable_software_write_protect)
446        self.set_hardware_write_protect(enable_hardware_write_protect)
447
448    def get_fp_board(self):
449        """Returns name of fingerprint EC.
450
451        nocturne and nami are special cases and have "_fp" appended. Newer
452        FPMCUs have unique names.
453        See go/cros-fingerprint-firmware-branching-and-signing.
454        """
455        # Use cros_config to get fingerprint board.
456        # Due to b/160271883, we will try running the cmd via cat instead.
457        result = self._run_cros_config_cmd_cat('fingerprint/board')
458        if result.exit_status != 0:
459            raise error.TestFail(
460                'Unable to get fingerprint board with cros_config')
461        return result.stdout.rstrip()
462
463    def is_uart_device(self) -> bool:
464        """Returns True if the boards transpot device is UART"""
465        uart_devices = ['zork', 'guybrush']
466        return self.get_host_board() in uart_devices
467
468    def get_host_board(self):
469        """Returns name of the host board."""
470        return self.host.get_board().split(':')[-1]
471
472    def get_build_fw_file(self):
473        """Returns full path to build FW file on DUT."""
474        ls_cmd = 'ls %s/%s*.bin' % (
475            self._FINGERPRINT_BUILD_FW_DIR, self.fp_board)
476        result = self.run_cmd(ls_cmd)
477        if result.exit_status != 0:
478            raise error.TestFail(
479                'Unable to find firmware file on device:'
480                ' command failed (rc=%s): %s'
481                % (result.exit_status, result.stderr.strip() or ls_cmd))
482        ret = result.stdout.rstrip()
483        logging.info('Build firmware file: %s', ret)
484        return ret
485
486    def check_equal(self, a, b):
487        """Raises exception if "a" does not equal "b"."""
488        if a != b:
489            raise error.TestFail('"%s" does not match expected "%s" for board '
490                                 '%s' % (a, b, self.get_fp_board()))
491
492    def validate_build_fw_file(self,
493                               allowed_types=(_KEY_TYPE_PRE_MP, _KEY_TYPE_MP)):
494        """
495        Checks that all attributes in the given firmware file match their
496        expected values.
497
498        @param allowed_types: If key type is something else, raise TestFail.
499                              Default: pre-MP or MP.
500        @type allowed_types: tuple | list
501        """
502        build_fw_file = self._build_fw_file
503        # check hash
504        actual_hash = self._calculate_sha256sum(build_fw_file)
505        expected_hash = self._get_expected_firmware_hash(build_fw_file)
506        self.check_equal(actual_hash, expected_hash)
507
508        # check signing key_id
509        actual_key_id = self._read_firmware_key_id(build_fw_file)
510        expected_key_id = self._get_expected_firmware_key_id(build_fw_file)
511        self.check_equal(actual_key_id, expected_key_id)
512
513        # check that the signing key for firmware in the build
514        # is "pre mass production" (pre-mp) or "mass production" (MP)
515        key_type = self._get_key_type(actual_key_id)
516        if key_type not in allowed_types:
517            raise error.TestFail(
518                'Firmware key type must be %s for board %s; got %s (%s)' %
519                (' or '.join(allowed_types), self.fp_board, key_type,
520                 actual_key_id))
521
522        # check ro_version
523        actual_ro_version = self._read_firmware_ro_version(build_fw_file)
524        expected_ro_version = \
525            self._get_expected_firmware_ro_version(build_fw_file)
526        self.check_equal(actual_ro_version, expected_ro_version)
527
528        # check rw_version
529        actual_rw_version = self._read_firmware_rw_version(build_fw_file)
530        expected_rw_version = \
531            self._get_expected_firmware_rw_version(build_fw_file)
532        self.check_equal(actual_rw_version, expected_rw_version)
533
534        logging.info("Validated build firmware metadata.")
535
536    def _get_key_type(self, key_id):
537        """Returns the key "type" for a given "key id"."""
538        key_type = self._KEY_ID_MAP_.get(key_id)
539        if key_type is None:
540            raise error.TestFail('Unable to get key type for key id: %s'
541                                 % key_id)
542        return key_type
543
544    def _get_expected_firmware_info(self, build_fw_file, info_type):
545        """
546        Returns expected firmware info for a given firmware file name.
547        """
548        build_fw_file_name = os.path.basename(build_fw_file)
549
550        board = self.get_fp_board()
551        board_expected_fw_info = self._FIRMWARE_VERSION_MAP.get(board)
552        if board_expected_fw_info is None:
553            raise error.TestFail('Unable to get firmware info for board: %s'
554                                 % board)
555
556        expected_fw_info = board_expected_fw_info.get(build_fw_file_name)
557        if expected_fw_info is None:
558            raise error.TestFail('Unable to get firmware info for file: %s'
559                                 % build_fw_file_name)
560
561        ret = expected_fw_info.get(info_type)
562        if ret is None:
563            raise error.TestFail('Unable to get firmware info type: %s'
564                                 % info_type)
565
566        return ret
567
568    def _get_expected_firmware_hash(self, build_fw_file):
569        """Returns expected hash of firmware file."""
570        return self._get_expected_firmware_info(
571            build_fw_file, self._FIRMWARE_VERSION_SHA256SUM)
572
573    def _get_expected_firmware_key_id(self, build_fw_file):
574        """Returns expected "key id" for firmware file."""
575        return self._get_expected_firmware_info(
576            build_fw_file, self._FIRMWARE_VERSION_KEY_ID)
577
578    def _get_expected_firmware_ro_version(self, build_fw_file):
579        """Returns expected RO version for firmware file."""
580        return self._get_expected_firmware_info(
581            build_fw_file, self._FIRMWARE_VERSION_RO_VERSION)
582
583    def _get_expected_firmware_rw_version(self, build_fw_file):
584        """Returns expected RW version for firmware file."""
585        return self._get_expected_firmware_info(
586            build_fw_file, self._FIRMWARE_VERSION_RW_VERSION)
587
588    def _read_firmware_key_id(self, file_name):
589        """Returns "key id" as read from the given file."""
590        result = self._run_futility_show_cmd(file_name)
591        parsed = self._parse_colon_delimited_output(result)
592        key_id = parsed.get(self._FUTILITY_KEY_ID_KEY_NAME)
593        if key_id is None:
594            raise error.TestFail('Failed to get key ID for file: %s'
595                                 % file_name)
596        return key_id
597
598    def _read_firmware_ro_version(self, file_name):
599        """Returns RO firmware version as read from the given file."""
600        return self._run_dump_fmap_cmd(file_name, 'RO_FRID')
601
602    def _read_firmware_rw_version(self, file_name):
603        """Returns RW firmware version as read from the given file."""
604        return self._run_dump_fmap_cmd(file_name, 'RW_FWID')
605
606    def _calculate_sha256sum(self, file_name):
607        """Returns SHA256 hash of the given file contents."""
608        result = self._run_sha256sum_cmd(file_name)
609        return result.stdout.split()[0]
610
611    def _get_running_firmware_info(self, key):
612        """
613        Returns requested firmware info (RW version, RO version, or firmware
614        type).
615        """
616        result = self._run_ectool_cmd('version')
617        parsed = self._parse_colon_delimited_output(result.stdout)
618        if result.exit_status != 0:
619            raise error.TestFail('Failed to get running firmware info')
620        info = parsed.get(key)
621        if info is None:
622            raise error.TestFail(
623                'Failed to get running firmware info: %s' % key)
624        return info
625
626    def get_running_rw_firmware_version(self):
627        """Returns running RW firmware version."""
628        return self._get_running_firmware_info(self._ECTOOL_RW_VERSION)
629
630    def get_running_ro_firmware_version(self):
631        """Returns running RO firmware version."""
632        return self._get_running_firmware_info(self._ECTOOL_RO_VERSION)
633
634    def get_running_firmware_type(self):
635        """Returns type of firmware we are running (RW or RO)."""
636        return self._get_running_firmware_info(self._ECTOOL_FIRMWARE_COPY)
637
638    def _get_rollback_info(self, info_type):
639        """Returns requested type of rollback info."""
640        result = self._run_ectool_cmd('rollbackinfo')
641        parsed = self._parse_colon_delimited_output(result.stdout)
642        if result.exit_status != 0:
643            raise error.TestFail('Failed to get rollback info')
644        info = parsed.get(info_type)
645        if info is None:
646            raise error.TestFail('Failed to get rollback info: %s' % info_type)
647        return info
648
649    def get_rollback_id(self):
650        """Returns rollback ID."""
651        return self._get_rollback_info(self._ECTOOL_ROLLBACK_BLOCK_ID)
652
653    def get_rollback_min_version(self):
654        """Returns rollback min version."""
655        return self._get_rollback_info(self._ECTOOL_ROLLBACK_MIN_VERSION)
656
657    def get_rollback_rw_version(self):
658        """Returns RW rollback version."""
659        return self._get_rollback_info(self._ECTOOL_ROLLBACK_RW_VERSION)
660
661    def _construct_dev_version(self, orig_version):
662        """
663        Given a "regular" version string from a signed build, returns the
664        special "dev" version that we use when creating the test images.
665        """
666        fw_version = orig_version
667        if len(fw_version) + len('.dev') > 31:
668            fw_version = fw_version[:27]
669        fw_version = fw_version + '.dev'
670        return fw_version
671
672    def get_golden_ro_firmware_version(self, use_dev_signed_fw):
673        """Returns RO firmware version used in factory."""
674        board = self.get_fp_board()
675        golden_version = self._GOLDEN_RO_FIRMWARE_VERSION_MAP.get(board)
676        if isinstance(golden_version, dict):
677            golden_version = golden_version.get(self.get_host_board())
678        if golden_version is None:
679            raise error.TestFail('Unable to get golden RO version for board: %s'
680                                 % board)
681        if use_dev_signed_fw:
682            golden_version = self._construct_dev_version(golden_version)
683        return golden_version
684
685    def get_build_rw_firmware_version(self, use_dev_signed_fw):
686        """Returns RW firmware version from build."""
687        fw_version = self._read_firmware_rw_version(self._build_fw_file)
688        if use_dev_signed_fw:
689            fw_version = self._construct_dev_version(fw_version)
690        return fw_version
691
692    def ensure_running_rw_firmware(self):
693        """
694        Check whether the device is running RW firmware. If not, try rebooting
695        to RW.
696
697        @return true if successfully verified running RW firmware, false
698        otherwise.
699        """
700        try:
701            if self.get_running_firmware_type() != self._FIRMWARE_TYPE_RW:
702                self._reboot_ec()
703                if self.get_running_firmware_type() != self._FIRMWARE_TYPE_RW:
704                    # RW may be corrupted.
705                    return False
706        except:
707            # We may not always be able to read the firmware version.
708            # For example, if the firmware is erased due to RDP1, running any
709            # commands (such as getting the version) won't work.
710            return False
711        return True
712
713    def running_fw_version_matches_given_version(self, rw_version, ro_version):
714        """
715        Returns True if the running RO and RW firmware versions match the
716        provided versions.
717        """
718        try:
719            running_rw_firmware_version = self.get_running_rw_firmware_version()
720            running_ro_firmware_version = self.get_running_ro_firmware_version()
721
722            logging.info('RW firmware, running: %s, expected: %s',
723                         running_rw_firmware_version, rw_version)
724            logging.info('RO firmware, running: %s, expected: %s',
725                         running_ro_firmware_version, ro_version)
726
727            return (running_rw_firmware_version == rw_version and
728                    running_ro_firmware_version == ro_version)
729        except:
730            # We may not always be able to read the firmware version.
731            # For example, if the firmware is erased due to RDP1, running any
732            # commands (such as getting the version) won't work.
733            return False
734
735    def is_rollback_set_to_initial_val(self):
736        """
737        Returns True if rollbackinfo matches the initial value that it
738        should have coming from the factory.
739        """
740        return (self.get_rollback_id() ==
741                self._ROLLBACK_INITIAL_BLOCK_ID
742                and
743                self.get_rollback_min_version() ==
744                self._ROLLBACK_INITIAL_MIN_VERSION
745                and
746                self.get_rollback_rw_version() ==
747                self._ROLLBACK_INITIAL_RW_VERSION)
748
749    def is_rollback_unset(self):
750        """
751        Returns True if rollbackinfo matches the uninitialized value that it
752        should have after flashing the entire flash.
753        """
754        return (self.get_rollback_id() == self._ROLLBACK_ZERO_BLOCK_ID
755                and self.get_rollback_min_version() ==
756                self._ROLLBACK_INITIAL_MIN_VERSION
757                and self.get_rollback_rw_version() ==
758                self._ROLLBACK_INITIAL_RW_VERSION)
759
760    def biod_upstart_job_enabled(self):
761        """Returns whether biod's upstart job file is at original location."""
762        return self.host.is_file_exists(
763                os.path.join(self._UPSTART_DIR, self._BIOD_UPSTART_JOB_FILE))
764
765    def disable_biod_upstart_job(self):
766        """
767        Disable biod's upstart job so that biod will not run after a reboot.
768        """
769        logging.info('Disabling biod\'s upstart job')
770        filesystem_util.make_rootfs_writable(self.host)
771        cmd = 'mv %s %s' % (os.path.join(
772                self._UPSTART_DIR,
773                self._BIOD_UPSTART_JOB_FILE), self._STATEFUL_PARTITION_DIR)
774        result = self.run_cmd(cmd)
775        if result.exit_status != 0:
776            raise error.TestFail('Unable to disable biod upstart job: %s' %
777                                 result.stderr.strip())
778
779    def enable_biod_upstart_job(self):
780        """
781        Enable biod's upstart job so that biod will run after a reboot.
782        """
783        logging.info('Enabling biod\'s upstart job')
784        filesystem_util.make_rootfs_writable(self.host)
785        cmd = 'mv %s %s' % (os.path.join(
786                self._STATEFUL_PARTITION_DIR,
787                self._BIOD_UPSTART_JOB_FILE), self._UPSTART_DIR)
788        result = self.run_cmd(cmd)
789        if result.exit_status != 0:
790            raise error.TestFail('Unable to enable biod upstart job: %s' %
791                                 result.stderr.strip())
792
793    def fp_updater_is_enabled(self):
794        """Returns whether the fingerprint firmware updater is disabled."""
795        return not self.host.is_file_exists(
796                os.path.join(self._FINGERPRINT_BUILD_FW_DIR,
797                             self._DISABLE_FP_UPDATER_FILE))
798
799    def disable_fp_updater(self):
800        """Disable the fingerprint firmware updater."""
801        filesystem_util.make_rootfs_writable(self.host)
802        touch_cmd = 'touch %s' % os.path.join(self._FINGERPRINT_BUILD_FW_DIR,
803                                              self._DISABLE_FP_UPDATER_FILE)
804        logging.info('Disabling fp firmware updater')
805        result = self.run_cmd(touch_cmd)
806        if result.exit_status != 0:
807            raise error.TestFail(
808                    'Unable to write file to disable fp updater:'
809                    ' command failed (rc=%s): %s' %
810                    (result.exit_status, result.stderr.strip() or touch_cmd))
811        self.run_cmd('sync')
812
813    def enable_fp_updater(self):
814        """
815        Enable the fingerprint firmware updater. Must be called only after
816        disable_fp_updater().
817        """
818        filesystem_util.make_rootfs_writable(self.host)
819        rm_cmd = 'rm %s' % os.path.join(self._FINGERPRINT_BUILD_FW_DIR,
820                                        self._DISABLE_FP_UPDATER_FILE)
821        logging.info('Enabling fp firmware updater')
822        result = self.run_cmd(rm_cmd)
823        if result.exit_status != 0:
824            raise error.TestFail(
825                    'Unable to rm .disable_fp_updater:'
826                    ' command failed (rc=%s): %s' %
827                    (result.exit_status, result.stderr.strip() or rm_cmd))
828        self.run_cmd('sync')
829
830    def flash_rw_ro_firmware(self, fw_path):
831        """Flashes *all* firmware (both RO and RW)."""
832        # Check if FPMCU firmware needs to be re-flashed during cleanup
833        self._need_fw_restore = True
834        self.set_hardware_write_protect(False)
835        flash_cmd = 'flash_fp_mcu' + ' --noservices ' + fw_path
836        logging.info('Running flash cmd: %s', flash_cmd)
837        flash_result = self.run_cmd(flash_cmd)
838        self.set_hardware_write_protect(True)
839
840        # Zork cannot rebind cros-ec-uart after flashing, so an AP reboot is
841        # needed to talk to FPMCU. See b/170213489.
842        # We have to do this even if flashing failed.
843        if hasattr(self, '_dut_needs_reboot') and self._dut_needs_reboot:
844            self.host.reboot()
845            if self.fp_updater_is_enabled():
846                raise error.TestFail(
847                        'Fp updater was not disabled when firmware is flashed')
848            # If we just re-enable fp updater, it can still update (race
849            # condition), so do it later in cleanup.
850
851        if flash_result.exit_status != 0:
852            raise error.TestFail('Flashing RW/RO firmware failed')
853
854    def is_hardware_write_protect_enabled(self):
855        """Returns state of hardware write protect."""
856        fw_wp_state = self.servo.get('fw_wp_state')
857        return fw_wp_state == 'on' or fw_wp_state == 'force_on'
858
859    def set_hardware_write_protect(self, enable):
860        """Enables or disables hardware write protect."""
861        self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
862
863    def set_software_write_protect(self, enable):
864        """Enables or disables software write protect."""
865        arg  = 'enable' if enable else 'disable'
866        self._run_ectool_cmd('flashprotect ' + arg)
867        # TODO(b/116396469): The flashprotect command returns an error even on
868        # success.
869        # if result.exit_status != 0:
870        #    raise error.TestFail('Failed to modify software write protect')
871
872        # TODO(b/116396469): "flashprotect enable" command is slow, so wait for
873        # it to complete before attempting to reboot.
874        time.sleep(2)
875        self._reboot_ec()
876
877    def _reboot_ec(self):
878        """Reboots the fingerprint MCU (FPMCU)."""
879        self._run_ectool_cmd('reboot_ec')
880        # TODO(b/116396469): The reboot_ec command returns an error even on
881        # success.
882        # if result.exit_status != 0:
883        #    raise error.TestFail('Failed to reboot ec')
884        time.sleep(2)
885
886    def get_files_from_dut(self, src, dst):
887        """Copes files from DUT to server."""
888        logging.info('Copying files from (%s) to (%s).', src, dst)
889        self.host.get_file(src, dst, delete_dest=True)
890
891    def copy_files_to_dut(self, src_dir, dst_dir):
892        """Copies files from server to DUT."""
893        logging.info('Copying files from (%s) to (%s).', src_dir, dst_dir)
894        self.host.send_file(src_dir, dst_dir, delete_dest=True)
895        # Sync the filesystem in case we need to reboot the AP soon.
896        self.run_cmd('sync')
897
898    def run_server_cmd(self, command, timeout=65):
899        """Runs command on server; return result with output and exit code."""
900        logging.info('Server execute: %s', command)
901        result = utils.run(command, timeout=timeout, ignore_status=True)
902        logging.info('exit_code: %d', result.exit_status)
903        logging.info('stdout:\n%s', result.stdout)
904        logging.info('stderr:\n%s', result.stderr)
905        return result
906
907    def run_cmd(self, command, timeout=300):
908        """Runs command on the DUT; return result with output and exit code."""
909        logging.debug('DUT Execute: %s', command)
910        result = self.host.run(command, timeout=timeout, ignore_status=True)
911        logging.info('exit_code: %d', result.exit_status)
912        logging.info('stdout:\n%s', result.stdout)
913        logging.info('stderr:\n%s', result.stderr)
914        return result
915
916    def _run_ectool_cmd(self, command):
917        """Runs ectool on DUT; return result with output and exit code."""
918        cmd = 'ectool ' + self._CROS_FP_ARG + ' ' + command
919        result = self.run_cmd(cmd)
920        return result
921
922    def _run_cros_config_cmd(self, command):
923        """Runs cros_config on DUT; return result with output and exit code."""
924        cmd = 'cros_config ' + self._CROS_CONFIG_FINGERPRINT_PATH + ' ' \
925              + command
926        result = self.run_cmd(cmd)
927        return result
928
929    def _run_cros_config_cmd_cat(self, command):
930        """Runs cat /run/chromeos-config/v1 on DUT; return result."""
931        cmd = "cat /run/chromeos-config/v1/{}".format(command)
932        return self.run_cmd(cmd)
933
934    def _run_dump_fmap_cmd(self, fw_file, section):
935        """
936        Runs "dump_fmap" on DUT for given file.
937        Returns value of given section.
938        """
939        # Write result to stderr while redirecting stderr to stdout
940        # and dropping stdout. This is done because dump_map only writes the
941        # value read from a section to a file (will not just print it to
942        # stdout).
943        cmd = 'dump_fmap -x ' + fw_file + ' ' + section +\
944              ':/dev/stderr /dev/stderr >& /dev/stdout > /dev/null'
945        result = self.run_cmd(cmd)
946        if result.exit_status != 0:
947            raise error.TestFail('Failed to read section: %s' % section)
948        return result.stdout.rstrip('\0')
949
950    def _run_futility_show_cmd(self, fw_file):
951        """
952        Runs "futility show" on DUT for given file.
953        Returns stdout on success.
954        """
955        futility_cmd = 'futility show ' + fw_file
956        result = self.run_cmd(futility_cmd)
957        if result.exit_status != 0:
958            raise error.TestFail('Unable to run futility on device')
959        return result.stdout
960
961    def _run_sha256sum_cmd(self, file_name):
962        """
963        Runs "sha256sum" on DUT for given file.
964        Returns stdout on success.
965        """
966        sha_cmd = 'sha256sum ' + file_name
967        result = self.run_cmd(sha_cmd)
968        if result.exit_status != 0:
969            raise error.TestFail('Unable to calculate sha256sum on device')
970        return result
971
972    def run_test(self, test_name, *args):
973        """Runs test on DUT."""
974        logging.info('Running %s', test_name)
975        # Redirecting stderr to stdout since some commands intentionally fail
976        # and it's easier to read when everything ordered in the same output
977        test_cmd = ' '.join([os.path.join(self._dut_working_dir, test_name)] +
978                            list(args) + ['2>&1'])
979        # Change the working dir so we can write files from within the test
980        # (otherwise defaults to $HOME (/root), which is not usually writable)
981        # Note that dut_working_dir is automatically cleaned up so tests don't
982        # need to worry about files from previous invocations or other tests.
983        test_cmd = '(cd ' + self._dut_working_dir + ' && ' + test_cmd + ')'
984        logging.info('Test command: %s', test_cmd)
985        result = self.run_cmd(test_cmd)
986        if result.exit_status != 0:
987            raise error.TestFail(test_name + ' failed')
988