1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import print_function 6 7import logging 8import time 9import six 10import subprocess 11 12from autotest_lib.client.common_lib import error, utils 13from autotest_lib.client.common_lib.cros import tpm_utils 14from autotest_lib.client.cros import constants 15from autotest_lib.server.cros.faft.firmware_test import FirmwareTest 16 17 18class firmware_IntegratedU2F(FirmwareTest): 19 """Verify U2F using the on-board cr50 firmware works.""" 20 version = 1 21 22 U2F_TEST_PATH = '/usr/local/bin/U2FTest' 23 24 U2F_FORCE_PATH = '/var/lib/u2f/force/u2f.force' 25 G2F_FORCE_PATH = '/var/lib/u2f/force/g2f.force' 26 USER_KEYS_FORCE_PATH = '/var/lib/u2f/force/user_keys.force' 27 28 VID = '18D1' 29 PID = '502C' 30 SHORT_WAIT = 1 31 32 def cleanup(self): 33 """Remove *.force files""" 34 self.host.run('rm -f /var/lib/u2f/force/*.force') 35 36 # Restart u2fd so that flag change takes effect. 37 self.host.run('restart u2fd') 38 39 # Put the device back to a known state; also restarts the device. 40 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 41 42 super(firmware_IntegratedU2F, self).cleanup() 43 44 45 def u2fd_is_running(self): 46 """Returns True if u2fd is running on the host""" 47 return 'running' in self.host.run('status u2fd').stdout 48 49 50 def cryptohome_ready(self): 51 """Return True if cryptohome is running.""" 52 return 'running' in self.host.run('status cryptohomed').stdout 53 54 55 def owner_key_exists(self): 56 """Return True if constants.OWNER_KEY_FILE exists.""" 57 logging.info('checking for owner key') 58 return self.host.path_exists(constants.OWNER_KEY_FILE) 59 60 61 def wait_for_policy(self): 62 """Start u2fd on the host""" 63 64 # Wait for cryptohome to show the TPM is ready before logging in. 65 if not utils.wait_for_value(self.cryptohome_ready, True, 66 timeout_sec=60): 67 raise error.TestError('Cryptohome did not start') 68 69 # Wait for the owner key to exist before trying to start u2fd. 70 if not utils.wait_for_value(self.owner_key_exists, True, 71 timeout_sec=120): 72 raise error.TestError('Device did not create owner key') 73 74 75 def attestation_init_complete(self): 76 """Return True if prepare_for_enrollment has completed""" 77 return 'prepared_for_enrollment: true' in self.host.run( 78 'attestation_client status').stdout 79 80 def chaps_init_complete(self): 81 """Return True if chaps token initialization has completed""" 82 try: 83 return 'available with 2 token' in self.host.run( 84 'chaps_client --ping').stderr 85 except error.AutoservRunError: 86 logging.info('Chaps no response') 87 return False 88 89 def wait_for_cr50(self): 90 """Wait for cr50 to complete any OOBE initialization""" 91 92 if not utils.wait_for_value( 93 self.attestation_init_complete, True, timeout_sec=120): 94 raise error.TestError('Attestation initialization did not complete') 95 96 if not utils.wait_for_value( 97 self.chaps_init_complete, True, timeout_sec=120): 98 raise error.TestError('Chaps initialization did not complete') 99 100 101 def set_u2fd_flags(self, u2f, g2f, user_keys): 102 # Start by removing all flags. 103 self.host.run('rm -f /var/lib/u2f/force/*.force') 104 105 if u2f: 106 self.host.run('touch %s' % self.U2F_FORCE_PATH) 107 108 if g2f: 109 self.host.run('touch %s' % self.G2F_FORCE_PATH) 110 111 if user_keys: 112 self.host.run('touch %s' % self.USER_KEYS_FORCE_PATH) 113 114 # Restart u2fd so that flag change takes effect. 115 self.host.run('restart u2fd') 116 117 # Make sure it is still running 118 if not self.u2fd_is_running(): 119 raise error.TestFail('could not start u2fd') 120 logging.info('u2fd is running') 121 122 123 def find_u2f_device(self): 124 """Find the U2F device 125 126 Returns: 127 0 if the device hasn't been found. Non-zero if it has 128 """ 129 self.device = '' 130 path = '/sys/bus/hid/devices/*:%s:%s.*/hidraw' % (self.VID, self.PID) 131 try: 132 self.device = self.host.run('ls ' + path).stdout.strip() 133 except error.AutoservRunError as e: 134 logging.info('Could not find device') 135 return len(self.device) 136 137 138 def update_u2f_device_path(self): 139 """Get the integrated u2f device.""" 140 start_time = time.time() 141 utils.wait_for_value(self.find_u2f_device, max_threshold=1, 142 timeout_sec=30) 143 wait_time = int(time.time() - start_time) 144 if wait_time: 145 logging.info('Took %ss to find device', wait_time) 146 self.dev_path = '/dev/' + self.device 147 148 149 def check_u2ftest_and_press_power_button(self): 150 """Check stdout and press the power button if prompted 151 152 Returns: 153 True if the process has terminated. 154 """ 155 time.sleep(self.SHORT_WAIT) 156 self.output += self.get_u2ftest_output() 157 logging.info(self.output) 158 if 'Touch device and hit enter..' in self.output: 159 # press the power button 160 self.servo.power_short_press() 161 logging.info('pressed power button') 162 time.sleep(self.SHORT_WAIT) 163 # send enter to the test process 164 self.u2ftest_job.sp.stdin.write(b'\n') 165 self.u2ftest_job.sp.stdin.flush() 166 logging.info('hit enter') 167 self.output = '' 168 return self.u2ftest_job.sp.poll() is not None 169 170 171 def get_u2ftest_output(self): 172 """Read the new output""" 173 self.u2ftest_job.process_output() 174 output = self.stdout.getvalue() 175 self.stdout.seek(self.last_len) 176 self.last_len = len(output) 177 return self.stdout.read().strip() 178 179 def run_u2ftest(self): 180 """Run U2FTest with the U2F device""" 181 self.last_len = 0 182 self.output = '' 183 184 u2ftest_cmd = utils.sh_escape('%s %s' % (self.U2F_TEST_PATH, 185 self.dev_path)) 186 full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'), 187 u2ftest_cmd) 188 self.stdout = six.StringIO() 189 # Start running U2FTest in the background. 190 self.u2ftest_job = utils.BgJob(full_ssh_command, 191 nickname='u2ftest', 192 stdout_tee=self.stdout, 193 stderr_tee=utils.TEE_TO_LOGS, 194 stdin=subprocess.PIPE) 195 if self.u2ftest_job == None: 196 raise error.TestFail('could not start u2ftest') 197 198 try: 199 utils.wait_for_value(self.check_u2ftest_and_press_power_button, 200 expected_value=True, timeout_sec=30) 201 finally: 202 self.close_u2ftest() 203 204 205 def close_u2ftest(self): 206 """Terminate the process and check the results.""" 207 exit_status = utils.nuke_subprocess(self.u2ftest_job.sp) 208 209 stdout = self.stdout.getvalue().strip() 210 if stdout: 211 logging.debug('stdout of U2FTest:\n%s', stdout) 212 if exit_status: 213 logging.error('stderr of U2FTest:\n%s', self.output) 214 raise error.TestError('U2FTest: %s' % self.output) 215 216 217 def run_once(self, host): 218 """Run U2FTest""" 219 self.host = host 220 221 if not self.host.path_exists(self.U2F_TEST_PATH): 222 raise error.TestNAError('Device does not have U2FTest support') 223 224 # u2fd reads files from the user's home dir, so we need to log in. 225 self.host.run('/usr/local/autotest/bin/autologin.py') 226 227 # u2fd needs the policy file to exist. 228 self.wait_for_policy() 229 230 # Wait for OOBE initialiation to complete, as long-running operations 231 # (eg RSA key generation) could cause U2F operations to timeout. 232 self.wait_for_cr50() 233 234 logging.info("testing u2fd --u2f") 235 self.set_u2fd_flags(True, False, False) 236 # Setting the flags restarts u2fd, which will re-create the u2f device. 237 self.update_u2f_device_path() 238 self.run_u2ftest(); 239 240 logging.info("testing u2fd --g2f") 241 self.set_u2fd_flags(False, True, False) 242 self.update_u2f_device_path() 243 self.run_u2ftest(); 244 245 logging.info("testing u2fd --u2f --user_keys") 246 self.set_u2fd_flags(True, False, True) 247 self.update_u2f_device_path() 248 self.run_u2ftest(); 249 250 logging.info("testing u2fd --g2f --user_keys") 251 self.set_u2fd_flags(False, True, True) 252 self.update_u2f_device_path() 253 self.run_u2ftest(); 254