xref: /aosp_15_r20/external/autotest/server/site_tests/firmware_IntegratedU2F/firmware_IntegratedU2F.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import print_function
6
7import logging
8import time
9import six
10import subprocess
11
12from autotest_lib.client.common_lib import error, utils
13from autotest_lib.client.common_lib.cros import tpm_utils
14from autotest_lib.client.cros import constants
15from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
16
17
class firmware_IntegratedU2F(FirmwareTest):
    """Verify U2F using the on-board cr50 firmware works."""
    version = 1

    # Path on the DUT of the U2FTest binary; run_once() skips the test with
    # TestNAError when it is missing.
    U2F_TEST_PATH = '/usr/local/bin/U2FTest'

    # Marker files touched by set_u2fd_flags() before it restarts u2fd.
    U2F_FORCE_PATH = '/var/lib/u2f/force/u2f.force'
    G2F_FORCE_PATH = '/var/lib/u2f/force/g2f.force'
    USER_KEYS_FORCE_PATH = '/var/lib/u2f/force/user_keys.force'

    # Vendor and product IDs used by find_u2f_device() to build the sysfs
    # glob that locates the U2F HID device.
    VID = '18D1'
    PID = '502C'
    # Delay in seconds passed to time.sleep() while polling U2FTest.
    SHORT_WAIT = 1
31
32    def cleanup(self):
33        """Remove *.force files"""
34        self.host.run('rm -f /var/lib/u2f/force/*.force')
35
36        # Restart u2fd so that flag change takes effect.
37        self.host.run('restart u2fd')
38
39        # Put the device back to a known state; also restarts the device.
40        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
41
42        super(firmware_IntegratedU2F, self).cleanup()
43
44
45    def u2fd_is_running(self):
46        """Returns True if u2fd is running on the host"""
47        return 'running' in self.host.run('status u2fd').stdout
48
49
50    def cryptohome_ready(self):
51        """Return True if cryptohome is running."""
52        return 'running' in self.host.run('status cryptohomed').stdout
53
54
55    def owner_key_exists(self):
56        """Return True if constants.OWNER_KEY_FILE exists."""
57        logging.info('checking for owner key')
58        return self.host.path_exists(constants.OWNER_KEY_FILE)
59
60
61    def wait_for_policy(self):
62        """Start u2fd on the host"""
63
64        # Wait for cryptohome to show the TPM is ready before logging in.
65        if not utils.wait_for_value(self.cryptohome_ready, True,
66                                    timeout_sec=60):
67            raise error.TestError('Cryptohome did not start')
68
69        # Wait for the owner key to exist before trying to start u2fd.
70        if not utils.wait_for_value(self.owner_key_exists, True,
71                                    timeout_sec=120):
72            raise error.TestError('Device did not create owner key')
73
74
75    def attestation_init_complete(self):
76        """Return True if prepare_for_enrollment has completed"""
77        return 'prepared_for_enrollment: true' in self.host.run(
78            'attestation_client status').stdout
79
80    def chaps_init_complete(self):
81        """Return True if chaps token initialization has completed"""
82        try:
83            return 'available with 2 token' in self.host.run(
84                    'chaps_client --ping').stderr
85        except error.AutoservRunError:
86            logging.info('Chaps no response')
87            return False
88
89    def wait_for_cr50(self):
90        """Wait for cr50 to complete any OOBE initialization"""
91
92        if not utils.wait_for_value(
93                self.attestation_init_complete, True, timeout_sec=120):
94            raise error.TestError('Attestation initialization did not complete')
95
96        if not utils.wait_for_value(
97                self.chaps_init_complete, True, timeout_sec=120):
98            raise error.TestError('Chaps initialization did not complete')
99
100
101    def set_u2fd_flags(self, u2f, g2f, user_keys):
102        # Start by removing all flags.
103        self.host.run('rm -f /var/lib/u2f/force/*.force')
104
105        if u2f:
106            self.host.run('touch %s' % self.U2F_FORCE_PATH)
107
108        if g2f:
109            self.host.run('touch %s' % self.G2F_FORCE_PATH)
110
111        if user_keys:
112            self.host.run('touch %s' % self.USER_KEYS_FORCE_PATH)
113
114        # Restart u2fd so that flag change takes effect.
115        self.host.run('restart u2fd')
116
117        # Make sure it is still running
118        if not self.u2fd_is_running():
119            raise error.TestFail('could not start u2fd')
120        logging.info('u2fd is running')
121
122
123    def find_u2f_device(self):
124        """Find the U2F device
125
126        Returns:
127            0 if the device hasn't been found. Non-zero if it has
128        """
129        self.device = ''
130        path = '/sys/bus/hid/devices/*:%s:%s.*/hidraw' % (self.VID, self.PID)
131        try:
132            self.device = self.host.run('ls ' + path).stdout.strip()
133        except error.AutoservRunError as e:
134            logging.info('Could not find device')
135        return len(self.device)
136
137
138    def update_u2f_device_path(self):
139        """Get the integrated u2f device."""
140        start_time = time.time()
141        utils.wait_for_value(self.find_u2f_device, max_threshold=1,
142            timeout_sec=30)
143        wait_time = int(time.time() - start_time)
144        if wait_time:
145            logging.info('Took %ss to find device', wait_time)
146        self.dev_path = '/dev/' + self.device
147
148
149    def check_u2ftest_and_press_power_button(self):
150        """Check stdout and press the power button if prompted
151
152        Returns:
153            True if the process has terminated.
154        """
155        time.sleep(self.SHORT_WAIT)
156        self.output += self.get_u2ftest_output()
157        logging.info(self.output)
158        if 'Touch device and hit enter..' in self.output:
159            # press the power button
160            self.servo.power_short_press()
161            logging.info('pressed power button')
162            time.sleep(self.SHORT_WAIT)
163            # send enter to the test process
164            self.u2ftest_job.sp.stdin.write(b'\n')
165            self.u2ftest_job.sp.stdin.flush()
166            logging.info('hit enter')
167            self.output = ''
168        return self.u2ftest_job.sp.poll() is not None
169
170
171    def get_u2ftest_output(self):
172        """Read the new output"""
173        self.u2ftest_job.process_output()
174        output = self.stdout.getvalue()
175        self.stdout.seek(self.last_len)
176        self.last_len = len(output)
177        return self.stdout.read().strip()
178
179    def run_u2ftest(self):
180        """Run U2FTest with the U2F device"""
181        self.last_len = 0
182        self.output = ''
183
184        u2ftest_cmd = utils.sh_escape('%s %s' % (self.U2F_TEST_PATH,
185                                                 self.dev_path))
186        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
187            u2ftest_cmd)
188        self.stdout = six.StringIO()
189        # Start running U2FTest in the background.
190        self.u2ftest_job = utils.BgJob(full_ssh_command,
191                                       nickname='u2ftest',
192                                       stdout_tee=self.stdout,
193                                       stderr_tee=utils.TEE_TO_LOGS,
194                                       stdin=subprocess.PIPE)
195        if self.u2ftest_job == None:
196            raise error.TestFail('could not start u2ftest')
197
198        try:
199            utils.wait_for_value(self.check_u2ftest_and_press_power_button,
200                expected_value=True, timeout_sec=30)
201        finally:
202            self.close_u2ftest()
203
204
205    def close_u2ftest(self):
206        """Terminate the process and check the results."""
207        exit_status = utils.nuke_subprocess(self.u2ftest_job.sp)
208
209        stdout = self.stdout.getvalue().strip()
210        if stdout:
211            logging.debug('stdout of U2FTest:\n%s', stdout)
212        if exit_status:
213            logging.error('stderr of U2FTest:\n%s', self.output)
214            raise error.TestError('U2FTest: %s' % self.output)
215
216
217    def run_once(self, host):
218        """Run U2FTest"""
219        self.host = host
220
221        if not self.host.path_exists(self.U2F_TEST_PATH):
222            raise error.TestNAError('Device does not have U2FTest support')
223
224        # u2fd reads files from the user's home dir, so we need to log in.
225        self.host.run('/usr/local/autotest/bin/autologin.py')
226
227        # u2fd needs the policy file to exist.
228        self.wait_for_policy()
229
230        # Wait for OOBE initialiation to complete, as long-running operations
231        # (eg RSA key generation) could cause U2F operations to timeout.
232        self.wait_for_cr50()
233
234        logging.info("testing u2fd --u2f")
235        self.set_u2fd_flags(True, False, False)
236        # Setting the flags restarts u2fd, which will re-create the u2f device.
237        self.update_u2f_device_path()
238        self.run_u2ftest();
239
240        logging.info("testing u2fd --g2f")
241        self.set_u2fd_flags(False, True, False)
242        self.update_u2f_device_path()
243        self.run_u2ftest();
244
245        logging.info("testing u2fd --u2f --user_keys")
246        self.set_u2fd_flags(True, False, True)
247        self.update_u2f_device_path()
248        self.run_u2ftest();
249
250        logging.info("testing u2fd --g2f --user_keys")
251        self.set_u2fd_flags(False, True, True)
252        self.update_u2f_device_path()
253        self.run_u2ftest();
254