xref: /aosp_15_r20/external/autotest/server/cros/servo/servo.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li#
6*9c5db199SXin Li# Expects to be run in an environment with sudo and no interactive password
7*9c5db199SXin Li# prompt, such as within the Chromium OS development chroot.
8*9c5db199SXin Li
9*9c5db199SXin Liimport ast
10*9c5db199SXin Liimport logging
11*9c5db199SXin Liimport os
12*9c5db199SXin Liimport re
13*9c5db199SXin Liimport six
14*9c5db199SXin Liimport six.moves.xmlrpc_client
15*9c5db199SXin Liimport six.moves.http_client
16*9c5db199SXin Liimport time
17*9c5db199SXin Li
18*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
19*9c5db199SXin Lifrom autotest_lib.client.common_lib import lsbrelease_utils
20*9c5db199SXin Lifrom autotest_lib.client.common_lib import seven
21*9c5db199SXin Lifrom autotest_lib.server import utils as server_utils
22*9c5db199SXin Lifrom autotest_lib.server.cros.servo import firmware_programmer
23*9c5db199SXin Lifrom autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig
24*9c5db199SXin Li
25*9c5db199SXin Li
# Regex to match XMLRPC errors due to a servod control not existing.
# Servod uses both 'No control %s' and 'No control named %s' in exceptions.
# The offending control name is captured in the named group 'name'; the
# pattern allows at most one '.' inside the name (e.g. 'ccd_cr50.ec').
NO_CONTROL_RE = re.compile(r'No control(?: named)? (?P<name>\w*\.?\w*)')

# Please see servo/drv/pty_driver.py for error messages to match.

# This common prefix can apply to all subtypes of console errors.
# The first portion is an optional qualifier of the type
# of error that occurred. Each error is or'd.
# This is a plain pattern string (not compiled); it is substituted into the
# console regexes below via '%s'.
CONSOLE_COMMON_RE = (r'((Timeout waiting for response|'
                     r'Known error [\w\'\".\s]+). )?'
                     # The second portion is an optional name for the console
                     # source
                     r'(\w+\: )?')

# Regex to match XMLRPC errors due to a console being unresponsive.
NO_CONSOLE_OUTPUT_RE = re.compile(r'%sNo data was sent from the pty\.' %
                                  CONSOLE_COMMON_RE)


# Regex to match XMLRPC errors due to a console control failing, but the
# underlying Console being responsive.
CONSOLE_MISMATCH_RE = re.compile(r'%sThere was output:' % CONSOLE_COMMON_RE)


# The minimum voltage (in millivolts) on the charger port on servo v4 that is
# expected. This is to query whether a charger is plugged into servo v4 and
# thus pd control capabilities can be used.
V4_CHG_ATTACHED_MIN_VOLTAGE_MV = 4400
55*9c5db199SXin Li
56*9c5db199SXin Li
class ControlUnavailableError(error.TestFail):
    """Raised when servod reports that a requested control does not exist."""
60*9c5db199SXin Li
61*9c5db199SXin Li
class ConsoleError(error.TestFail):
    """Base class for failures of servod controls backed by a console."""
65*9c5db199SXin Li
66*9c5db199SXin Li
class UnresponsiveConsoleError(ConsoleError):
    """A console control failed because the console produced no output."""
70*9c5db199SXin Li
71*9c5db199SXin Li
class ResponsiveConsoleError(ConsoleError):
    """A console control failed although the console itself is responsive."""
75*9c5db199SXin Li
76*9c5db199SXin Li
class ServodBadResponse(six.moves.http_client.BadStatusLine):
    """A bad HTTP status line was received from servod."""

    def __init__(self, when, line):
        """Instance initializer

        @param when: Description of the operation being performed (get/set)
        @param line: The line that came from the server, often an empty string.
        """
        # Remember what was being attempted so __str__ can report it.
        self.when = when
        super(ServodBadResponse, self).__init__(line)

    def __str__(self):
        """Return the operation description followed by the status line."""
        status_line = self.line
        return '%s -- StatusLine=%s' % (self.when, status_line)
92*9c5db199SXin Li
93*9c5db199SXin Li
class ServodEmptyResponse(ServodBadResponse):
    """An empty response came back from servod, possibly because it exited."""
97*9c5db199SXin Li
98*9c5db199SXin Li
class ServodConnectionError(seven.SOCKET_ERRORS[0]):
    """A socket-level error occurred while communicating with servod."""

    def __init__(self, when, errno, strerror, filename):
        """Instance initializer

        The filename is reported at the end of the exception message, e.g.:
        Setting ctrl:val -- [Errno 104] Connection reset by peer:
        "<Servo 'ipaddr:9999'>"

        @param when: Description of the operation being performed at the time
        @param errno: errno value, such as ECONNRESET
        @param strerror: OS-provided description ("connection reset by peer")
        @param filename: Something to report as a path, such as a socket address
        """
        self.when = when
        super(ServodConnectionError, self).__init__(errno, strerror, filename)

    def __str__(self):
        """Build '<when> -- [Errno N] <strerror>: <repr of filename>'.

        The errno and strerror portions (and the '--' separator) are left out
        when the corresponding attribute is None.
        """
        has_errno = self.errno is not None
        has_strerror = self.strerror is not None

        pieces = [self.when]
        if has_errno or has_strerror:
            pieces.append('--')
        if has_errno:
            pieces.append('[Errno %d]' % self.errno)
        if has_strerror:
            pieces.append(self.strerror)
        summary = ' '.join(pieces)
        return '%s: %r' % (summary, self.filename)
127*9c5db199SXin Li
128*9c5db199SXin Li
129*9c5db199SXin Li# TODO: once in python 3, inherit from AbstractContextManager
class _WrapServoErrors(object):
    """
    Context manager that translates low-level servod communication errors.

    Wrap an operation, replacing BadStatusLine and socket errors with
    servo-specific versions, and extracting exception info from
    xmlrpclib.Fault. Exceptions of any other type propagate unchanged, and
    nothing is suppressed.

    @param servo: The servo object; its str() is used to add the servo name
                  to errors. See the ServodConnectionError docstring.
    @param description:  String to use when describing what was being done
    @raise ServodEmptyResponse: if exception is a httplib.BadStatusLine whose
                                line is empty
    @raise ServodBadResponse: if exception is any other httplib.BadStatusLine
    @raise ServodConnectionError: if exception is a socket error
    @raise ControlUnavailableError: if Fault matches NO_CONTROL_RE
    @raise UnresponsiveConsoleError: if Fault matches NO_CONSOLE_OUTPUT_RE
    @raise ResponsiveConsoleError: if Fault matches CONSOLE_MISMATCH_RE
    @raise error.TestFail: if exception is any other xmlrpclib.Fault
    """

    def __init__(self, servo, description):
        self.servo_name = str(servo)
        self.description = description

    @staticmethod
    def _get_xmlrpclib_exception(xmlexc):
        """Get meaningful exception string from xmlrpc.

        Args:
            xmlexc: xmlrpclib.Fault object

        xmlrpclib.Fault.faultString has the following format:

        <type 'exception type'>:'actual error message'

        Parse and return the real exception from the servod side instead of the
        less meaningful one like,
           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
           attribute 'hw_driver'">

        Returns:
            string of underlying exception raised in servod. If the fault
            string contains no '>:' marker it is returned unchanged.
        """
        # Greedy match: everything up to the last '>:' on the first line is
        # dropped, leaving only the message part.
        return re.sub('^.*>:', '', xmlexc.faultString)

    @staticmethod
    def _log_exception(exc_type, exc_val, exc_tb):
        """Log the original exception (with traceback) at debug level."""
        if exc_val is not None:
            logging.debug(
                    'Wrapped exception:', exc_info=(exc_type, exc_val, exc_tb))

    def __enter__(self):
        """Enter the context"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context, handling the exception if there was one.

        Returns None in every case, so an exception that is not translated
        here keeps propagating as-is.
        """
        try:
            if isinstance(exc_val, six.moves.http_client.BadStatusLine):
                self._log_exception(exc_type, exc_val, exc_tb)
                # The line may be the empty string itself or its repr ("''").
                if exc_val.line in ('', "''"):
                    err = ServodEmptyResponse(self.description, exc_val.line)
                else:
                    err = ServodBadResponse(self.description, exc_val.line)
                six.reraise(err.__class__, err, exc_tb)

            if isinstance(exc_val, seven.SOCKET_ERRORS):
                self._log_exception(exc_type, exc_val, exc_tb)
                # Socket errors may carry (), (strerror,) or
                # (errno, strerror, ...) as their args.
                exc_args = exc_val.args
                if len(exc_args) == 0:
                    errno = None
                    strerror = None
                elif len(exc_args) == 1:
                    errno = None
                    strerror = exc_args[0]
                else:
                    errno = exc_args[0]
                    strerror = exc_args[1]
                err = ServodConnectionError(self.description, errno, strerror,
                                            self.servo_name)
                six.reraise(err.__class__, err, exc_tb)

            if isinstance(exc_val, six.moves.xmlrpc_client.Fault):
                err_str = self._get_xmlrpclib_exception(exc_val)
                err_msg = '%s :: %s' % (self.description, err_str)
                unknown_ctrl = NO_CONTROL_RE.search(err_str)
                if unknown_ctrl:
                    # The error message for unavailable controls is huge, since
                    # it reports all known controls.  Don't log the full text.
                    unknown_ctrl_name = unknown_ctrl.group('name')
                    logging.debug('%s :: No control named %r',
                                  self.description, unknown_ctrl_name)
                    err = ControlUnavailableError(
                            'No control named %r' % unknown_ctrl_name)
                else:
                    # Log the full text for errors, except unavailable controls.
                    self._log_exception(exc_type, exc_val, exc_tb)
                    logging.debug(err_msg)
                    if NO_CONSOLE_OUTPUT_RE.search(err_str):
                        err = UnresponsiveConsoleError(
                                'Console not printing output. %s.' %
                                self.description)
                    elif CONSOLE_MISMATCH_RE.search(err_str):
                        err = ResponsiveConsoleError(
                                'Control failed but console alive. %s.' %
                                self.description)
                    else:
                        err = error.TestFail(err_msg)
                six.reraise(err.__class__, err, exc_tb)
        finally:
            # Drop the local traceback reference to avoid a reference cycle.
            del exc_tb
236*9c5db199SXin Li
237*9c5db199SXin Li
def _extract_image_from_tarball(tarball, dest_dir, image_candidates, timeout):
    """Extract the first available image candidate from a tarball.

    @param tarball: The path of the tarball.
    @param dest_dir: The path of the destination directory; it is created
                     if it does not exist yet.
    @param image_candidates: A tuple of the paths of image candidates.
    @param timeout: Time to wait in seconds before timing out; applied to
                    each tar invocation.

    @return: The first path from the image candidates, which succeeds, or None
             if all the image candidates fail.
    """
    # Make sure the destination directory is there before extracting.
    if not os.path.exists(dest_dir):
        os.mkdir(dest_dir)

    # Ask tar for a listing; the candidates are handed over as extra
    # arguments and a failing tar is tolerated (ignore_status).
    listing = server_utils.system_output('tar tf %s' % tarball,
                                         timeout=timeout,
                                         ignore_status=True,
                                         args=image_candidates)
    members = listing.splitlines()

    for candidate in image_candidates:
        logging.debug("Trying to extract %s (autotest)", candidate)
        if candidate not in members:
            continue
        # The candidate is in the listing: try to unpack it and stop at the
        # first one that tar extracts successfully.
        extract_cmd = 'tar xf %s -C %s %s' % (tarball, dest_dir, candidate)
        exit_status = server_utils.system(extract_cmd,
                                          timeout=timeout,
                                          ignore_status=True)
        if exit_status == 0:
            return candidate
    return None
273*9c5db199SXin Li
274*9c5db199SXin Li
class _PowerStateController(object):

    """Class to provide board-specific power operations.

    This class is responsible for "power on" and "power off"
    operations that can operate without making assumptions in
    advance about board state.  It offers an interface that
    abstracts out the different sequences required for different
    board types.

    All operations are carried out by writing a value to the servod
    'power_state' control. If that control is not available, every
    operation raises error.TestFail.

    """
    # Constants acceptable to be passed for the `rec_mode` parameter
    # to power_on().
    #
    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
    #   SD card.
    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
    # REC_ON_FORCE_MRC:  Presumably recovery mode with forced memory
    #   retraining -- TODO confirm against servod's power_state driver.

    REC_ON = 'rec'
    REC_OFF = 'on'
    REC_ON_FORCE_MRC = 'rec_force_mrc'

    # Delay in seconds needed between asserting and de-asserting
    # warm reset.
    _RESET_HOLD_TIME = 0.5


    def __init__(self, servo):
        """Initialize the power state control.

        @param servo Servo object providing the underlying `set` and `get`
                     methods for the target controls.

        """
        self._servo = servo
        self.supported = self._servo.has_control('power_state')
        # Remembered so that retry_power_on() can repeat the last request.
        self.last_rec_mode = self.REC_OFF
        if not self.supported:
            logging.info('Servo setup does not support power-state operations. '
                         'All power-state calls will lead to error.TestFail')

    def _check_supported(self):
        """Throw an error if the power_state control is not supported.

        @raise error.TestFail: if servod has no 'power_state' control.
        """
        if not self.supported:
            raise error.TestFail('power_state controls not supported')

    def _set_power_state(self, state):
        """Write `state` to the 'power_state' control.

        @param state Value to set on the 'power_state' control.
        @raise error.TestFail: if the 'power_state' control is not supported.
        """
        self._check_supported()
        self._servo.set_nocheck('power_state', state)

    def reset(self):
        """Force the DUT to reset.

        The DUT is guaranteed to be on at the end of this call,
        regardless of its previous state, provided that there is
        working OS software. This also guarantees that the EC has
        been restarted.

        """
        self._set_power_state('reset')

    def cr50_reset(self):
        """Force the DUT to reset.

        The DUT is guaranteed to be on at the end of this call,
        regardless of its previous state, provided that there is
        working OS software. This also guarantees that the EC has
        been restarted. Works only for ccd connections.

        """
        self._set_power_state('cr50_reset')

    def warm_reset(self):
        """Apply warm reset to the DUT.

        This asserts, then de-asserts the 'warm_reset' signal.
        Generally, this causes the board to restart.

        """
        # TODO: warm_reset support has added to power_state.py. Once it
        # available to labstation remove fallback method.
        #
        # The support check stays outside the try block so that an
        # unsupported 'power_state' control is reported to the caller rather
        # than triggering the fallback.
        self._check_supported()
        try:
            self._servo.set_nocheck('power_state', 'warm_reset')
        except error.TestFail:
            logging.info("Fallback to warm_reset control method")
            self._servo.set_get_all(['warm_reset:on',
                                 'sleep:%.4f' % self._RESET_HOLD_TIME,
                                 'warm_reset:off'])

    def power_off(self):
        """Force the DUT to power off.

        The DUT is guaranteed to be off at the end of this call,
        regardless of its previous state, provided that there is
        working EC and boot firmware.  There is no requirement for
        working OS software.

        """
        self._set_power_state('off')

    def power_on(self, rec_mode=REC_OFF):
        """Force the DUT to power on.

        Prior to calling this function, the DUT must be powered off,
        e.g. with a call to `power_off()`.

        At power on, recovery mode is set as specified by the
        corresponding argument.  When booting with recovery mode on, it
        is the caller's responsibility to unplug/plug in a bootable
        external storage device.

        If the DUT requires a delay after powering on but before
        processing inputs such as USB stick insertion, the delay is
        handled by this method; the caller is not responsible for such
        delays.

        @param rec_mode Setting of recovery mode to be applied at
                        power on. default: REC_OFF (the 'on' value of
                        the 'power_state' control, i.e. a normal boot)

        """
        self._set_power_state(rec_mode)
        # Only recorded once the control write did not raise.
        self.last_rec_mode = rec_mode

    def retry_power_on(self):
        """Retry powering on the DUT.

        After power_on(...) the system might not come up reliably, although
        the reasons aren't known yet. This function retries turning on the
        system again, trying to bring it in the last state that power_on()
        attempted to reach.
        """
        self._set_power_state(self.last_rec_mode)
409*9c5db199SXin Li
410*9c5db199SXin Li
class _Uart(object):
    """Class to capture UART streams of CPU, EC, Cr50, etc."""
    # Names of the UARTs for which capture is attempted. For each name the
    # servod controls '<name>_uart_capture' and '<name>_uart_stream' are used.
    # Every entry needs its own trailing comma: adjacent string literals are
    # silently concatenated into a single (wrong) name.
    _UartToCapture = ('cpu', 'cr50', 'ec', 'servo_micro', 'servo_v4', 'usbpd',
                      'ccd_cr50.ec', 'ccd_cr50.cpu', 'ccd_cr50.cr50',
                      'ccd_gsc.ec', 'ccd_gsc.cpu', 'ccd_gsc.cr50')


    def __init__(self, servo):
        """Initialize the UART capture helper.

        @param servo: Servo object providing `has_control`, `set_nocheck`
                      and `get` for the UART controls.
        """
        self._servo = servo
        # UART names for which capturing was started successfully.
        self._streams = []
        # Directory for the log files; dump() is a no-op until this is set.
        self.logs_dir = None

    def _start_stop_capture(self, uart, start):
        """Helper function to start/stop capturing on specified UART.

        @param uart:  The UART name to start/stop capturing.
        @param start:  True to start capturing, otherwise stop.

        @returns True if the operation completes successfully.
                 False if the UART capturing is not supported or failed due to
                 an error.
        """
        logging.debug('%s capturing %s UART.', 'Start' if start else 'Stop',
                      uart)
        uart_cmd = '%s_uart_capture' % uart
        target_level = 'on' if start else 'off'
        level = None
        if self._servo.has_control(uart_cmd):
            # Do our own implementation of set() here as not_applicable
            # should also count as a valid control.
            logging.debug('Trying to set %s to %s.', uart_cmd, target_level)
            try:
                self._servo.set_nocheck(uart_cmd, target_level)
                level = self._servo.get(uart_cmd)
            except error.TestFail as e:
                # Any sort of test failure here should not stop the test. This
                # is just to capture more output. Log and move on.
                logging.warning('Failed to set %s to %s. %s. Ignoring.',
                                uart_cmd, target_level, str(e))
            if level == target_level:
                logging.debug('Managed to set %s to %s.', uart_cmd, level)
            else:
                logging.debug('Failed to set %s to %s. Got %s.', uart_cmd,
                              target_level, level)
        return level == target_level

    def start_capture(self):
        """Start capturing UART streams."""
        for uart in self._UartToCapture:
            # Always try to start the uart. Only add it to _streams if it's not
            # in the list.
            if (self._start_stop_capture(uart, True)
                        and uart not in self._streams):
                self._streams.append(uart)

    def get_logfile(self, uart):
        """Return the path to the uart logfile or none if logs_dir isn't set."""
        if not self.logs_dir:
            return None
        return os.path.join(self.logs_dir, '%s_uart.txt' % uart)

    def dump(self):
        """Dump UART streams to log files accordingly.

        The content of each captured stream is appended to its log file.
        Streams that cannot be read, are 'not_applicable', or whose content
        cannot be parsed are logged and skipped; nothing is raised for them.
        """
        if not self.logs_dir:
            return

        for uart in self._streams:
            logfile_fullname = self.get_logfile(uart)
            stream = '%s_uart_stream' % uart
            try:
                content = self._servo.get(stream)
            except Exception as err:
                # Best effort: a stream that cannot be read must not abort the
                # dump of the remaining streams.
                logging.warning('Failed to get UART log for %s: %s', stream, err)
                continue

            if content == 'not_applicable':
                logging.warning('%s is not applicable', stream)
                continue

            # The UART stream may contain non-printable characters, and servo
            # returns it in string representation. We use `ast.literal_eval`
            # to revert it back.
            with open(logfile_fullname, 'a') as fd:
                try:
                    fd.write(ast.literal_eval(content))
                except (ValueError, SyntaxError):
                    # literal_eval raises SyntaxError (not only ValueError)
                    # when the content is not a well-formed Python literal.
                    logging.exception('Invalid value for %s: %r', stream,
                                      content)

    def stop_capture(self):
        """Stop capturing UART streams."""
        for uart in self._UartToCapture:
            try:
                self._start_stop_capture(uart, False)
            except Exception as err:
                # Best effort: keep going so the other UARTs still get stopped.
                logging.warning('Failed to stop UART logging for %s: %s', uart,
                             err)
508*9c5db199SXin Li
509*9c5db199SXin Li
510*9c5db199SXin Liclass Servo(object):
511*9c5db199SXin Li
512*9c5db199SXin Li    """Manages control of a Servo board.
513*9c5db199SXin Li
514*9c5db199SXin Li    Servo is a board developed by hardware group to aide in the debug and
515*9c5db199SXin Li    control of various partner devices. Servo's features include the simulation
516*9c5db199SXin Li    of pressing the power button, closing the lid, and pressing Ctrl-d. This
517*9c5db199SXin Li    class manages setting up and communicating with a servo demon (servod)
518*9c5db199SXin Li    process. It provides both high-level functions for common servo tasks and
519*9c5db199SXin Li    low-level functions for directly setting and reading gpios.
520*9c5db199SXin Li
521*9c5db199SXin Li    """
522*9c5db199SXin Li
    # Short delay in seconds. In this class it is the polling interval
    # between re-reads of a control (see volume_up(), volume_down() and
    # set()).
    #
    # Power button press delays in seconds.
    #
    # The EC specification says that 8.0 seconds should be enough
    # for the long power press.  However, some platforms need a bit
    # more time.  Empirical testing has found these requirements:
    #   Alex: 8.2 seconds
    #   ZGB:  8.5 seconds
    # The actual value is set to the largest known necessary value.
    # NOTE: no long-press constant is defined next to this comment;
    # power_key() uses 8.5 seconds for a 'long_press'.
    #
    # TODO(jrbarnette) Being generous is the right thing to do for
    # existing platforms, but if this code is to be used for
    # qualification of new hardware, we should be less generous.
    SHORT_DELAY = 0.1

    # Maximum number of times to re-read power button on release.
    # set() uses this as the retry limit when re-reading any control
    # that has not yet taken the requested value.
    GET_RETRY_MAX = 10

    # Delays to deal with DUT state transitions.
    # SLEEP_DELAY is waited after closing the lid (see lid_close());
    # BOOT_DELAY is waited before and after Ctrl-d (see pass_devmode()).
    SLEEP_DELAY = 6
    BOOT_DELAY = 10

    # Default minimum time interval between 'press' and 'release'
    # keyboard events.
    SERVO_KEY_PRESS_DELAY = 0.1

    # Time to toggle recovery switch on and off.
    REC_TOGGLE_DELAY = 0.1

    # Time to toggle development switch on and off.
    DEV_TOGGLE_DELAY = 0.1

    # Time between a USB disk being plugged in and detected in the system.
    USB_DETECTION_DELAY = 5

    # Time to wait before timing out on servo initialization.
    INIT_TIMEOUT_SECS = 10

    # Time to wait before timing out when extracting firmware images.
    #
    # This was increased from 60 seconds to support boards with very
    # large (>500MB) firmware archives taking longer than expected to
    # extract firmware on the lab host machines (b/149419503).
    EXTRACT_TIMEOUT_SECS = 900

    # The VBUS voltage threshold used to detect if VBUS is supplied
    VBUS_THRESHOLD = 3000.0

    # List of servos that connect to a debug header on the board.
    FLEX_SERVOS = ['c2d2', 'servo_micro', 'servo_v3']

    # List of servos that rely on gsc commands for some part of dut control.
    GSC_DRV_SERVOS = ['c2d2', 'ccd_gsc', 'ccd_cr50']

    # Name prefix shared by the ccd entries above ('ccd_gsc', 'ccd_cr50').
    CCD_PREFIX = 'ccd_'
577*9c5db199SXin Li
578*9c5db199SXin Li    def __init__(self, servo_host, servo_serial=None, delay_init=False):
579*9c5db199SXin Li        """Sets up the servo communication infrastructure.
580*9c5db199SXin Li
581*9c5db199SXin Li        @param servo_host: A ServoHost object representing
582*9c5db199SXin Li                           the host running servod.
583*9c5db199SXin Li        @type servo_host: autotest_lib.server.hosts.servo_host.ServoHost
584*9c5db199SXin Li        @param servo_serial: Serial number of the servo board.
585*9c5db199SXin Li        @param delay_init:  Delay cache servo_type and power_state to prevent
586*9c5db199SXin Li                            attempt to connect to the servod.
587*9c5db199SXin Li        """
588*9c5db199SXin Li        # TODO(fdeng): crbug.com/298379
589*9c5db199SXin Li        # We should move servo_host object out of servo object
590*9c5db199SXin Li        # to minimize the dependencies on the rest of Autotest.
591*9c5db199SXin Li        self._servo_host = servo_host
592*9c5db199SXin Li        self._servo_serial = servo_serial
593*9c5db199SXin Li        self._servo_type = None
594*9c5db199SXin Li        self._power_state = None
595*9c5db199SXin Li        self._programmer = None
596*9c5db199SXin Li        self._prev_log_inode = None
597*9c5db199SXin Li        self._prev_log_size = 0
598*9c5db199SXin Li        self._ccd_watchdog_disabled = False
599*9c5db199SXin Li        if not delay_init:
600*9c5db199SXin Li            self._servo_type = self.get_servo_version()
601*9c5db199SXin Li            self._power_state = _PowerStateController(self)
602*9c5db199SXin Li        self._uart = _Uart(self)
603*9c5db199SXin Li
604*9c5db199SXin Li    def __str__(self):
605*9c5db199SXin Li        """Description of this object and address, for use in errors"""
606*9c5db199SXin Li        return "<%s '%s:%s'>" % (
607*9c5db199SXin Li                type(self).__name__,
608*9c5db199SXin Li                self._servo_host.hostname,
609*9c5db199SXin Li                self._servo_host.servo_port)
610*9c5db199SXin Li
611*9c5db199SXin Li    @property
612*9c5db199SXin Li    def _server(self):
613*9c5db199SXin Li        with _WrapServoErrors(
614*9c5db199SXin Li                servo=self, description='get_servod_server_proxy()'):
615*9c5db199SXin Li            return self._servo_host.get_servod_server_proxy()
616*9c5db199SXin Li
617*9c5db199SXin Li    @property
618*9c5db199SXin Li    def servo_serial(self):
619*9c5db199SXin Li        """Returns the serial number of the servo board."""
620*9c5db199SXin Li        return self._servo_serial
621*9c5db199SXin Li
622*9c5db199SXin Li    def get_power_state_controller(self):
623*9c5db199SXin Li        """Return the power state controller for this Servo.
624*9c5db199SXin Li
625*9c5db199SXin Li        The power state controller provides board-independent
626*9c5db199SXin Li        interfaces for reset, power-on, power-off operations.
627*9c5db199SXin Li
628*9c5db199SXin Li        """
629*9c5db199SXin Li        if self._power_state is None:
630*9c5db199SXin Li            self._power_state = _PowerStateController(self)
631*9c5db199SXin Li        return self._power_state
632*9c5db199SXin Li
633*9c5db199SXin Li
634*9c5db199SXin Li    def initialize_dut(self, cold_reset=False, enable_main=True):
635*9c5db199SXin Li        """Initializes a dut for testing purposes.
636*9c5db199SXin Li
637*9c5db199SXin Li        This sets various servo signals back to default values
638*9c5db199SXin Li        appropriate for the target board.  By default, if the DUT
639*9c5db199SXin Li        is already on, it stays on.  If the DUT is powered off
640*9c5db199SXin Li        before initialization, its state afterward is unspecified.
641*9c5db199SXin Li
642*9c5db199SXin Li        Rationale:  Basic initialization of servo sets the lid open,
643*9c5db199SXin Li        when there is a lid.  This operation won't affect powered on
644*9c5db199SXin Li        units; however, setting the lid open may power on a unit
645*9c5db199SXin Li        that's off, depending on the board type and previous state
646*9c5db199SXin Li        of the device.
647*9c5db199SXin Li
648*9c5db199SXin Li        If `cold_reset` is a true value, the DUT and its EC will be
649*9c5db199SXin Li        reset, and the DUT rebooted in normal mode.
650*9c5db199SXin Li
651*9c5db199SXin Li        @param cold_reset If True, cold reset the device after
652*9c5db199SXin Li                          initialization.
653*9c5db199SXin Li        @param enable_main If True, make sure the main servo device has
654*9c5db199SXin Li                           control of the dut.
655*9c5db199SXin Li
656*9c5db199SXin Li        """
657*9c5db199SXin Li        if enable_main:
658*9c5db199SXin Li            self.enable_main_servo_device()
659*9c5db199SXin Li
660*9c5db199SXin Li        with _WrapServoErrors(
661*9c5db199SXin Li                servo=self, description='initialize_dut()->hwinit()'):
662*9c5db199SXin Li            self._server.hwinit()
663*9c5db199SXin Li        if self.has_control('usb_mux_oe1'):
664*9c5db199SXin Li            self.set('usb_mux_oe1', 'on')
665*9c5db199SXin Li            self.switch_usbkey('off')
666*9c5db199SXin Li        else:
667*9c5db199SXin Li            logging.warning('Servod command \'usb_mux_oe1\' is not available. '
668*9c5db199SXin Li                            'Any USB drive related servo routines will fail.')
669*9c5db199SXin Li        # Create a record of SBU voltages if this is running support servo (v4,
670*9c5db199SXin Li        # v4p1).
671*9c5db199SXin Li        # TODO(coconutruben): eventually, replace this with a metric to track
672*9c5db199SXin Li        # SBU voltages wrt servo-hw/dut-hw
673*9c5db199SXin Li        if self.has_control('servo_dut_sbu1_mv'):
674*9c5db199SXin Li            # Attempt to take a reading of sbu1 and sbu2 multiple times to
675*9c5db199SXin Li            # account for situations where the two lines exchange hi/lo roles
676*9c5db199SXin Li            # frequently.
677*9c5db199SXin Li            for i in range(10):
678*9c5db199SXin Li                try:
679*9c5db199SXin Li                    sbu1 = int(self.get('servo_dut_sbu1_mv'))
680*9c5db199SXin Li                    sbu2 = int(self.get('servo_dut_sbu2_mv'))
681*9c5db199SXin Li                    logging.info('attempt %d sbu1 %d sbu2 %d', i, sbu1, sbu2)
682*9c5db199SXin Li                except error.TestFail as e:
683*9c5db199SXin Li                    # This is a nice to have but if reading this fails, it
684*9c5db199SXin Li                    # shouldn't interfere with the test.
685*9c5db199SXin Li                    logging.exception(e)
686*9c5db199SXin Li        self._uart.start_capture()
687*9c5db199SXin Li        # Run testlab open if servo relies on ccd to control the dut.
688*9c5db199SXin Li        if self.main_device_uses_gsc_drv():
689*9c5db199SXin Li            self.set_nocheck('cr50_testlab', 'open')
690*9c5db199SXin Li        if cold_reset:
691*9c5db199SXin Li            if not self.get_power_state_controller().supported:
692*9c5db199SXin Li                logging.info('Cold-reset for DUT requested, but servo '
693*9c5db199SXin Li                             'setup does not support power_state. Skipping.')
694*9c5db199SXin Li            else:
695*9c5db199SXin Li                self.get_power_state_controller().reset()
696*9c5db199SXin Li        with _WrapServoErrors(
697*9c5db199SXin Li                servo=self, description='initialize_dut()->get_version()'):
698*9c5db199SXin Li            version = self._server.get_version()
699*9c5db199SXin Li        logging.debug('Servo initialized, version is %s', version)
700*9c5db199SXin Li
701*9c5db199SXin Li
702*9c5db199SXin Li    def is_localhost(self):
703*9c5db199SXin Li        """Is the servod hosted locally?
704*9c5db199SXin Li
705*9c5db199SXin Li        Returns:
706*9c5db199SXin Li          True if local hosted; otherwise, False.
707*9c5db199SXin Li        """
708*9c5db199SXin Li        return self._servo_host.is_localhost()
709*9c5db199SXin Li
710*9c5db199SXin Li
711*9c5db199SXin Li    def get_os_version(self):
712*9c5db199SXin Li        """Returns the chromeos release version."""
713*9c5db199SXin Li        lsb_release_content = self.system_output('cat /etc/lsb-release',
714*9c5db199SXin Li                                                 ignore_status=True)
715*9c5db199SXin Li        return lsbrelease_utils.get_chromeos_release_builder_path(
716*9c5db199SXin Li                    lsb_release_content=lsb_release_content)
717*9c5db199SXin Li
718*9c5db199SXin Li
719*9c5db199SXin Li    def get_servod_version(self):
720*9c5db199SXin Li        """Returns the servod version."""
721*9c5db199SXin Li        # TODO: use system_output once servod --sversion prints to stdout
722*9c5db199SXin Li        try:
723*9c5db199SXin Li            result = self._servo_host.run('servod --sversion 2>&1')
724*9c5db199SXin Li        except error.AutoservRunError as e:
725*9c5db199SXin Li            if 'command execution error' in str(e):
726*9c5db199SXin Li                # Fall back to version if sversion is not supported yet.
727*9c5db199SXin Li                result = self._servo_host.run('servod --version 2>&1')
728*9c5db199SXin Li                return result.stdout.strip() or result.stderr.strip()
729*9c5db199SXin Li            # An actually unexpected error occurred, just raise.
730*9c5db199SXin Li            raise e
731*9c5db199SXin Li        sversion = result.stdout or result.stderr
732*9c5db199SXin Li        # The sversion output contains 3 lines:
733*9c5db199SXin Li        # servod v1.0.816-ff8e966 // the extended version with git hash
734*9c5db199SXin Li        # 2020-04-08 01:10:29 // the time of the latest commit
735*9c5db199SXin Li        # chromeos-ci-legacy-us-central1-b-x32-55-u8zc // builder information
736*9c5db199SXin Li        # For debugging purposes, we mainly care about the version, and the
737*9c5db199SXin Li        # timestamp.
738*9c5db199SXin Li        if type(sversion) == type(b' '):
739*9c5db199SXin Li            sversion = sversion.decode("utf-8")
740*9c5db199SXin Li        return ' '.join(sversion.split()[1:4])
741*9c5db199SXin Li
742*9c5db199SXin Li
743*9c5db199SXin Li    def power_long_press(self):
744*9c5db199SXin Li        """Simulate a long power button press."""
745*9c5db199SXin Li        # After a long power press, the EC may ignore the next power
746*9c5db199SXin Li        # button press (at least on Alex).  To guarantee that this
747*9c5db199SXin Li        # won't happen, we need to allow the EC one second to
748*9c5db199SXin Li        # collect itself.
749*9c5db199SXin Li        # long_press is defined as 8.5s in servod
750*9c5db199SXin Li        self.power_key('long_press')
751*9c5db199SXin Li
752*9c5db199SXin Li
753*9c5db199SXin Li    def power_normal_press(self):
754*9c5db199SXin Li        """Simulate a normal power button press."""
755*9c5db199SXin Li        # press is defined as 1.2s in servod
756*9c5db199SXin Li        self.power_key('press')
757*9c5db199SXin Li
758*9c5db199SXin Li
759*9c5db199SXin Li    def power_short_press(self):
760*9c5db199SXin Li        """Simulate a short power button press."""
761*9c5db199SXin Li        # tab is defined as 0.2s in servod
762*9c5db199SXin Li        self.power_key('tab')
763*9c5db199SXin Li
764*9c5db199SXin Li
765*9c5db199SXin Li    def power_key(self, press_secs='tab'):
766*9c5db199SXin Li        """Simulate a power button press.
767*9c5db199SXin Li
768*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
769*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
770*9c5db199SXin Li        """
771*9c5db199SXin Li        # TODO(b/224804060): use the power_key control for all servo types when
772*9c5db199SXin Li        # c2d2 has a defined power_key driver.
773*9c5db199SXin Li        if 'c2d2' not in self.get_servo_type():
774*9c5db199SXin Li            self.set_nocheck('power_key', press_secs)
775*9c5db199SXin Li            return
776*9c5db199SXin Li        if isinstance(press_secs, str):
777*9c5db199SXin Li            if press_secs == 'tab':
778*9c5db199SXin Li                press_secs = 0.2
779*9c5db199SXin Li            elif press_secs == 'press':
780*9c5db199SXin Li                press_secs = 1.2
781*9c5db199SXin Li            elif press_secs == 'long_press':
782*9c5db199SXin Li                press_secs = 8.5
783*9c5db199SXin Li            else:
784*9c5db199SXin Li                raise error.TestError('Invalid press %r' % press_secs)
785*9c5db199SXin Li        logging.info('Manual power button press for %ds', press_secs)
786*9c5db199SXin Li        self.set_nocheck('pwr_button', 'press')
787*9c5db199SXin Li        time.sleep(press_secs)
788*9c5db199SXin Li        self.set_nocheck('pwr_button', 'release')
789*9c5db199SXin Li
790*9c5db199SXin Li
791*9c5db199SXin Li    def pwr_button(self, action='press'):
792*9c5db199SXin Li        """Simulate a power button press.
793*9c5db199SXin Li
794*9c5db199SXin Li        @param action: str; could be press or could be release.
795*9c5db199SXin Li        """
796*9c5db199SXin Li        self.set_nocheck('pwr_button', action)
797*9c5db199SXin Li
798*9c5db199SXin Li
799*9c5db199SXin Li    def lid_open(self):
800*9c5db199SXin Li        """Simulate opening the lid and raise exception if all attempts fail"""
801*9c5db199SXin Li        self.set('lid_open', 'yes')
802*9c5db199SXin Li
803*9c5db199SXin Li
804*9c5db199SXin Li    def lid_close(self):
805*9c5db199SXin Li        """Simulate closing the lid and raise exception if all attempts fail
806*9c5db199SXin Li
807*9c5db199SXin Li        Waits 6 seconds to ensure the device is fully asleep before returning.
808*9c5db199SXin Li        """
809*9c5db199SXin Li        self.set('lid_open', 'no')
810*9c5db199SXin Li        time.sleep(Servo.SLEEP_DELAY)
811*9c5db199SXin Li
812*9c5db199SXin Li
813*9c5db199SXin Li    def vbus_power_get(self):
814*9c5db199SXin Li        """Get current vbus_power."""
815*9c5db199SXin Li        return self.get('vbus_power')
816*9c5db199SXin Li
817*9c5db199SXin Li
818*9c5db199SXin Li    def volume_up(self, timeout=300):
819*9c5db199SXin Li        """Simulate pushing the volume down button.
820*9c5db199SXin Li
821*9c5db199SXin Li        @param timeout: Timeout for setting the volume.
822*9c5db199SXin Li        """
823*9c5db199SXin Li        self.set_get_all(['volume_up:yes',
824*9c5db199SXin Li                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
825*9c5db199SXin Li                          'volume_up:no'])
826*9c5db199SXin Li        # we need to wait for commands to take effect before moving on
827*9c5db199SXin Li        time_left = float(timeout)
828*9c5db199SXin Li        while time_left > 0.0:
829*9c5db199SXin Li            value = self.get('volume_up')
830*9c5db199SXin Li            if value == 'no':
831*9c5db199SXin Li                return
832*9c5db199SXin Li            time.sleep(self.SHORT_DELAY)
833*9c5db199SXin Li            time_left = time_left - self.SHORT_DELAY
834*9c5db199SXin Li        raise error.TestFail("Failed setting volume_up to no")
835*9c5db199SXin Li
836*9c5db199SXin Li    def volume_down(self, timeout=300):
837*9c5db199SXin Li        """Simulate pushing the volume down button.
838*9c5db199SXin Li
839*9c5db199SXin Li        @param timeout: Timeout for setting the volume.
840*9c5db199SXin Li        """
841*9c5db199SXin Li        self.set_get_all(['volume_down:yes',
842*9c5db199SXin Li                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
843*9c5db199SXin Li                          'volume_down:no'])
844*9c5db199SXin Li        # we need to wait for commands to take effect before moving on
845*9c5db199SXin Li        time_left = float(timeout)
846*9c5db199SXin Li        while time_left > 0.0:
847*9c5db199SXin Li            value = self.get('volume_down')
848*9c5db199SXin Li            if value == 'no':
849*9c5db199SXin Li                return
850*9c5db199SXin Li            time.sleep(self.SHORT_DELAY)
851*9c5db199SXin Li            time_left = time_left - self.SHORT_DELAY
852*9c5db199SXin Li        raise error.TestFail("Failed setting volume_down to no")
853*9c5db199SXin Li
854*9c5db199SXin Li    def arrow_up(self, press_secs='tab'):
855*9c5db199SXin Li        """Simulate arrow up key presses.
856*9c5db199SXin Li
857*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
858*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'.
859*9c5db199SXin Li        """
860*9c5db199SXin Li        # TODO: Remove this check after a lab update to include CL:1913684
861*9c5db199SXin Li        if not self.has_control('arrow_up'):
862*9c5db199SXin Li            logging.warning('Control arrow_up ignored. '
863*9c5db199SXin Li                            'Please update hdctools')
864*9c5db199SXin Li            return
865*9c5db199SXin Li        self.set_nocheck('arrow_up', press_secs)
866*9c5db199SXin Li
867*9c5db199SXin Li    def arrow_down(self, press_secs='tab'):
868*9c5db199SXin Li        """Simulate arrow down key presses.
869*9c5db199SXin Li
870*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
871*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'.
872*9c5db199SXin Li        """
873*9c5db199SXin Li        # TODO: Remove this check after a lab update to include CL:1913684
874*9c5db199SXin Li        if not self.has_control('arrow_down'):
875*9c5db199SXin Li            logging.warning('Control arrow_down ignored. '
876*9c5db199SXin Li                            'Please update hdctools')
877*9c5db199SXin Li            return
878*9c5db199SXin Li        self.set_nocheck('arrow_down', press_secs)
879*9c5db199SXin Li
880*9c5db199SXin Li    def ctrl_d(self, press_secs='tab'):
881*9c5db199SXin Li        """Simulate Ctrl-d simultaneous button presses.
882*9c5db199SXin Li
883*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
884*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
885*9c5db199SXin Li        """
886*9c5db199SXin Li        self.set_nocheck('ctrl_d', press_secs)
887*9c5db199SXin Li
888*9c5db199SXin Li
889*9c5db199SXin Li    def ctrl_r(self, press_secs='tab'):
890*9c5db199SXin Li        """Simulate Ctrl-r simultaneous button presses.
891*9c5db199SXin Li
892*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
893*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
894*9c5db199SXin Li        """
895*9c5db199SXin Li        self.set_nocheck('ctrl_r', press_secs)
896*9c5db199SXin Li
897*9c5db199SXin Li    def ctrl_s(self, press_secs='tab'):
898*9c5db199SXin Li        """Simulate Ctrl-s simultaneous button presses.
899*9c5db199SXin Li
900*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
901*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
902*9c5db199SXin Li        """
903*9c5db199SXin Li        self.set_nocheck('ctrl_s', press_secs)
904*9c5db199SXin Li
905*9c5db199SXin Li
906*9c5db199SXin Li    def ctrl_u(self, press_secs='tab'):
907*9c5db199SXin Li        """Simulate Ctrl-u simultaneous button presses.
908*9c5db199SXin Li
909*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
910*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
911*9c5db199SXin Li        """
912*9c5db199SXin Li        self.set_nocheck('ctrl_u', press_secs)
913*9c5db199SXin Li
914*9c5db199SXin Li
915*9c5db199SXin Li    def ctrl_enter(self, press_secs='tab'):
916*9c5db199SXin Li        """Simulate Ctrl-enter simultaneous button presses.
917*9c5db199SXin Li
918*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
919*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
920*9c5db199SXin Li        """
921*9c5db199SXin Li        self.set_nocheck('ctrl_enter', press_secs)
922*9c5db199SXin Li
923*9c5db199SXin Li
924*9c5db199SXin Li    def ctrl_key(self, press_secs='tab'):
925*9c5db199SXin Li        """Simulate Enter key button press.
926*9c5db199SXin Li
927*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
928*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
929*9c5db199SXin Li        """
930*9c5db199SXin Li        self.set_nocheck('ctrl_key', press_secs)
931*9c5db199SXin Li
932*9c5db199SXin Li
933*9c5db199SXin Li    def enter_key(self, press_secs='tab'):
934*9c5db199SXin Li        """Simulate Enter key button press.
935*9c5db199SXin Li
936*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
937*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
938*9c5db199SXin Li        """
939*9c5db199SXin Li        self.set_nocheck('enter_key', press_secs)
940*9c5db199SXin Li
941*9c5db199SXin Li
942*9c5db199SXin Li    def refresh_key(self, press_secs='tab'):
943*9c5db199SXin Li        """Simulate Refresh key (F3) button press.
944*9c5db199SXin Li
945*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
946*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
947*9c5db199SXin Li        """
948*9c5db199SXin Li        self.set_nocheck('refresh_key', press_secs)
949*9c5db199SXin Li
950*9c5db199SXin Li
951*9c5db199SXin Li    def ctrl_refresh_key(self, press_secs='tab'):
952*9c5db199SXin Li        """Simulate Ctrl and Refresh (F3) simultaneous press.
953*9c5db199SXin Li
954*9c5db199SXin Li        This key combination is an alternative of Space key.
955*9c5db199SXin Li
956*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
957*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
958*9c5db199SXin Li        """
959*9c5db199SXin Li        self.set_nocheck('ctrl_refresh_key', press_secs)
960*9c5db199SXin Li
961*9c5db199SXin Li
962*9c5db199SXin Li    def imaginary_key(self, press_secs='tab'):
963*9c5db199SXin Li        """Simulate imaginary key button press.
964*9c5db199SXin Li
965*9c5db199SXin Li        Maps to a key that doesn't physically exist.
966*9c5db199SXin Li
967*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
968*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
969*9c5db199SXin Li        """
970*9c5db199SXin Li        self.set_nocheck('imaginary_key', press_secs)
971*9c5db199SXin Li
972*9c5db199SXin Li
973*9c5db199SXin Li    def sysrq_x(self, press_secs='tab'):
974*9c5db199SXin Li        """Simulate Alt VolumeUp X simulataneous press.
975*9c5db199SXin Li
976*9c5db199SXin Li        This key combination is the kernel system request (sysrq) X.
977*9c5db199SXin Li
978*9c5db199SXin Li        @param press_secs: int, float, str; time to press key in seconds or
979*9c5db199SXin Li                           known shorthand: 'tab' 'press' 'long_press'
980*9c5db199SXin Li        """
981*9c5db199SXin Li        self.set_nocheck('sysrq_x', press_secs)
982*9c5db199SXin Li
983*9c5db199SXin Li
984*9c5db199SXin Li    def toggle_recovery_switch(self):
985*9c5db199SXin Li        """Toggle recovery switch on and off."""
986*9c5db199SXin Li        self.enable_recovery_mode()
987*9c5db199SXin Li        time.sleep(self.REC_TOGGLE_DELAY)
988*9c5db199SXin Li        self.disable_recovery_mode()
989*9c5db199SXin Li
990*9c5db199SXin Li
991*9c5db199SXin Li    def enable_recovery_mode(self):
992*9c5db199SXin Li        """Enable recovery mode on device."""
993*9c5db199SXin Li        self.set('rec_mode', 'on')
994*9c5db199SXin Li
995*9c5db199SXin Li
996*9c5db199SXin Li    def disable_recovery_mode(self):
997*9c5db199SXin Li        """Disable recovery mode on device."""
998*9c5db199SXin Li        self.set('rec_mode', 'off')
999*9c5db199SXin Li
1000*9c5db199SXin Li
1001*9c5db199SXin Li    def toggle_development_switch(self):
1002*9c5db199SXin Li        """Toggle development switch on and off."""
1003*9c5db199SXin Li        self.enable_development_mode()
1004*9c5db199SXin Li        time.sleep(self.DEV_TOGGLE_DELAY)
1005*9c5db199SXin Li        self.disable_development_mode()
1006*9c5db199SXin Li
1007*9c5db199SXin Li
1008*9c5db199SXin Li    def enable_development_mode(self):
1009*9c5db199SXin Li        """Enable development mode on device."""
1010*9c5db199SXin Li        self.set('dev_mode', 'on')
1011*9c5db199SXin Li
1012*9c5db199SXin Li
1013*9c5db199SXin Li    def disable_development_mode(self):
1014*9c5db199SXin Li        """Disable development mode on device."""
1015*9c5db199SXin Li        self.set('dev_mode', 'off')
1016*9c5db199SXin Li
1017*9c5db199SXin Li    def boot_devmode(self):
1018*9c5db199SXin Li        """Boot a dev-mode device that is powered off."""
1019*9c5db199SXin Li        self.power_short_press()
1020*9c5db199SXin Li        self.pass_devmode()
1021*9c5db199SXin Li
1022*9c5db199SXin Li
1023*9c5db199SXin Li    def pass_devmode(self):
1024*9c5db199SXin Li        """Pass through boot screens in dev-mode."""
1025*9c5db199SXin Li        time.sleep(Servo.BOOT_DELAY)
1026*9c5db199SXin Li        self.ctrl_d()
1027*9c5db199SXin Li        time.sleep(Servo.BOOT_DELAY)
1028*9c5db199SXin Li
1029*9c5db199SXin Li
1030*9c5db199SXin Li    def get_board(self):
1031*9c5db199SXin Li        """Get the board connected to servod."""
1032*9c5db199SXin Li        with _WrapServoErrors(servo=self, description='get_board()'):
1033*9c5db199SXin Li            return self._server.get_board()
1034*9c5db199SXin Li
1035*9c5db199SXin Li
1036*9c5db199SXin Li    def get_base_board(self):
1037*9c5db199SXin Li        """Get the board of the base connected to servod."""
1038*9c5db199SXin Li        try:
1039*9c5db199SXin Li            with _WrapServoErrors(servo=self, description='get_base_board()'):
1040*9c5db199SXin Li                return self._server.get_base_board()
1041*9c5db199SXin Li        except six.moves.xmlrpc_client.Fault as e:
1042*9c5db199SXin Li            # TODO(waihong): Remove the following compatibility check when
1043*9c5db199SXin Li            # the new versions of hdctools are deployed.
1044*9c5db199SXin Li            if 'not supported' in str(e):
1045*9c5db199SXin Li                logging.warning('The servod is too old that get_base_board '
1046*9c5db199SXin Li                                'not supported.')
1047*9c5db199SXin Li                return ''
1048*9c5db199SXin Li            raise
1049*9c5db199SXin Li
1050*9c5db199SXin Li    def can_set_active_device(self):
1051*9c5db199SXin Li        """Returns True if the servo setup supports setting the active device
1052*9c5db199SXin Li
1053*9c5db199SXin Li        Servo can only change the active device if there are multiple devices
1054*9c5db199SXin Li        and servo has the active_dut_controller control.
1055*9c5db199SXin Li        """
1056*9c5db199SXin Li        return ('_and_' in self.get_servo_type()
1057*9c5db199SXin Li                and self.has_control('active_dut_controller'))
1058*9c5db199SXin Li
1059*9c5db199SXin Li    def get_active_device_prefix(self):
1060*9c5db199SXin Li        """Return ccd_(gsc|cr50) or '' if the main device is active."""
1061*9c5db199SXin Li        active_device = ''
1062*9c5db199SXin Li        if self.can_set_active_device():
1063*9c5db199SXin Li            # If servo v4 is allowing dual_v4 devices, then choose the
1064*9c5db199SXin Li            # active device.
1065*9c5db199SXin Li            active_device = self.get('active_dut_controller')
1066*9c5db199SXin Li            if active_device == self.get_main_servo_device():
1067*9c5db199SXin Li                active_device = ''
1068*9c5db199SXin Li        return active_device
1069*9c5db199SXin Li
1070*9c5db199SXin Li    def get_ec_board(self):
1071*9c5db199SXin Li        """Get the board name from EC."""
1072*9c5db199SXin Li
1073*9c5db199SXin Li        return self.get('ec_board', prefix=self.get_active_device_prefix())
1074*9c5db199SXin Li
1075*9c5db199SXin Li    def get_ec_active_copy(self):
1076*9c5db199SXin Li        """Get the active copy of the EC image."""
1077*9c5db199SXin Li        return self.get('ec_active_copy')
1078*9c5db199SXin Li
1079*9c5db199SXin Li    def has_control(self, ctrl_name, prefix=''):
1080*9c5db199SXin Li        """Query servod server to determine if |ctrl_name| is a valid control.
1081*9c5db199SXin Li
1082*9c5db199SXin Li        @param ctrl_name Name of the control.
1083*9c5db199SXin Li        @param prefix: prefix to route control to correct servo device.
1084*9c5db199SXin Li
1085*9c5db199SXin Li        @returns: true if |ctrl_name| is a known control, false otherwise.
1086*9c5db199SXin Li        """
1087*9c5db199SXin Li        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1088*9c5db199SXin Li        try:
1089*9c5db199SXin Li            # If the control exists, doc() will work.
1090*9c5db199SXin Li            with _WrapServoErrors(
1091*9c5db199SXin Li                    servo=self,
1092*9c5db199SXin Li                    description='has_control(%s)->doc()' % ctrl_name):
1093*9c5db199SXin Li                self._server.doc(ctrl_name)
1094*9c5db199SXin Li            return True
1095*9c5db199SXin Li        except ControlUnavailableError:
1096*9c5db199SXin Li            return False
1097*9c5db199SXin Li
1098*9c5db199SXin Li    def _build_ctrl_name(self, ctrl_name, prefix):
1099*9c5db199SXin Li        """Helper to build the control name if a prefix is used.
1100*9c5db199SXin Li
1101*9c5db199SXin Li        @param ctrl_name Name of the control.
1102*9c5db199SXin Li        @param prefix: prefix to route control to correct servo device.
1103*9c5db199SXin Li
1104*9c5db199SXin Li        @returns: [|prefix|.]ctrl_name depending on whether prefix is non-empty.
1105*9c5db199SXin Li        """
1106*9c5db199SXin Li        assert ctrl_name
1107*9c5db199SXin Li        if prefix:
1108*9c5db199SXin Li            return '%s.%s' % (prefix, ctrl_name)
1109*9c5db199SXin Li        return ctrl_name
1110*9c5db199SXin Li
1111*9c5db199SXin Li    def get(self, ctrl_name, prefix=''):
1112*9c5db199SXin Li        """Get the value of a gpio from Servod.
1113*9c5db199SXin Li
1114*9c5db199SXin Li        @param ctrl_name Name of the control.
1115*9c5db199SXin Li        @param prefix: prefix to route control to correct servo device.
1116*9c5db199SXin Li
1117*9c5db199SXin Li        @returns: server response to |ctrl_name| request.
1118*9c5db199SXin Li
1119*9c5db199SXin Li        @raise ControlUnavailableError: if |ctrl_name| not a known control.
1120*9c5db199SXin Li        @raise error.TestFail: for all other failures doing get().
1121*9c5db199SXin Li        """
1122*9c5db199SXin Li        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1123*9c5db199SXin Li        with _WrapServoErrors(
1124*9c5db199SXin Li                servo=self, description='Getting %s' % ctrl_name):
1125*9c5db199SXin Li            return self._server.get(ctrl_name)
1126*9c5db199SXin Li
1127*9c5db199SXin Li    def set(self, ctrl_name, ctrl_value, prefix=''):
1128*9c5db199SXin Li        """Set and check the value of a gpio using Servod.
1129*9c5db199SXin Li
1130*9c5db199SXin Li        @param ctrl_name: Name of the control.
1131*9c5db199SXin Li        @param ctrl_value: New setting for the control.
1132*9c5db199SXin Li        @param prefix: prefix to route control to correct servo device.
1133*9c5db199SXin Li        @raise error.TestFail: if the control value fails to change.
1134*9c5db199SXin Li        """
1135*9c5db199SXin Li        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1136*9c5db199SXin Li        self.set_nocheck(ctrl_name, ctrl_value)
1137*9c5db199SXin Li        retry_count = Servo.GET_RETRY_MAX
1138*9c5db199SXin Li        actual_value = self.get(ctrl_name)
1139*9c5db199SXin Li        while ctrl_value != actual_value and retry_count:
1140*9c5db199SXin Li            logging.warning("%s != %s, retry %d", ctrl_name, ctrl_value,
1141*9c5db199SXin Li                            retry_count)
1142*9c5db199SXin Li            retry_count -= 1
1143*9c5db199SXin Li            time.sleep(Servo.SHORT_DELAY)
1144*9c5db199SXin Li            actual_value = self.get(ctrl_name)
1145*9c5db199SXin Li
1146*9c5db199SXin Li        if ctrl_value != actual_value:
1147*9c5db199SXin Li            raise error.TestFail(
1148*9c5db199SXin Li                    'Servo failed to set %s to %s. Got %s.'
1149*9c5db199SXin Li                    % (ctrl_name, ctrl_value, actual_value))
1150*9c5db199SXin Li
1151*9c5db199SXin Li    def set_nocheck(self, ctrl_name, ctrl_value, prefix=''):
1152*9c5db199SXin Li        """Set the value of a gpio using Servod.
1153*9c5db199SXin Li
1154*9c5db199SXin Li        @param ctrl_name Name of the control.
1155*9c5db199SXin Li        @param ctrl_value New setting for the control.
1156*9c5db199SXin Li        @param prefix: prefix to route control to correct servo device.
1157*9c5db199SXin Li
1158*9c5db199SXin Li        @raise ControlUnavailableError: if |ctrl_name| not a known control.
1159*9c5db199SXin Li        @raise error.TestFail: for all other failures doing set().
1160*9c5db199SXin Li        """
1161*9c5db199SXin Li        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1162*9c5db199SXin Li        # The real danger here is to pass a None value through the xmlrpc.
1163*9c5db199SXin Li        assert ctrl_value is not None
1164*9c5db199SXin Li        description = 'Setting %s to %r' % (ctrl_name, ctrl_value)
1165*9c5db199SXin Li        logging.debug('%s', description)
1166*9c5db199SXin Li        with _WrapServoErrors(servo=self, description=description):
1167*9c5db199SXin Li            self._server.set(ctrl_name, ctrl_value)
1168*9c5db199SXin Li
1169*9c5db199SXin Li    def set_get_all(self, controls):
1170*9c5db199SXin Li        """Set &| get one or more control values.
1171*9c5db199SXin Li
1172*9c5db199SXin Li        @param controls: list of strings, controls to set &| get.
1173*9c5db199SXin Li
1174*9c5db199SXin Li        @raise: error.TestError in case error occurs setting/getting values.
1175*9c5db199SXin Li        """
1176*9c5db199SXin Li        description = 'Set/get all: %s' % str(controls)
1177*9c5db199SXin Li        logging.debug('%s', description)
1178*9c5db199SXin Li        with _WrapServoErrors(servo=self, description=description):
1179*9c5db199SXin Li            return self._server.set_get_all(controls)
1180*9c5db199SXin Li
1181*9c5db199SXin Li
1182*9c5db199SXin Li    def probe_host_usb_dev(self):
1183*9c5db199SXin Li        """Probe the USB disk device plugged-in the servo from the host side.
1184*9c5db199SXin Li
1185*9c5db199SXin Li        It uses servod to discover if there is a usb device attached to it.
1186*9c5db199SXin Li
1187*9c5db199SXin Li        @return: String of USB disk path (e.g. '/dev/sdb') or None.
1188*9c5db199SXin Li        """
1189*9c5db199SXin Li        # Set up Servo's usb mux.
1190*9c5db199SXin Li        return self.get('image_usbkey_dev') or None
1191*9c5db199SXin Li
1192*9c5db199SXin Li
1193*9c5db199SXin Li    def image_to_servo_usb(self, image_path=None,
1194*9c5db199SXin Li                           make_image_noninteractive=False,
1195*9c5db199SXin Li                           power_off_dut=True):
1196*9c5db199SXin Li        """Install an image to the USB key plugged into the servo.
1197*9c5db199SXin Li
1198*9c5db199SXin Li        This method may copy any image to the servo USB key including a
1199*9c5db199SXin Li        recovery image or a test image.  These images are frequently used
1200*9c5db199SXin Li        for test purposes such as restoring a corrupted image or conducting
1201*9c5db199SXin Li        an upgrade of ec/fw/kernel as part of a test of a specific image part.
1202*9c5db199SXin Li
1203*9c5db199SXin Li        @param image_path: Path on the host to the recovery image.
1204*9c5db199SXin Li        @param make_image_noninteractive: Make the recovery image
1205*9c5db199SXin Li                                   noninteractive, therefore the DUT
1206*9c5db199SXin Li                                   will reboot automatically after
1207*9c5db199SXin Li                                   installation.
1208*9c5db199SXin Li        @param power_off_dut: To put the DUT in power off mode.
1209*9c5db199SXin Li        """
1210*9c5db199SXin Li        # We're about to start plugging/unplugging the USB key.  We
1211*9c5db199SXin Li        # don't know the state of the DUT, or what it might choose
1212*9c5db199SXin Li        # to do to the device after hotplug.  To avoid surprises,
1213*9c5db199SXin Li        # force the DUT to be off.
1214*9c5db199SXin Li        if power_off_dut:
1215*9c5db199SXin Li            self.get_power_state_controller().power_off()
1216*9c5db199SXin Li
1217*9c5db199SXin Li        if image_path:
1218*9c5db199SXin Li            logging.info('Searching for usb device and copying image to it. '
1219*9c5db199SXin Li                         'Please wait a few minutes...')
1220*9c5db199SXin Li            # The servod control automatically sets up the host in the host
1221*9c5db199SXin Li            # direction.
1222*9c5db199SXin Li            try:
1223*9c5db199SXin Li                self.set_nocheck('download_image_to_usb_dev', image_path)
1224*9c5db199SXin Li            except error.TestFail as e:
1225*9c5db199SXin Li                logging.error('Failed to transfer requested image to USB. %s.'
1226*9c5db199SXin Li                              'Please take a look at Servo Logs.', str(e))
1227*9c5db199SXin Li                raise error.AutotestError('Download image to usb failed.')
1228*9c5db199SXin Li            if make_image_noninteractive:
1229*9c5db199SXin Li                logging.info('Making image noninteractive')
1230*9c5db199SXin Li                try:
1231*9c5db199SXin Li                    dev = self.probe_host_usb_dev()
1232*9c5db199SXin Li                    if not dev:
1233*9c5db199SXin Li                        # This is fine but also should never happen: if we
1234*9c5db199SXin Li                        # successfully download an image but somehow cannot
1235*9c5db199SXin Li                        # find the stick again, it needs to be investigated.
1236*9c5db199SXin Li                        raise error.TestFail('No image usb key detected '
1237*9c5db199SXin Li                                             'after successful download. '
1238*9c5db199SXin Li                                             'Please investigate.')
1239*9c5db199SXin Li                    # The modification has to happen on partition 1.
1240*9c5db199SXin Li                    dev_partition = '%s1' % dev
1241*9c5db199SXin Li                    self.set_nocheck('make_usb_dev_image_noninteractive',
1242*9c5db199SXin Li                                     dev_partition)
1243*9c5db199SXin Li                except error.TestFail as e:
1244*9c5db199SXin Li                    logging.error('Failed to make image noninteractive. %s.'
1245*9c5db199SXin Li                                  'Please take a look at Servo Logs.',
1246*9c5db199SXin Li                                  str(e))
1247*9c5db199SXin Li
1248*9c5db199SXin Li    def boot_in_recovery_mode(self, snk_mode=False):
1249*9c5db199SXin Li        """Boot host DUT in recovery mode.
1250*9c5db199SXin Li
1251*9c5db199SXin Li        @param snk_mode: If True, switch servo_v4 role to 'snk' mode before
1252*9c5db199SXin Li                         boot DUT into recovery mode.
1253*9c5db199SXin Li        """
1254*9c5db199SXin Li        # This call has a built-in delay to ensure that we wait a timeout
1255*9c5db199SXin Li        # for the stick to enumerate and settle on the DUT side.
1256*9c5db199SXin Li        self.switch_usbkey('dut')
1257*9c5db199SXin Li        # Switch servo_v4 mode to snk as the DUT won't able to see usb drive
1258*9c5db199SXin Li        # in recovery mode if the servo is in src mode(see crbug.com/1129165).
1259*9c5db199SXin Li        if snk_mode:
1260*9c5db199SXin Li            logging.info('Setting servo_v4 role to snk mode in order to make'
1261*9c5db199SXin Li                         ' the DUT can see usb drive while in recovery mode.')
1262*9c5db199SXin Li            self.set_servo_v4_role('snk')
1263*9c5db199SXin Li
1264*9c5db199SXin Li        try:
1265*9c5db199SXin Li            power_state = self.get_power_state_controller()
1266*9c5db199SXin Li            power_state.power_on(rec_mode=power_state.REC_ON)
1267*9c5db199SXin Li        except error.TestFail as e:
1268*9c5db199SXin Li            self.set_servo_v4_role('src')
1269*9c5db199SXin Li            logging.error('Failed to boot DUT in recovery mode. %s.', str(e))
1270*9c5db199SXin Li            raise error.AutotestError('Failed to boot DUT in recovery mode.')
1271*9c5db199SXin Li
1272*9c5db199SXin Li    def install_recovery_image(self,
1273*9c5db199SXin Li                               image_path=None,
1274*9c5db199SXin Li                               make_image_noninteractive=False,
1275*9c5db199SXin Li                               snk_mode=False):
1276*9c5db199SXin Li        """Install the recovery image specified by the path onto the DUT.
1277*9c5db199SXin Li
1278*9c5db199SXin Li        This method uses google recovery mode to install a recovery image
1279*9c5db199SXin Li        onto a DUT through the use of a USB stick that is mounted on a servo
1280*9c5db199SXin Li        board specified by the usb_dev.  If no image path is specified
1281*9c5db199SXin Li        we use the recovery image already on the usb image.
1282*9c5db199SXin Li
1283*9c5db199SXin Li        This method will switch servo_v4 role to 'snk' mode in order to make
1284*9c5db199SXin Li        the DUT can see the usb drive plugged on servo, the caller should
1285*9c5db199SXin Li        set servo_v4 role back to 'src' mode one the DUT exit recovery mode.
1286*9c5db199SXin Li
1287*9c5db199SXin Li        @param image_path: Path on the host to the recovery image.
1288*9c5db199SXin Li        @param make_image_noninteractive: Make the recovery image
1289*9c5db199SXin Li                noninteractive, therefore the DUT will reboot automatically
1290*9c5db199SXin Li                after installation.
1291*9c5db199SXin Li        @param snk_mode: If True, switch servo_v4 role to 'snk' mode before
1292*9c5db199SXin Li                         boot DUT into recovery mode.
1293*9c5db199SXin Li        """
1294*9c5db199SXin Li        self.image_to_servo_usb(image_path, make_image_noninteractive)
1295*9c5db199SXin Li        # Give the DUT some time to power_off if we skip
1296*9c5db199SXin Li        # download image to usb. (crbug.com/982993)
1297*9c5db199SXin Li        if not image_path:
1298*9c5db199SXin Li            time.sleep(10)
1299*9c5db199SXin Li        self.boot_in_recovery_mode(snk_mode=snk_mode)
1300*9c5db199SXin Li
1301*9c5db199SXin Li
1302*9c5db199SXin Li    def _scp_image(self, image_path):
1303*9c5db199SXin Li        """Copy image to the servo host.
1304*9c5db199SXin Li
1305*9c5db199SXin Li        When programming a firmware image on the DUT, the image must be
1306*9c5db199SXin Li        located on the host to which the servo device is connected. Sometimes
1307*9c5db199SXin Li        servo is controlled by a remote host, in this case the image needs to
1308*9c5db199SXin Li        be transferred to the remote host. This adds the servod port number, to
1309*9c5db199SXin Li        make sure tests for different DUTs don't trample on each other's files.
1310*9c5db199SXin Li        Along with the firmware image, any subsidiary files in the same
1311*9c5db199SXin Li        directory shall be copied to the host as well.
1312*9c5db199SXin Li
1313*9c5db199SXin Li        @param image_path: a string, name of the firmware image file to be
1314*9c5db199SXin Li               transferred.
1315*9c5db199SXin Li        @return: a string, full path name of the copied file on the remote.
1316*9c5db199SXin Li        """
1317*9c5db199SXin Li        src_path = os.path.dirname(image_path)
1318*9c5db199SXin Li        dest_path = os.path.join('/tmp', 'dut_%d' % self._servo_host.servo_port)
1319*9c5db199SXin Li        logging.info('Copying %s to %s', src_path, dest_path)
1320*9c5db199SXin Li        # Copy a directory, src_path to dest_path. send_file() will create a
1321*9c5db199SXin Li        # directory named basename(src_path) under dest_path, and copy all files
1322*9c5db199SXin Li        # in src_path to the destination.
1323*9c5db199SXin Li        self._servo_host.send_file(src_path, dest_path, delete_dest=True)
1324*9c5db199SXin Li
1325*9c5db199SXin Li        # Make a image path of the destination.
1326*9c5db199SXin Li        # e.g. /tmp/dut_9999/EC/ec.bin
1327*9c5db199SXin Li        rv = os.path.join(dest_path, os.path.basename(src_path))
1328*9c5db199SXin Li        return os.path.join(rv, os.path.basename(image_path))
1329*9c5db199SXin Li
1330*9c5db199SXin Li
1331*9c5db199SXin Li    def system(self, command, timeout=3600):
1332*9c5db199SXin Li        """Execute the passed in command on the servod host.
1333*9c5db199SXin Li
1334*9c5db199SXin Li        @param command Command to be executed.
1335*9c5db199SXin Li        @param timeout Maximum number of seconds of runtime allowed. Default to
1336*9c5db199SXin Li                       1 hour.
1337*9c5db199SXin Li        """
1338*9c5db199SXin Li        logging.info('Will execute on servo host: %s', command)
1339*9c5db199SXin Li        self._servo_host.run(command, timeout=timeout)
1340*9c5db199SXin Li
1341*9c5db199SXin Li
1342*9c5db199SXin Li    def system_output(self, command, timeout=3600,
1343*9c5db199SXin Li                      ignore_status=False, args=()):
1344*9c5db199SXin Li        """Execute the passed in command on the servod host, return stdout.
1345*9c5db199SXin Li
1346*9c5db199SXin Li        @param command a string, the command to execute
1347*9c5db199SXin Li        @param timeout an int, max number of seconds to wait til command
1348*9c5db199SXin Li               execution completes. Default to 1 hour.
1349*9c5db199SXin Li        @param ignore_status a Boolean, if true - ignore command's nonzero exit
1350*9c5db199SXin Li               status, otherwise an exception will be thrown
1351*9c5db199SXin Li        @param args a tuple of strings, each becoming a separate command line
1352*9c5db199SXin Li               parameter for the command
1353*9c5db199SXin Li        @return command's stdout as a string.
1354*9c5db199SXin Li        """
1355*9c5db199SXin Li        return self._servo_host.run(command, timeout=timeout,
1356*9c5db199SXin Li                                    ignore_status=ignore_status,
1357*9c5db199SXin Li                                    args=args).stdout.strip()
1358*9c5db199SXin Li
1359*9c5db199SXin Li
1360*9c5db199SXin Li    def get_servo_version(self, active=False):
1361*9c5db199SXin Li        """Get the version of the servo, e.g., servo_v2 or servo_v3.
1362*9c5db199SXin Li
1363*9c5db199SXin Li        @param active: Only return the servo type with the active device.
1364*9c5db199SXin Li        @return: The version of the servo.
1365*9c5db199SXin Li
1366*9c5db199SXin Li        """
1367*9c5db199SXin Li        with _WrapServoErrors(
1368*9c5db199SXin Li                servo=self, description='get_servo_version()->get_version()'):
1369*9c5db199SXin Li            servo_type = self._server.get_version()
1370*9c5db199SXin Li        if '_and_' not in servo_type or not active:
1371*9c5db199SXin Li            return servo_type
1372*9c5db199SXin Li
1373*9c5db199SXin Li        # If servo v4 is using ccd and servo micro, modify the servo type to
1374*9c5db199SXin Li        # reflect the active device.
1375*9c5db199SXin Li        active_device = self.get('active_dut_controller')
1376*9c5db199SXin Li        if active_device in servo_type:
1377*9c5db199SXin Li            logging.info('%s is active', active_device)
1378*9c5db199SXin Li            return 'servo_v4_with_' + active_device
1379*9c5db199SXin Li
1380*9c5db199SXin Li        logging.warning("%s is active even though it's not in servo type",
1381*9c5db199SXin Li                     active_device)
1382*9c5db199SXin Li        return servo_type
1383*9c5db199SXin Li
1384*9c5db199SXin Li
1385*9c5db199SXin Li    def get_servo_type(self):
1386*9c5db199SXin Li        if self._servo_type is None:
1387*9c5db199SXin Li            self._servo_type = self.get_servo_version()
1388*9c5db199SXin Li        return self._servo_type
1389*9c5db199SXin Li
1390*9c5db199SXin Li    def get_servo_v4_type(self):
1391*9c5db199SXin Li        """Return the servo_v4_type (such as 'type-c'), or None if not v4."""
1392*9c5db199SXin Li        if not hasattr(self, '_servo_v4_type'):
1393*9c5db199SXin Li            if 'servo_v4' in self.get_servo_type():
1394*9c5db199SXin Li                self._servo_v4_type = self.get('root.dut_connection_type')
1395*9c5db199SXin Li            else:
1396*9c5db199SXin Li                self._servo_v4_type = None
1397*9c5db199SXin Li        return self._servo_v4_type
1398*9c5db199SXin Li
1399*9c5db199SXin Li    def is_servo_v4_type_a(self):
1400*9c5db199SXin Li        """True if the servo is v4 and type-a, else False."""
1401*9c5db199SXin Li        return self.get_servo_v4_type() == 'type-a'
1402*9c5db199SXin Li
1403*9c5db199SXin Li    def is_servo_v4_type_c(self):
1404*9c5db199SXin Li        """True if the servo is v4 and type-c, else False."""
1405*9c5db199SXin Li        return self.get_servo_v4_type() == 'type-c'
1406*9c5db199SXin Li
1407*9c5db199SXin Li    def get_main_servo_device(self):
1408*9c5db199SXin Li        """Return the main servo device"""
1409*9c5db199SXin Li        return self.get_servo_type().split('_with_')[-1].split('_and_')[0]
1410*9c5db199SXin Li
1411*9c5db199SXin Li
1412*9c5db199SXin Li    def enable_main_servo_device(self):
1413*9c5db199SXin Li        """Make sure the main device has control of the dut."""
1414*9c5db199SXin Li        if not self.can_set_active_device():
1415*9c5db199SXin Li            return
1416*9c5db199SXin Li        self.set('active_dut_controller', self.get_main_servo_device())
1417*9c5db199SXin Li
1418*9c5db199SXin Li    def get_ccd_servo_device(self):
1419*9c5db199SXin Li        """Return the ccd servo device or '' if no ccd devices are connected."""
1420*9c5db199SXin Li        servo_type = self.get_servo_type()
1421*9c5db199SXin Li        if 'ccd' not in servo_type:
1422*9c5db199SXin Li            return ''
1423*9c5db199SXin Li        return servo_type.split('_with_')[-1].split('_and_')[-1]
1424*9c5db199SXin Li
1425*9c5db199SXin Li    def active_device_is_ccd(self):
1426*9c5db199SXin Li        """Returns True if a ccd device is active."""
1427*9c5db199SXin Li        return 'ccd' in self.get_servo_version(active=True)
1428*9c5db199SXin Li
1429*9c5db199SXin Li    def enable_ccd_servo_device(self):
1430*9c5db199SXin Li        """Make sure the ccd device has control of the dut.
1431*9c5db199SXin Li
1432*9c5db199SXin Li        Returns True if the ccd device is in control of the dut.
1433*9c5db199SXin Li        """
1434*9c5db199SXin Li        if self.active_device_is_ccd():
1435*9c5db199SXin Li            return True
1436*9c5db199SXin Li        ccd_device = self.get_ccd_servo_device()
1437*9c5db199SXin Li        if not self.can_set_active_device() or not ccd_device:
1438*9c5db199SXin Li            return False
1439*9c5db199SXin Li        self.set('active_dut_controller', ccd_device)
1440*9c5db199SXin Li        return True
1441*9c5db199SXin Li
1442*9c5db199SXin Li    def main_device_is_ccd(self):
1443*9c5db199SXin Li        """Whether the main servo device (no prefixes) is a ccd device."""
1444*9c5db199SXin Li        servo = self.get_servo_type()
1445*9c5db199SXin Li        return 'ccd' in servo and not self.main_device_is_flex()
1446*9c5db199SXin Li
1447*9c5db199SXin Li    def main_device_is_flex(self):
1448*9c5db199SXin Li        """Whether the main servo device (no prefixes) is a legacy device."""
1449*9c5db199SXin Li        servo = self.get_servo_type()
1450*9c5db199SXin Li        return any([flex in servo for flex in self.FLEX_SERVOS])
1451*9c5db199SXin Li
1452*9c5db199SXin Li    def main_device_uses_gsc_drv(self):
1453*9c5db199SXin Li        """Whether the main servo device uses gsc drivers.
1454*9c5db199SXin Li
1455*9c5db199SXin Li        Servo may use gsc wp or console commands to control the dut. These
1456*9c5db199SXin Li        get restricted with ccd capabilities. This returns true if some of
1457*9c5db199SXin Li        the servo functionality will be disabled if ccd is restricted.
1458*9c5db199SXin Li        """
1459*9c5db199SXin Li        return self.get_main_servo_device() in self.GSC_DRV_SERVOS
1460*9c5db199SXin Li
1461*9c5db199SXin Li    def _initialize_programmer(self, rw_only=False):
1462*9c5db199SXin Li        """Initialize the firmware programmer.
1463*9c5db199SXin Li
1464*9c5db199SXin Li        @param rw_only: True to initialize a programmer which only
1465*9c5db199SXin Li                        programs the RW portions.
1466*9c5db199SXin Li        """
1467*9c5db199SXin Li        if self._programmer:
1468*9c5db199SXin Li            return
1469*9c5db199SXin Li        # Initialize firmware programmer
1470*9c5db199SXin Li        servo_type = self.get_servo_type()
1471*9c5db199SXin Li        if servo_type.startswith('servo_v2'):
1472*9c5db199SXin Li            self._programmer = firmware_programmer.ProgrammerV2(self)
1473*9c5db199SXin Li            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
1474*9c5db199SXin Li        # Both servo v3 and v4 use the same programming methods so just leverage
1475*9c5db199SXin Li        # ProgrammerV3 for servo v4 as well.
1476*9c5db199SXin Li        elif (servo_type.startswith('servo_v3')
1477*9c5db199SXin Li              or servo_type.startswith('servo_v4')):
1478*9c5db199SXin Li            self._programmer = firmware_programmer.ProgrammerV3(self)
1479*9c5db199SXin Li            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
1480*9c5db199SXin Li        else:
1481*9c5db199SXin Li            raise error.TestError(
1482*9c5db199SXin Li                    'No firmware programmer for servo version: %s' %
1483*9c5db199SXin Li                    self.get_servo_type())
1484*9c5db199SXin Li
1485*9c5db199SXin Li
1486*9c5db199SXin Li    def program_bios(self, image, rw_only=False, copy_image=True):
1487*9c5db199SXin Li        """Program bios on DUT with given image.
1488*9c5db199SXin Li
1489*9c5db199SXin Li        @param image: a string, file name of the BIOS image to program
1490*9c5db199SXin Li                      on the DUT.
1491*9c5db199SXin Li        @param rw_only: True to only program the RW portion of BIOS.
1492*9c5db199SXin Li        @param copy_image: True indicates we need scp the image to servohost
1493*9c5db199SXin Li                           while False means the image file is already on
1494*9c5db199SXin Li                           servohost.
1495*9c5db199SXin Li        @return: a string, full path name of the copied file on the remote.
1496*9c5db199SXin Li        """
1497*9c5db199SXin Li        self._initialize_programmer()
1498*9c5db199SXin Li        # We don't need scp if test runs locally.
1499*9c5db199SXin Li        if copy_image and not self.is_localhost():
1500*9c5db199SXin Li            image = self._scp_image(image)
1501*9c5db199SXin Li        if rw_only:
1502*9c5db199SXin Li            self._programmer_rw.program_bios(image)
1503*9c5db199SXin Li        else:
1504*9c5db199SXin Li            self._programmer.program_bios(image)
1505*9c5db199SXin Li        return image
1506*9c5db199SXin Li
1507*9c5db199SXin Li
1508*9c5db199SXin Li    def program_ec(self, image, rw_only=False, copy_image=True):
1509*9c5db199SXin Li        """Program ec on DUT with given image.
1510*9c5db199SXin Li
1511*9c5db199SXin Li        @param image: a string, file name of the EC image to program
1512*9c5db199SXin Li                      on the DUT.
1513*9c5db199SXin Li        @param rw_only: True to only program the RW portion of EC.
1514*9c5db199SXin Li        @param copy_image: True indicates we need scp the image to servohost
1515*9c5db199SXin Li                           while False means the image file is already on
1516*9c5db199SXin Li                           servohost.
1517*9c5db199SXin Li        @return: a string, full path name of the copied file on the remote.
1518*9c5db199SXin Li        """
1519*9c5db199SXin Li        self._initialize_programmer()
1520*9c5db199SXin Li        # We don't need scp if test runs locally.
1521*9c5db199SXin Li        if copy_image and not self.is_localhost():
1522*9c5db199SXin Li            image = self._scp_image(image)
1523*9c5db199SXin Li        if rw_only:
1524*9c5db199SXin Li            self._programmer_rw.program_ec(image)
1525*9c5db199SXin Li        else:
1526*9c5db199SXin Li            self._programmer.program_ec(image)
1527*9c5db199SXin Li        return image
1528*9c5db199SXin Li
1529*9c5db199SXin Li
1530*9c5db199SXin Li    def extract_ec_image(self, board, model, tarball_path, fake_image=False):
1531*9c5db199SXin Li        """Helper function to extract EC image from downloaded tarball.
1532*9c5db199SXin Li
1533*9c5db199SXin Li        @param board: The DUT board name.
1534*9c5db199SXin Li        @param model: The DUT model name.
1535*9c5db199SXin Li        @param tarball_path: The path of the downloaded build tarball.
1536*9c5db199SXin Li        @param fake_image: True to return a fake zero-filled image instead.
1537*9c5db199SXin Li
1538*9c5db199SXin Li        @return: Path to extracted EC image.
1539*9c5db199SXin Li        """
1540*9c5db199SXin Li
1541*9c5db199SXin Li        # Ignore extracting EC image and re-programming if not a Chrome EC
1542*9c5db199SXin Li        chrome_ec = FAFTConfig(board).chrome_ec
1543*9c5db199SXin Li        if not chrome_ec:
1544*9c5db199SXin Li            logging.warning('Not a Chrome EC, ignore re-programming it')
1545*9c5db199SXin Li            return None
1546*9c5db199SXin Li
1547*9c5db199SXin Li        # Most boards use the model name as the ec directory.
1548*9c5db199SXin Li        ec_image_candidates = ['%s/ec.bin' % model]
1549*9c5db199SXin Li
1550*9c5db199SXin Li        if model == "dragonair":
1551*9c5db199SXin Li            ec_image_candidates.append('dratini/ec.bin')
1552*9c5db199SXin Li
1553*9c5db199SXin Li        # If that isn't found try the name from the EC RO version.
1554*9c5db199SXin Li        try:
1555*9c5db199SXin Li            fw_target = self.get_ec_board().lower()
1556*9c5db199SXin Li            ec_image_candidates.append('%s/ec.bin' % fw_target)
1557*9c5db199SXin Li        except Exception as err:
1558*9c5db199SXin Li            logging.warning('Failed to get ec_board value; ignoring')
1559*9c5db199SXin Li
1560*9c5db199SXin Li        # Fallback to the name of the board, and then a bare ec.bin.
1561*9c5db199SXin Li        ec_image_candidates.append('%s/ec.bin' % board)
1562*9c5db199SXin Li        ec_image_candidates.append('ec.bin')
1563*9c5db199SXin Li
1564*9c5db199SXin Li        # Extract EC image from tarball
1565*9c5db199SXin Li        dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC')
1566*9c5db199SXin Li        ec_image = _extract_image_from_tarball(tarball_path,
1567*9c5db199SXin Li                                               dest_dir,
1568*9c5db199SXin Li                                               ec_image_candidates,
1569*9c5db199SXin Li                                               self.EXTRACT_TIMEOUT_SECS)
1570*9c5db199SXin Li
1571*9c5db199SXin Li        # Check if EC image was found and return path or raise error
1572*9c5db199SXin Li        if ec_image:
1573*9c5db199SXin Li            # Extract subsidiary binaries for EC
1574*9c5db199SXin Li            # Find a monitor binary for NPCX_UUT chip type, if any.
1575*9c5db199SXin Li            mon_candidates = [candidate.replace('ec.bin', 'npcx_monitor.bin')
1576*9c5db199SXin Li                              for candidate in ec_image_candidates]
1577*9c5db199SXin Li            _extract_image_from_tarball(tarball_path, dest_dir, mon_candidates,
1578*9c5db199SXin Li                                        self.EXTRACT_TIMEOUT_SECS)
1579*9c5db199SXin Li
1580*9c5db199SXin Li            if fake_image:
1581*9c5db199SXin Li                # Create a small (25% of original size) zero-filled binary to
1582*9c5db199SXin Li                # replace the real ec_image
1583*9c5db199SXin Li                file_size = os.path.getsize(ec_image) / 4
1584*9c5db199SXin Li                ec_image = os.path.join(os.path.dirname(ec_image),
1585*9c5db199SXin Li                                        "zero_ec.bin")
1586*9c5db199SXin Li                dump_cmd = 'dd if=/dev/zero of=%s bs=4096 count=%d' % (
1587*9c5db199SXin Li                        os.path.join(dest_dir, ec_image), file_size / 4096)
1588*9c5db199SXin Li                if server_utils.system(dump_cmd, ignore_status=True) != 0:
1589*9c5db199SXin Li                    return None
1590*9c5db199SXin Li
1591*9c5db199SXin Li            return os.path.join(dest_dir, ec_image)
1592*9c5db199SXin Li        else:
1593*9c5db199SXin Li            raise error.TestError('Failed to extract EC image from %s' %
1594*9c5db199SXin Li                                  tarball_path)
1595*9c5db199SXin Li
1596*9c5db199SXin Li
1597*9c5db199SXin Li    def extract_bios_image(self, board, model, tarball_path):
1598*9c5db199SXin Li        """Helper function to extract BIOS image from downloaded tarball.
1599*9c5db199SXin Li
1600*9c5db199SXin Li        @param board: The DUT board name.
1601*9c5db199SXin Li        @param model: The DUT model name.
1602*9c5db199SXin Li        @param tarball_path: The path of the downloaded build tarball.
1603*9c5db199SXin Li
1604*9c5db199SXin Li        @return: Path to extracted BIOS image.
1605*9c5db199SXin Li        """
1606*9c5db199SXin Li
1607*9c5db199SXin Li        # Most boards use the model name as the image filename.
1608*9c5db199SXin Li        bios_image_candidates = [
1609*9c5db199SXin Li                'image-%s.bin' % model,
1610*9c5db199SXin Li        ]
1611*9c5db199SXin Li
1612*9c5db199SXin Li        if model == "dragonair":
1613*9c5db199SXin Li            bios_image_candidates.append('image-dratini.bin')
1614*9c5db199SXin Li
1615*9c5db199SXin Li        # If that isn't found try the name from the EC RO version.
1616*9c5db199SXin Li        try:
1617*9c5db199SXin Li            fw_target = self.get_ec_board().lower()
1618*9c5db199SXin Li            bios_image_candidates.append('image-%s.bin' % fw_target)
1619*9c5db199SXin Li        except Exception as err:
1620*9c5db199SXin Li            logging.warning('Failed to get ec_board value; ignoring')
1621*9c5db199SXin Li
1622*9c5db199SXin Li        # Fallback to the name of the board, and then a bare image.bin.
1623*9c5db199SXin Li        bios_image_candidates.append('image-%s.bin' % board)
1624*9c5db199SXin Li        bios_image_candidates.append('image.bin')
1625*9c5db199SXin Li
1626*9c5db199SXin Li        # Extract BIOS image from tarball
1627*9c5db199SXin Li        dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS')
1628*9c5db199SXin Li        bios_image = _extract_image_from_tarball(tarball_path,
1629*9c5db199SXin Li                                                 dest_dir,
1630*9c5db199SXin Li                                                 bios_image_candidates,
1631*9c5db199SXin Li                                                 self.EXTRACT_TIMEOUT_SECS)
1632*9c5db199SXin Li
1633*9c5db199SXin Li        # Check if BIOS image was found and return path or raise error
1634*9c5db199SXin Li        if bios_image:
1635*9c5db199SXin Li            return os.path.join(dest_dir, bios_image)
1636*9c5db199SXin Li        else:
1637*9c5db199SXin Li            raise error.TestError('Failed to extract BIOS image from %s' %
1638*9c5db199SXin Li                                  tarball_path)
1639*9c5db199SXin Li
1640*9c5db199SXin Li
1641*9c5db199SXin Li    def switch_usbkey(self, usb_state):
1642*9c5db199SXin Li        """Connect USB flash stick to either host or DUT, or turn USB port off.
1643*9c5db199SXin Li
1644*9c5db199SXin Li        This function switches the servo multiplexer to provide electrical
1645*9c5db199SXin Li        connection between the USB port J3 and either host or DUT side. It
1646*9c5db199SXin Li        can also be used to turn the USB port off.
1647*9c5db199SXin Li
1648*9c5db199SXin Li        @param usb_state: A string, one of 'dut', 'host', or 'off'.
1649*9c5db199SXin Li                          'dut' and 'host' indicate which side the
1650*9c5db199SXin Li                          USB flash device is required to be connected to.
1651*9c5db199SXin Li                          'off' indicates turning the USB port off.
1652*9c5db199SXin Li
1653*9c5db199SXin Li        @raise: error.TestError in case the parameter is not 'dut'
1654*9c5db199SXin Li                'host', or 'off'.
1655*9c5db199SXin Li        """
1656*9c5db199SXin Li        if self.get_usbkey_state() == usb_state:
1657*9c5db199SXin Li            return
1658*9c5db199SXin Li
1659*9c5db199SXin Li        if usb_state == 'off':
1660*9c5db199SXin Li            self.set_nocheck('image_usbkey_pwr', 'off')
1661*9c5db199SXin Li            return
1662*9c5db199SXin Li        elif usb_state == 'host':
1663*9c5db199SXin Li            mux_direction = 'servo_sees_usbkey'
1664*9c5db199SXin Li        elif usb_state == 'dut':
1665*9c5db199SXin Li            mux_direction = 'dut_sees_usbkey'
1666*9c5db199SXin Li        else:
1667*9c5db199SXin Li            raise error.TestError('Unknown USB state request: %s' % usb_state)
1668*9c5db199SXin Li        # On the servod side, this control will ensure that
1669*9c5db199SXin Li        # - the port is power cycled if it is changing directions
1670*9c5db199SXin Li        # - the port ends up in a powered state after this call
1671*9c5db199SXin Li        # - if facing the host side, the call only returns once a usb block
1672*9c5db199SXin Li        #   device is detected, or after a generous timeout (10s)
1673*9c5db199SXin Li        self.set('image_usbkey_direction', mux_direction)
1674*9c5db199SXin Li        # As servod makes no guarantees when switching to the dut side,
1675*9c5db199SXin Li        # add a detection delay here when facing the dut.
1676*9c5db199SXin Li        if mux_direction == 'dut_sees_usbkey':
1677*9c5db199SXin Li            time.sleep(self.USB_DETECTION_DELAY)
1678*9c5db199SXin Li
1679*9c5db199SXin Li    def get_usbkey_state(self):
1680*9c5db199SXin Li        """Get which side USB is connected to or 'off' if usb power is off.
1681*9c5db199SXin Li
1682*9c5db199SXin Li        @return: A string, one of 'dut', 'host', or 'off'.
1683*9c5db199SXin Li        """
1684*9c5db199SXin Li        pwr = self.get('image_usbkey_pwr')
1685*9c5db199SXin Li        if pwr == 'off':
1686*9c5db199SXin Li            return pwr
1687*9c5db199SXin Li        direction = self.get('image_usbkey_direction')
1688*9c5db199SXin Li        if direction == 'servo_sees_usbkey':
1689*9c5db199SXin Li            return 'host'
1690*9c5db199SXin Li        if direction == 'dut_sees_usbkey':
1691*9c5db199SXin Li            return 'dut'
1692*9c5db199SXin Li        raise error.TestFail('image_usbkey_direction set an unknown mux '
1693*9c5db199SXin Li                             'direction: %s' % direction)
1694*9c5db199SXin Li
1695*9c5db199SXin Li    def set_servo_v4_role(self, role):
1696*9c5db199SXin Li        """Set the power role of servo v4, either 'src' or 'snk'.
1697*9c5db199SXin Li
1698*9c5db199SXin Li        It does nothing if not a servo v4.
1699*9c5db199SXin Li
1700*9c5db199SXin Li        @param role: Power role for DUT port on servo v4, either 'src' or 'snk'.
1701*9c5db199SXin Li        """
1702*9c5db199SXin Li        if not self.get_servo_type().startswith('servo_v4'):
1703*9c5db199SXin Li            logging.debug('Not a servo v4, unable to set role to %s.', role)
1704*9c5db199SXin Li            return
1705*9c5db199SXin Li
1706*9c5db199SXin Li        if not self.has_control('servo_pd_role'):
1707*9c5db199SXin Li            logging.debug(
1708*9c5db199SXin Li                    'Servo does not has servo_v4_role control, unable'
1709*9c5db199SXin Li                    ' to set role to %s.', role)
1710*9c5db199SXin Li            return
1711*9c5db199SXin Li
1712*9c5db199SXin Li        value = self.get('servo_pd_role')
1713*9c5db199SXin Li        if value != role:
1714*9c5db199SXin Li            self.set_nocheck('servo_pd_role', role)
1715*9c5db199SXin Li        else:
1716*9c5db199SXin Li            logging.debug('Already in the role: %s.', role)
1717*9c5db199SXin Li
1718*9c5db199SXin Li    def get_servo_v4_role(self):
1719*9c5db199SXin Li        """Get the power role of servo v4, either 'src' or 'snk'.
1720*9c5db199SXin Li
1721*9c5db199SXin Li        It returns None if not a servo v4.
1722*9c5db199SXin Li        """
1723*9c5db199SXin Li        if not self.get_servo_type().startswith('servo_v4'):
1724*9c5db199SXin Li            logging.debug('Not a servo v4, unable to get role')
1725*9c5db199SXin Li            return None
1726*9c5db199SXin Li
1727*9c5db199SXin Li        if not self.has_control('servo_pd_role'):
1728*9c5db199SXin Li            logging.debug(
1729*9c5db199SXin Li                    'Servo does not has servo_v4_role control, unable'
1730*9c5db199SXin Li                    ' to get the role.')
1731*9c5db199SXin Li            return None
1732*9c5db199SXin Li
1733*9c5db199SXin Li        return self.get('servo_pd_role')
1734*9c5db199SXin Li
1735*9c5db199SXin Li    def set_servo_v4_pd_comm(self, en):
1736*9c5db199SXin Li        """Set the PD communication of servo v4, either 'on' or 'off'.
1737*9c5db199SXin Li
1738*9c5db199SXin Li        It does nothing if not a servo v4.
1739*9c5db199SXin Li
1740*9c5db199SXin Li        @param en: a string of 'on' or 'off' for PD communication.
1741*9c5db199SXin Li        """
1742*9c5db199SXin Li        if self.get_servo_type().startswith('servo_v4'):
1743*9c5db199SXin Li            self.set_nocheck('servo_pd_comm', en)
1744*9c5db199SXin Li        else:
1745*9c5db199SXin Li            logging.debug('Not a servo v4, unable to set PD comm to %s.', en)
1746*9c5db199SXin Li
1747*9c5db199SXin Li    def supports_built_in_pd_control(self):
1748*9c5db199SXin Li        """Return whether the servo type supports pd charging and control."""
1749*9c5db199SXin Li        # Only servo v4 type-c supports this feature.
1750*9c5db199SXin Li        if not self.is_servo_v4_type_c():
1751*9c5db199SXin Li            logging.info('PD controls require a servo v4 type-c.')
1752*9c5db199SXin Li            return False
1753*9c5db199SXin Li        # Lastly, one cannot really do anything without a plugged in charger.
1754*9c5db199SXin Li        chg_port_mv = self.get('ppchg5_mv')
1755*9c5db199SXin Li        if chg_port_mv < V4_CHG_ATTACHED_MIN_VOLTAGE_MV:
1756*9c5db199SXin Li            logging.info(
1757*9c5db199SXin Li                    'It appears that no charger is plugged into servo v4. '
1758*9c5db199SXin Li                    'Charger port voltage: %dmV', chg_port_mv)
1759*9c5db199SXin Li            return False
1760*9c5db199SXin Li        logging.info('Charger port voltage: %dmV', chg_port_mv)
1761*9c5db199SXin Li        return True
1762*9c5db199SXin Li
1763*9c5db199SXin Li    def dts_mode_is_valid(self):
1764*9c5db199SXin Li        """Return whether servo setup supports dts mode control for cr50."""
1765*9c5db199SXin Li        # Only servo v4 type-c supports this feature.
1766*9c5db199SXin Li        return self.is_servo_v4_type_c()
1767*9c5db199SXin Li
1768*9c5db199SXin Li    def dts_mode_is_safe(self):
1769*9c5db199SXin Li        """Return whether servo setup supports dts mode without losing access.
1770*9c5db199SXin Li
1771*9c5db199SXin Li        DTS mode control exists but the main device might go through ccd.
1772*9c5db199SXin Li        In that case, it's only safe to control dts mode if the main device
1773*9c5db199SXin Li        is legacy as otherwise the connection to the main device cuts out.
1774*9c5db199SXin Li        """
1775*9c5db199SXin Li        return self.dts_mode_is_valid() and self.main_device_is_flex()
1776*9c5db199SXin Li
1777*9c5db199SXin Li    def get_dts_mode(self):
1778*9c5db199SXin Li        """Return servo dts mode.
1779*9c5db199SXin Li
1780*9c5db199SXin Li        @returns: on/off whether dts is on or off
1781*9c5db199SXin Li        """
1782*9c5db199SXin Li        if not self.dts_mode_is_valid():
1783*9c5db199SXin Li            logging.info('Not a valid servo setup. Unable to get dts mode.')
1784*9c5db199SXin Li            return
1785*9c5db199SXin Li        return self.get('servo_dts_mode')
1786*9c5db199SXin Li
1787*9c5db199SXin Li    def ccd_watchdog_enable(self, enable):
1788*9c5db199SXin Li        """Control the ccd watchdog."""
1789*9c5db199SXin Li        if 'ccd' not in self.get_servo_type():
1790*9c5db199SXin Li            return
1791*9c5db199SXin Li        if self._ccd_watchdog_disabled and enable:
1792*9c5db199SXin Li            logging.info('CCD watchdog disabled for test')
1793*9c5db199SXin Li            return
1794*9c5db199SXin Li        control = 'watchdog_add' if enable else 'watchdog_remove'
1795*9c5db199SXin Li        self.set_nocheck(control, 'ccd')
1796*9c5db199SXin Li
1797*9c5db199SXin Li    def disable_ccd_watchdog_for_test(self):
1798*9c5db199SXin Li        """Prevent servo from enabling the watchdog."""
1799*9c5db199SXin Li        self._ccd_watchdog_disabled = True
1800*9c5db199SXin Li        self.ccd_watchdog_enable(False)
1801*9c5db199SXin Li
1802*9c5db199SXin Li    def allow_ccd_watchdog_for_test(self):
1803*9c5db199SXin Li        """Allow servo to enable the ccd watchdog."""
1804*9c5db199SXin Li        self._ccd_watchdog_disabled = False
1805*9c5db199SXin Li        self.ccd_watchdog_enable(True)
1806*9c5db199SXin Li
1807*9c5db199SXin Li    def set_dts_mode(self, state):
1808*9c5db199SXin Li        """Set servo dts mode to off or on.
1809*9c5db199SXin Li
1810*9c5db199SXin Li        It does nothing if not a servo v4. Disable the ccd watchdog if we're
1811*9c5db199SXin Li        disabling dts mode. CCD will disconnect. The watchdog only allows CCD
1812*9c5db199SXin Li        to disconnect for 10 seconds until it kills servod. Disable the
1813*9c5db199SXin Li        watchdog, so CCD can stay disconnected indefinitely.
1814*9c5db199SXin Li
1815*9c5db199SXin Li        @param state: Set servo v4 dts mode 'off' or 'on'.
1816*9c5db199SXin Li        """
1817*9c5db199SXin Li        if not self.dts_mode_is_valid():
1818*9c5db199SXin Li            logging.info('Not a valid servo setup. Unable to set dts mode %s.',
1819*9c5db199SXin Li                         state)
1820*9c5db199SXin Li            return
1821*9c5db199SXin Li
1822*9c5db199SXin Li        enable_watchdog = state == 'on'
1823*9c5db199SXin Li
1824*9c5db199SXin Li        if not enable_watchdog:
1825*9c5db199SXin Li            self.ccd_watchdog_enable(False)
1826*9c5db199SXin Li
1827*9c5db199SXin Li        self.set_nocheck('servo_dts_mode', state)
1828*9c5db199SXin Li
1829*9c5db199SXin Li        if enable_watchdog:
1830*9c5db199SXin Li            self.ccd_watchdog_enable(True)
1831*9c5db199SXin Li
1832*9c5db199SXin Li
1833*9c5db199SXin Li    def _get_servo_type_fw_version(self, servo_type, prefix=''):
1834*9c5db199SXin Li        """Helper to handle fw retrieval for micro/v4 vs ccd.
1835*9c5db199SXin Li
1836*9c5db199SXin Li        @param servo_type: one of 'servo_v4', 'servo_micro', 'c2d2',
1837*9c5db199SXin Li                           'ccd_cr50', or 'ccd_gsc'
1838*9c5db199SXin Li        @param prefix: whether the control has a prefix
1839*9c5db199SXin Li
1840*9c5db199SXin Li        @returns: fw version for non-ccd devices, cr50 version for ccd device
1841*9c5db199SXin Li        """
1842*9c5db199SXin Li        # If it's a ccd device, remove the 'ccd_' prefix to find the firmware
1843*9c5db199SXin Li        # name.
1844*9c5db199SXin Li        if servo_type.startswith(self.CCD_PREFIX):
1845*9c5db199SXin Li            servo_type = servo_type[len(self.CCD_PREFIX)::]
1846*9c5db199SXin Li        cmd = '%s_version' % servo_type
1847*9c5db199SXin Li        try:
1848*9c5db199SXin Li            return self.get(cmd, prefix=prefix)
1849*9c5db199SXin Li        except error.TestFail:
1850*9c5db199SXin Li            # Do not fail here, simply report the version as unknown.
1851*9c5db199SXin Li            logging.warning('Unable to query %r to get servo fw version.', cmd)
1852*9c5db199SXin Li            return 'unknown'
1853*9c5db199SXin Li
1854*9c5db199SXin Li
1855*9c5db199SXin Li    def get_servo_fw_versions(self):
1856*9c5db199SXin Li        """Retrieve a summary of attached servos and their firmware.
1857*9c5db199SXin Li
1858*9c5db199SXin Li        Note: that only the Google firmware owned servos supports this e.g.
1859*9c5db199SXin Li        micro, v4, etc. For anything else, the dictionary will have no entry.
1860*9c5db199SXin Li        If no device is has Google owned firmware (e.g. v3) then the result
1861*9c5db199SXin Li        is an empty dictionary.
1862*9c5db199SXin Li
1863*9c5db199SXin Li        @returns: dict, a collection of each attached servo & their firmware.
1864*9c5db199SXin Li        """
1865*9c5db199SXin Li        def get_fw_version_tag(tag, dev):
1866*9c5db199SXin Li            return '%s_version.%s' % (dev, tag)
1867*9c5db199SXin Li
1868*9c5db199SXin Li        fw_versions = {}
1869*9c5db199SXin Li        # Note, this works because v4p1 starts with v4 as well.
1870*9c5db199SXin Li        # TODO(coconutruben): make this more robust so that it can work on
1871*9c5db199SXin Li        # a future v-whatever as well.
1872*9c5db199SXin Li        if 'servo_v4' not in self.get_servo_type():
1873*9c5db199SXin Li            return {}
1874*9c5db199SXin Li        # v4 or v4p1
1875*9c5db199SXin Li        v4_flavor = self.get_servo_type().split('_with_')[0]
1876*9c5db199SXin Li        v4_tag = get_fw_version_tag('root', v4_flavor)
1877*9c5db199SXin Li        fw_versions[v4_tag] = self._get_servo_type_fw_version('servo_fw',
1878*9c5db199SXin Li                                                              prefix='root')
1879*9c5db199SXin Li        if 'with' in self.get_servo_type():
1880*9c5db199SXin Li            dut_devs = self.get_servo_type().split('_with_')[1].split('_and_')
1881*9c5db199SXin Li            main_tag = get_fw_version_tag('main', dut_devs[0])
1882*9c5db199SXin Li            fw_versions[main_tag] = self._get_servo_type_fw_version(dut_devs[0])
1883*9c5db199SXin Li            if len(dut_devs) == 2:
1884*9c5db199SXin Li                # Right now, the only way for this to happen is for a dual setup
1885*9c5db199SXin Li                # to exist where ccd is attached on top of servo micro. Thus, we
1886*9c5db199SXin Li                # know that the prefix is ccd_cr50 and the type is ccd_cr50.
1887*9c5db199SXin Li                # TODO(coconutruben): If the new servod is not deployed by
1888*9c5db199SXin Li                # the time that there are more cases of '_and_' devices,
1889*9c5db199SXin Li                # this needs to be reworked.
1890*9c5db199SXin Li                dual_tag = get_fw_version_tag('ccd_flex_secondary', dut_devs[1])
1891*9c5db199SXin Li                fw = self._get_servo_type_fw_version(dut_devs[1], dut_devs[1])
1892*9c5db199SXin Li                fw_versions[dual_tag] = fw
1893*9c5db199SXin Li        return fw_versions
1894*9c5db199SXin Li
1895*9c5db199SXin Li    @property
1896*9c5db199SXin Li    def uart_logs_dir(self):
1897*9c5db199SXin Li        """Return the directory to save UART logs."""
1898*9c5db199SXin Li        return self._uart.logs_dir if self._uart else ""
1899*9c5db199SXin Li
1900*9c5db199SXin Li
1901*9c5db199SXin Li    @uart_logs_dir.setter
1902*9c5db199SXin Li    def uart_logs_dir(self, logs_dir):
1903*9c5db199SXin Li        """Set directory to save UART logs.
1904*9c5db199SXin Li
1905*9c5db199SXin Li        @param logs_dir  String of directory name."""
1906*9c5db199SXin Li        self._uart.logs_dir = logs_dir
1907*9c5db199SXin Li
1908*9c5db199SXin Li    def get_uart_logfile(self, uart):
1909*9c5db199SXin Li        """Return the path to the uart log file."""
1910*9c5db199SXin Li        return self._uart.get_logfile(uart)
1911*9c5db199SXin Li
1912*9c5db199SXin Li    def record_uart_capture(self, outdir=None):
1913*9c5db199SXin Li        """Save uart stream output."""
1914*9c5db199SXin Li        if outdir and not self.uart_logs_dir:
1915*9c5db199SXin Li            self.uart_logs_dir = outdir
1916*9c5db199SXin Li        self._uart.dump()
1917*9c5db199SXin Li
1918*9c5db199SXin Li    def close(self, outdir=None):
1919*9c5db199SXin Li        """Close the servo object."""
1920*9c5db199SXin Li        # We want to ensure that servo_v4 is in src mode to avoid DUTs
1921*9c5db199SXin Li        # left in discharge state after a task.
1922*9c5db199SXin Li        try:
1923*9c5db199SXin Li            self.set_servo_v4_role('src')
1924*9c5db199SXin Li        except Exception as e:
1925*9c5db199SXin Li            logging.info(
1926*9c5db199SXin Li                    'Unexpected error while setting servo_v4 role'
1927*9c5db199SXin Li                    ' to src; %s', e)
1928*9c5db199SXin Li
1929*9c5db199SXin Li        self._uart.stop_capture()
1930*9c5db199SXin Li        self.record_uart_capture(outdir)
1931*9c5db199SXin Li
1932*9c5db199SXin Li    def ec_reboot(self):
1933*9c5db199SXin Li        """Reboot Just the embedded controller."""
1934*9c5db199SXin Li        self.set_nocheck('ec_uart_flush', 'off')
1935*9c5db199SXin Li        self.set_nocheck('ec_uart_cmd', 'reboot')
1936*9c5db199SXin Li        self.set_nocheck('ec_uart_flush', 'on')
1937*9c5db199SXin Li
1938*9c5db199SXin Li    def get_vbus_voltage(self):
1939*9c5db199SXin Li        """Get the voltage of VBUS'.
1940*9c5db199SXin Li
1941*9c5db199SXin Li        @returns The voltage of VBUS, if vbus_voltage is supported.
1942*9c5db199SXin Li                 None               , if vbus_voltage is not supported.
1943*9c5db199SXin Li        """
1944*9c5db199SXin Li        if not self.has_control('vbus_voltage'):
1945*9c5db199SXin Li            logging.debug('Servo does not have vbus_voltage control,'
1946*9c5db199SXin Li                          'unable to get vbus voltage')
1947*9c5db199SXin Li            return None
1948*9c5db199SXin Li
1949*9c5db199SXin Li        return self.get('vbus_voltage')
1950*9c5db199SXin Li
1951*9c5db199SXin Li    def supports_eth_power_control(self):
1952*9c5db199SXin Li        """True if servo supports power management for ethernet dongle."""
1953*9c5db199SXin Li        return self.has_control('dut_eth_pwr_en')
1954*9c5db199SXin Li
1955*9c5db199SXin Li    def set_eth_power(self, state):
1956*9c5db199SXin Li        """Set ethernet dongle power state, either 'on' or 'off'.
1957*9c5db199SXin Li
1958*9c5db199SXin Li        Note: this functionality is supported only on servo v4p1.
1959*9c5db199SXin Li
1960*9c5db199SXin Li        @param state: a string of 'on' or 'off'.
1961*9c5db199SXin Li        """
1962*9c5db199SXin Li        if state != 'off' and state != 'on':
1963*9c5db199SXin Li            raise error.TestError('Unknown ethernet power state request: %s' %
1964*9c5db199SXin Li                                  state)
1965*9c5db199SXin Li
1966*9c5db199SXin Li        if not self.supports_eth_power_control():
1967*9c5db199SXin Li            logging.info('Not a supported servo setup. Unable to set ethernet'
1968*9c5db199SXin Li                         'dongle power state %s.', state)
1969*9c5db199SXin Li            return
1970*9c5db199SXin Li
1971*9c5db199SXin Li        self.set_nocheck('dut_eth_pwr_en', state)
1972*9c5db199SXin Li
1973*9c5db199SXin Li    def eth_power_reset(self):
1974*9c5db199SXin Li        """Reset ethernet dongle power state if supported'.
1975*9c5db199SXin Li
1976*9c5db199SXin Li        It does nothing if servo setup does not support power management for
1977*9c5db199SXin Li        the etherent dongle, only log information about this.
1978*9c5db199SXin Li        """
1979*9c5db199SXin Li        if self.supports_eth_power_control():
1980*9c5db199SXin Li            logging.info("Resetting servo's Ethernet controller...")
1981*9c5db199SXin Li            self.set_eth_power('off')
1982*9c5db199SXin Li            time.sleep(1)
1983*9c5db199SXin Li            self.set_eth_power('on')
1984*9c5db199SXin Li        else:
1985*9c5db199SXin Li            logging.info("Trying to reset servo's Ethernet controller, but"
1986*9c5db199SXin Li                         "this feature is not supported on used servo setup.")
1987