xref: /aosp_15_r20/external/autotest/server/server_job.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# pylint: disable-msg=C0111
3*9c5db199SXin Li
4*9c5db199SXin Li# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
5*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
6*9c5db199SXin Li# found in the LICENSE file.
7*9c5db199SXin Li"""
8*9c5db199SXin LiThe main job wrapper for the server side.
9*9c5db199SXin Li
10*9c5db199SXin LiThis is the core infrastructure. Derived from the client side job.py
11*9c5db199SXin Li
12*9c5db199SXin LiCopyright Martin J. Bligh, Andy Whitcroft 2007
13*9c5db199SXin Li"""
14*9c5db199SXin Li
15*9c5db199SXin Lifrom __future__ import absolute_import
16*9c5db199SXin Lifrom __future__ import division
17*9c5db199SXin Lifrom __future__ import print_function
18*9c5db199SXin Li
19*9c5db199SXin Liimport errno
20*9c5db199SXin Liimport fcntl
21*9c5db199SXin Liimport getpass
22*9c5db199SXin Liimport logging
23*9c5db199SXin Liimport os
24*9c5db199SXin Liimport pickle
25*9c5db199SXin Liimport platform
26*9c5db199SXin Liimport re
27*9c5db199SXin Liimport select
28*9c5db199SXin Liimport shutil
29*9c5db199SXin Liimport sys
30*9c5db199SXin Liimport tempfile
31*9c5db199SXin Liimport time
32*9c5db199SXin Liimport traceback
33*9c5db199SXin Liimport uuid
34*9c5db199SXin Liimport warnings
35*9c5db199SXin Li
36*9c5db199SXin Lifrom datetime import datetime
37*9c5db199SXin Li
38*9c5db199SXin Lifrom autotest_lib.client.bin import sysinfo
39*9c5db199SXin Lifrom autotest_lib.client.common_lib import base_job
40*9c5db199SXin Lifrom autotest_lib.client.common_lib import control_data
41*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
42*9c5db199SXin Lifrom autotest_lib.client.common_lib import logging_manager
43*9c5db199SXin Lifrom autotest_lib.client.common_lib import packages
44*9c5db199SXin Lifrom autotest_lib.client.common_lib import utils
45*9c5db199SXin Lifrom autotest_lib.client.common_lib import seven
46*9c5db199SXin Lifrom autotest_lib.server import profilers
47*9c5db199SXin Lifrom autotest_lib.server import site_gtest_runner
48*9c5db199SXin Lifrom autotest_lib.server import subcommand
49*9c5db199SXin Lifrom autotest_lib.server import test
50*9c5db199SXin Lifrom autotest_lib.server.autotest import OFFLOAD_ENVVAR
51*9c5db199SXin Lifrom autotest_lib.server import utils as server_utils
52*9c5db199SXin Lifrom autotest_lib.server.cros.dynamic_suite import frontend_wrappers
53*9c5db199SXin Lifrom autotest_lib.server import hosts
54*9c5db199SXin Lifrom autotest_lib.server.hosts import abstract_ssh
55*9c5db199SXin Lifrom autotest_lib.server.hosts import afe_store
56*9c5db199SXin Lifrom autotest_lib.server.hosts import file_store
57*9c5db199SXin Lifrom autotest_lib.server.hosts import shadowing_store
58*9c5db199SXin Lifrom autotest_lib.server.hosts import factory as host_factory
59*9c5db199SXin Lifrom autotest_lib.server.hosts import host_info
60*9c5db199SXin Lifrom autotest_lib.server.hosts import ssh_multiplex
61*9c5db199SXin Lifrom autotest_lib.tko import models as tko_models
62*9c5db199SXin Lifrom autotest_lib.tko import parser_lib
63*9c5db199SXin Lifrom six.moves import zip
64*9c5db199SXin Li
65*9c5db199SXin Litry:
66*9c5db199SXin Li    from autotest_lib.utils.frozen_chromite.lib import metrics
67*9c5db199SXin Liexcept ImportError:
68*9c5db199SXin Li    metrics = utils.metrics_mock
69*9c5db199SXin Li
70*9c5db199SXin Li
71*9c5db199SXin Lidef _control_segment_path(name):
72*9c5db199SXin Li    """Get the pathname of the named control segment file."""
73*9c5db199SXin Li    server_dir = os.path.dirname(os.path.abspath(__file__))
74*9c5db199SXin Li    return os.path.join(server_dir, "control_segments", name)
75*9c5db199SXin Li
76*9c5db199SXin Li
77*9c5db199SXin LiCLIENT_CONTROL_FILENAME = 'control'
78*9c5db199SXin LiSERVER_CONTROL_FILENAME = 'control.srv'
79*9c5db199SXin LiMACHINES_FILENAME = '.machines'
80*9c5db199SXin LiDUT_STATEFUL_PATH = "/usr/local"
81*9c5db199SXin Li
82*9c5db199SXin LiCLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
83*9c5db199SXin LiCLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
84*9c5db199SXin LiCRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
85*9c5db199SXin LiCRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
86*9c5db199SXin LiCLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
87*9c5db199SXin LiVERIFY_CONTROL_FILE = _control_segment_path('verify')
88*9c5db199SXin LiREPAIR_CONTROL_FILE = _control_segment_path('repair')
89*9c5db199SXin LiPROVISION_CONTROL_FILE = _control_segment_path('provision')
90*9c5db199SXin LiVERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
91*9c5db199SXin LiRESET_CONTROL_FILE = _control_segment_path('reset')
92*9c5db199SXin LiGET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
93*9c5db199SXin Li
94*9c5db199SXin Li
def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
                      host_attributes=None):
    """Convert a list of machine names into a list of machine dicts.

    TODO(crbug.com/678430): This function temporarily has a side effect of
    creating files under workdir for backing a FileStore. This side-effect will
    go away once callers of autoserv start passing in the FileStore.

    @param machine_names: A list of machine names.
    @param store_dir: A directory to contain store backing files.
    @param in_lab: A boolean indicating whether we're running in lab.
    @param use_shadow_store: If True, we should create a ShadowingStore where
            actual store is backed by the AFE but we create a backing file to
            shadow the store. If False, backing file should already exist at:
            ${store_dir}/${hostname}.store
    @param host_attributes: Optional list of host attributes to add for each
            host.
    @returns: A list of dicts, one per machine, with keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is false.
            'host_info_store': A host_info.CachingHostInfoStore object to obtain
                    host information. A stub if in_lab is False.
            'connection_pool': ssh_multiplex.ConnectionPool instance to share
                    ssh connection across control scripts. This is set to
                    None, and should be overridden for connection sharing.
    """
    # See autoserv_parser.parse_args: in_lab and host_attributes are
    # mutually exclusive inputs.
    if in_lab and host_attributes:
        raise error.AutoservError(
                'in_lab and host_attribute are mutually exclusive.')

    machine_dicts = []
    for hostname in machine_names:
        if not in_lab:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = _create_file_backed_host_info_store(
                    store_dir, hostname)
            if host_attributes:
                # Mirror the attributes both on the stub AFE host and in
                # the backing host-info store.
                afe_host.attributes.update(host_attributes)
                new_info = host_info.HostInfo(
                        labels=host_info_store.get().labels,
                        attributes=host_attributes)
                host_info_store.commit(new_info)
        elif use_shadow_store:
            afe_host = _create_afe_host(hostname)
            host_info_store = _create_afe_backed_host_info_store(
                    store_dir, hostname)
        else:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = _create_file_backed_host_info_store(
                    store_dir, hostname)

        machine_dicts.append({
                'hostname': hostname,
                'afe_host': afe_host,
                'host_info_store': host_info_store,
                'connection_pool': None,
        })

    return machine_dicts
156*9c5db199SXin Li
157*9c5db199SXin Li
class status_indenter(base_job.status_indenter):
    """Simple integer-counter implementation of the status indenter API."""

    def __init__(self):
        self._indent = 0

    @property
    def indent(self):
        """Current indentation level (non-negative under normal use)."""
        return self._indent

    def increment(self):
        """Increase the indentation level by one."""
        self._indent += 1

    def decrement(self):
        """Decrease the indentation level by one."""
        self._indent -= 1

    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class _SavedIndent(object):
            """Snapshot of the indenter's level; restore() rolls it back."""
            def __init__(self, indenter, level):
                self._indenter = indenter
                self._level = level

            def restore(self):
                self._indenter._indent = self._level

        return _SavedIndent(self, self._indent)
186*9c5db199SXin Li
187*9c5db199SXin Li
class server_job_record_hook(object):
    """The job.record hook used by server jobs.

    Injects WARN messages gathered from the console/vlm warning loggers
    whenever a new log entry is written, and echoes every entry to INFO
    level logging. Implemented as a class so it can keep a flag that
    blocks recursive invocation, since the hook itself calls job.record
    to emit the WARN entries.

    Depends on job._read_warnings and job._logger.
    """

    def __init__(self, job):
        self._job = job
        self._being_called = False

    def __call__(self, entry):
        """Invoke the real hook (_hook) with recursion protection.

        Not thread-safe; the flag only exists to break the infinite
        job.record -> _hook -> job.record -> ... chain.
        """
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False

    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        # Drain all warning loggers and record each new warning.
        collected = []
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                    'WARN', None, None, msg, {}, timestamp=timestamp)
            collected.append(warning_entry)
            job.record_entry(warning_entry)
        # Echo rendered versions of all status logs (warnings plus the
        # triggering entry) to INFO-level logging.
        collected.append(entry)
        for item in collected:
            logging.info(job._logger.render_entry(item))
231*9c5db199SXin Li
232*9c5db199SXin Li
233*9c5db199SXin Liclass server_job(base_job.base_job):
234*9c5db199SXin Li    """The server-side concrete implementation of base_job.
235*9c5db199SXin Li
236*9c5db199SXin Li    Optional properties provided by this implementation:
237*9c5db199SXin Li        serverdir
238*9c5db199SXin Li
239*9c5db199SXin Li        warning_manager
240*9c5db199SXin Li        warning_loggers
241*9c5db199SXin Li    """
242*9c5db199SXin Li
243*9c5db199SXin Li    _STATUS_VERSION = 1
244*9c5db199SXin Li
245*9c5db199SXin Li    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self,
                 control,
                 args,
                 resultdir,
                 label,
                 user,
                 machines,
                 machine_dict_list,
                 client=False,
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 group_name='',
                 tag='',
                 disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None,
                 in_lab=False,
                 use_client_trampoline=False,
                 sync_offload_dir='',
                 companion_hosts=None,
                 dut_servers=None,
                 is_cft=False,
                 force_full_log_collection=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames of the machines to use for the job.
        @param machine_dict_list: A list of dicts for each of the machines above
                as returned by get_machine_dicts.
        @param client: True if this is a client-side control file.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                included in ssh commands.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param in_lab: Boolean that indicates if this is running in the lab
                environment.
        @param use_client_trampoline: Boolean that indicates whether to
                use the client trampoline flow.  If this is True, control
                is interpreted as the name of the client test to run.
                The client control file will be client_trampoline.  The
                test name will be passed to client_trampoline, which will
                install the test package and re-exec the actual test
                control file.
        @param sync_offload_dir: String; relative path to synchronous offload
                dir, relative to the results directory. Ignored if empty.
        @param companion_hosts: a str or list of hosts to be used as companions
                for, and provided to, the test. NOTE: these are different than
                machines, where each host is a host that the test would be run
                on.
        @param dut_servers: a str or list of hosts to be used as DUT servers
                provided to test.
        @param is_cft: Boolean; stored as self._is_cft. Presumably indicates
                the job is running under the CFT environment -- confirm with
                callers, the flag is only stored here.
        @param force_full_log_collection: bool; force full log collection even
                when test passes.
        """
        super(server_job, self).__init__(resultdir=resultdir)
        self.control = control
        # Marker file used to track logs that still need to be collected.
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        # Fall back to the local user when no job owner was supplied.
        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo
        self._use_client_trampoline = use_client_trampoline
        self._companion_hosts = companion_hosts
        self._dut_servers = dut_servers
        self._is_cft = is_cft
        self.force_full_log_collection = force_full_log_collection

        # Parse the release number from the label to setup sysinfo.
        # NOTE(review): non-raw pattern string; '\d' happens to work but a
        # raw string (r'release/R(\d+)-') would be cleaner.  If the label
        # carries no release tag, `version` stays an empty list (falsy) and
        # is passed to sysinfo below as-is -- presumably handled there.
        version = re.findall('release/R(\d+)-', label)
        if version:
            version = int(version[0])

        # Redirect stdout/stderr/fds through the logging manager for the
        # remainder of the job.
        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir, version=version)
        self.profilers = profilers.profilers(self)
        self._sync_offload_dir = sync_offload_dir

        # Keyvals describing this job, written to the resultdir below.
        job_data = {
                'user': user,
                'hostname': ','.join(machines),
                'drone': platform.node(),
                'status_version': str(self._STATUS_VERSION),
                'job_started': str(int(time.time()))
        }

        # Adhoc/<testname> is the default label, and should not be written,
        # as this can cause conflicts with results uploading in CFT.
        # However, some pipelines (such as PVS) do need `label` within the
        # keyval, which can now by done with the `-l` flag in test_that.
        if 'adhoc' not in label:
            job_data['label'] = label
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(self._get_job_data())
            utils.write_keyval(self.resultdir, job_data)

        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})

        self._register_subcommand_hooks()

        # We no longer parse results as part of the server_job. These arguments
        # can't be dropped yet because clients haven't all be cleaned up yet.
        self.num_tests_run = -1
        self.num_tests_failed = -1

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self._connection_pool = ssh_multiplex.ConnectionPool()

        # List of functions to run after the main job function.
        self._post_run_hooks = []

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        self.machine_dict_list = machine_dict_list
        # Share this job's multiplexed ssh connection pool with every machine.
        for machine_dict in self.machine_dict_list:
            machine_dict['connection_pool'] = self._connection_pool

        # TODO(jrbarnette) The harness attribute is only relevant to
        # client jobs, but it's required to be present, or we will fail
        # server job unit tests.  Yes, really.
        #
        # TODO(jrbarnette) The utility of the 'harness' attribute even
        # to client jobs is suspect.  Probably, we should remove it.
        self.harness = None

        # TODO(ayatane): fast and max_result_size_KB are not set for
        # client_trampoline jobs.
        if control and not use_client_trampoline:
            parsed_control = control_data.parse_control(
                    control, raise_warnings=False)
            self.fast = parsed_control.fast
            self.max_result_size_KB = parsed_control.max_result_size_KB
            # wrap this in a try to prevent client/SSP issues. Note: if the
            # except is hit, the timeout will be ignored.
            try:
                self.extended_timeout = parsed_control.extended_timeout
            except AttributeError:
                self.extended_timeout = None
        else:
            self.fast = False
            # Set the maximum result size to be the default specified in
            # global config, if the job has no control file associated.
            # NOTE(review): extended_timeout is NOT set on this path;
            # callers reading it after a trampoline/no-control job may hit
            # AttributeError -- confirm whether this is intentional.
            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
452*9c5db199SXin Li
453*9c5db199SXin Li
454*9c5db199SXin Li    @classmethod
455*9c5db199SXin Li    def _find_base_directories(cls):
456*9c5db199SXin Li        """
457*9c5db199SXin Li        Determine locations of autodir, clientdir and serverdir. Assumes
458*9c5db199SXin Li        that this file is located within serverdir and uses __file__ along
459*9c5db199SXin Li        with relative paths to resolve the location.
460*9c5db199SXin Li        """
461*9c5db199SXin Li        serverdir = os.path.abspath(os.path.dirname(__file__))
462*9c5db199SXin Li        autodir = os.path.normpath(os.path.join(serverdir, '..'))
463*9c5db199SXin Li        clientdir = os.path.join(autodir, 'client')
464*9c5db199SXin Li        return autodir, clientdir, serverdir
465*9c5db199SXin Li
466*9c5db199SXin Li
467*9c5db199SXin Li    def _find_resultdir(self, resultdir, *args, **dargs):
468*9c5db199SXin Li        """
469*9c5db199SXin Li        Determine the location of resultdir. For server jobs we expect one to
470*9c5db199SXin Li        always be explicitly passed in to __init__, so just return that.
471*9c5db199SXin Li        """
472*9c5db199SXin Li        if resultdir:
473*9c5db199SXin Li            return os.path.normpath(resultdir)
474*9c5db199SXin Li        else:
475*9c5db199SXin Li            return None
476*9c5db199SXin Li
477*9c5db199SXin Li
478*9c5db199SXin Li    def _get_status_logger(self):
479*9c5db199SXin Li        """Return a reference to the status logger."""
480*9c5db199SXin Li        return self._logger
481*9c5db199SXin Li
482*9c5db199SXin Li
483*9c5db199SXin Li    @staticmethod
484*9c5db199SXin Li    def _load_control_file(path):
485*9c5db199SXin Li        f = open(path)
486*9c5db199SXin Li        try:
487*9c5db199SXin Li            control_file = f.read()
488*9c5db199SXin Li        finally:
489*9c5db199SXin Li            f.close()
490*9c5db199SXin Li        return re.sub('\r', '', control_file)
491*9c5db199SXin Li
492*9c5db199SXin Li
493*9c5db199SXin Li    def _register_subcommand_hooks(self):
494*9c5db199SXin Li        """
495*9c5db199SXin Li        Register some hooks into the subcommand modules that allow us
496*9c5db199SXin Li        to properly clean up self.hosts created in forked subprocesses.
497*9c5db199SXin Li        """
498*9c5db199SXin Li        def on_fork(cmd):
499*9c5db199SXin Li            self._existing_hosts_on_fork = set(self.hosts)
500*9c5db199SXin Li        def on_join(cmd):
501*9c5db199SXin Li            new_hosts = self.hosts - self._existing_hosts_on_fork
502*9c5db199SXin Li            for host in new_hosts:
503*9c5db199SXin Li                host.close()
504*9c5db199SXin Li        subcommand.subcommand.register_fork_hook(on_fork)
505*9c5db199SXin Li        subcommand.subcommand.register_join_hook(on_join)
506*9c5db199SXin Li
507*9c5db199SXin Li
508*9c5db199SXin Li    # TODO crbug.com/285395 add a kwargs parameter.
509*9c5db199SXin Li    def _make_namespace(self):
510*9c5db199SXin Li        """Create a namespace dictionary to be passed along to control file.
511*9c5db199SXin Li
512*9c5db199SXin Li        Creates a namespace argument populated with standard values:
513*9c5db199SXin Li        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
514*9c5db199SXin Li        and ssh_options.
515*9c5db199SXin Li        """
516*9c5db199SXin Li        namespace = {'machines' : self.machine_dict_list,
517*9c5db199SXin Li                     'job' : self,
518*9c5db199SXin Li                     'ssh_user' : self._ssh_user,
519*9c5db199SXin Li                     'ssh_port' : self._ssh_port,
520*9c5db199SXin Li                     'ssh_pass' : self._ssh_pass,
521*9c5db199SXin Li                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
522*9c5db199SXin Li                     'ssh_options' : self._ssh_options}
523*9c5db199SXin Li        return namespace
524*9c5db199SXin Li
525*9c5db199SXin Li
526*9c5db199SXin Li    def cleanup(self, labels):
527*9c5db199SXin Li        """Cleanup machines.
528*9c5db199SXin Li
529*9c5db199SXin Li        @param labels: Comma separated job labels, will be used to
530*9c5db199SXin Li                       determine special task actions.
531*9c5db199SXin Li        """
532*9c5db199SXin Li        if not self.machines:
533*9c5db199SXin Li            raise error.AutoservError('No machines specified to cleanup')
534*9c5db199SXin Li        if self.resultdir:
535*9c5db199SXin Li            os.chdir(self.resultdir)
536*9c5db199SXin Li
537*9c5db199SXin Li        namespace = self._make_namespace()
538*9c5db199SXin Li        namespace.update({'job_labels': labels, 'args': ''})
539*9c5db199SXin Li        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
540*9c5db199SXin Li
541*9c5db199SXin Li
542*9c5db199SXin Li    def verify(self, labels):
543*9c5db199SXin Li        """Verify machines are all ssh-able.
544*9c5db199SXin Li
545*9c5db199SXin Li        @param labels: Comma separated job labels, will be used to
546*9c5db199SXin Li                       determine special task actions.
547*9c5db199SXin Li        """
548*9c5db199SXin Li        if not self.machines:
549*9c5db199SXin Li            raise error.AutoservError('No machines specified to verify')
550*9c5db199SXin Li        if self.resultdir:
551*9c5db199SXin Li            os.chdir(self.resultdir)
552*9c5db199SXin Li
553*9c5db199SXin Li        namespace = self._make_namespace()
554*9c5db199SXin Li        namespace.update({'job_labels': labels, 'args': ''})
555*9c5db199SXin Li        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
556*9c5db199SXin Li
557*9c5db199SXin Li
558*9c5db199SXin Li    def reset(self, labels):
559*9c5db199SXin Li        """Reset machines by first cleanup then verify each machine.
560*9c5db199SXin Li
561*9c5db199SXin Li        @param labels: Comma separated job labels, will be used to
562*9c5db199SXin Li                       determine special task actions.
563*9c5db199SXin Li        """
564*9c5db199SXin Li        if not self.machines:
565*9c5db199SXin Li            raise error.AutoservError('No machines specified to reset.')
566*9c5db199SXin Li        if self.resultdir:
567*9c5db199SXin Li            os.chdir(self.resultdir)
568*9c5db199SXin Li
569*9c5db199SXin Li        namespace = self._make_namespace()
570*9c5db199SXin Li        namespace.update({'job_labels': labels, 'args': ''})
571*9c5db199SXin Li        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
572*9c5db199SXin Li
573*9c5db199SXin Li
574*9c5db199SXin Li    def repair(self, labels):
575*9c5db199SXin Li        """Repair machines.
576*9c5db199SXin Li
577*9c5db199SXin Li        @param labels: Comma separated job labels, will be used to
578*9c5db199SXin Li                       determine special task actions.
579*9c5db199SXin Li        """
580*9c5db199SXin Li        if not self.machines:
581*9c5db199SXin Li            raise error.AutoservError('No machines specified to repair')
582*9c5db199SXin Li        if self.resultdir:
583*9c5db199SXin Li            os.chdir(self.resultdir)
584*9c5db199SXin Li
585*9c5db199SXin Li        namespace = self._make_namespace()
586*9c5db199SXin Li        namespace.update({'job_labels': labels, 'args': ''})
587*9c5db199SXin Li        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
588*9c5db199SXin Li
589*9c5db199SXin Li
590*9c5db199SXin Li    def provision(self, labels):
591*9c5db199SXin Li        """
592*9c5db199SXin Li        Provision all hosts to match |labels|.
593*9c5db199SXin Li
594*9c5db199SXin Li        @param labels: A comma seperated string of labels to provision the
595*9c5db199SXin Li                       host to.
596*9c5db199SXin Li
597*9c5db199SXin Li        """
598*9c5db199SXin Li        control = self._load_control_file(PROVISION_CONTROL_FILE)
599*9c5db199SXin Li        self.run(control=control, job_labels=labels)
600*9c5db199SXin Li
601*9c5db199SXin Li
602*9c5db199SXin Li    def precheck(self):
603*9c5db199SXin Li        """
604*9c5db199SXin Li        perform any additional checks in derived classes.
605*9c5db199SXin Li        """
606*9c5db199SXin Li        pass
607*9c5db199SXin Li
608*9c5db199SXin Li
609*9c5db199SXin Li    def enable_external_logging(self):
610*9c5db199SXin Li        """
611*9c5db199SXin Li        Start or restart external logging mechanism.
612*9c5db199SXin Li        """
613*9c5db199SXin Li        pass
614*9c5db199SXin Li
615*9c5db199SXin Li
616*9c5db199SXin Li    def disable_external_logging(self):
617*9c5db199SXin Li        """
618*9c5db199SXin Li        Pause or stop external logging mechanism.
619*9c5db199SXin Li        """
620*9c5db199SXin Li        pass
621*9c5db199SXin Li
622*9c5db199SXin Li
623*9c5db199SXin Li    def use_external_logging(self):
624*9c5db199SXin Li        """
625*9c5db199SXin Li        Return True if external logging should be used.
626*9c5db199SXin Li        """
627*9c5db199SXin Li        return False
628*9c5db199SXin Li
629*9c5db199SXin Li
630*9c5db199SXin Li    def _make_parallel_wrapper(self, function, machines, log):
631*9c5db199SXin Li        """Wrap function as appropriate for calling by parallel_simple."""
632*9c5db199SXin Li        # machines could be a list of dictionaries, e.g.,
633*9c5db199SXin Li        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
634*9c5db199SXin Li        # The dictionary is generated in server_job.__init__, refer to
635*9c5db199SXin Li        # variable machine_dict_list, then passed in with namespace, see method
636*9c5db199SXin Li        # server_job._make_namespace.
637*9c5db199SXin Li        # To compare the machinese to self.machines, which is a list of machine
638*9c5db199SXin Li        # hostname, we need to convert machines back to a list of hostnames.
639*9c5db199SXin Li        if (machines and isinstance(machines, list)
640*9c5db199SXin Li            and isinstance(machines[0], dict)):
641*9c5db199SXin Li            machines = [m['hostname'] for m in machines]
642*9c5db199SXin Li        if len(machines) > 1 and log:
643*9c5db199SXin Li            def wrapper(machine):
644*9c5db199SXin Li                hostname = server_utils.get_hostname_from_machine(machine)
645*9c5db199SXin Li                self.push_execution_context(hostname)
646*9c5db199SXin Li                os.chdir(self.resultdir)
647*9c5db199SXin Li                machine_data = {'hostname' : hostname,
648*9c5db199SXin Li                                'status_version' : str(self._STATUS_VERSION)}
649*9c5db199SXin Li                utils.write_keyval(self.resultdir, machine_data)
650*9c5db199SXin Li                result = function(machine)
651*9c5db199SXin Li                return result
652*9c5db199SXin Li        else:
653*9c5db199SXin Li            wrapper = function
654*9c5db199SXin Li        return wrapper
655*9c5db199SXin Li
656*9c5db199SXin Li
657*9c5db199SXin Li    def parallel_simple(self, function, machines, log=True, timeout=None,
658*9c5db199SXin Li                        return_results=False):
659*9c5db199SXin Li        """
660*9c5db199SXin Li        Run 'function' using parallel_simple, with an extra wrapper to handle
661*9c5db199SXin Li        the necessary setup for continuous parsing, if possible. If continuous
662*9c5db199SXin Li        parsing is already properly initialized then this should just work.
663*9c5db199SXin Li
664*9c5db199SXin Li        @param function: A callable to run in parallel given each machine.
665*9c5db199SXin Li        @param machines: A list of machine names to be passed one per subcommand
666*9c5db199SXin Li                invocation of function.
667*9c5db199SXin Li        @param log: If True, output will be written to output in a subdirectory
668*9c5db199SXin Li                named after each machine.
669*9c5db199SXin Li        @param timeout: Seconds after which the function call should timeout.
670*9c5db199SXin Li        @param return_results: If True instead of an AutoServError being raised
671*9c5db199SXin Li                on any error a list of the results|exceptions from the function
672*9c5db199SXin Li                called on each arg is returned.  [default: False]
673*9c5db199SXin Li
674*9c5db199SXin Li        @raises error.AutotestError: If any of the functions failed.
675*9c5db199SXin Li        """
676*9c5db199SXin Li        wrapper = self._make_parallel_wrapper(function, machines, log)
677*9c5db199SXin Li        return subcommand.parallel_simple(
678*9c5db199SXin Li                wrapper, machines,
679*9c5db199SXin Li                subdir_name_constructor=server_utils.get_hostname_from_machine,
680*9c5db199SXin Li                log=log, timeout=timeout, return_results=return_results)
681*9c5db199SXin Li
682*9c5db199SXin Li
683*9c5db199SXin Li    def parallel_on_machines(self, function, machines, timeout=None):
684*9c5db199SXin Li        """
685*9c5db199SXin Li        @param function: Called in parallel with one machine as its argument.
686*9c5db199SXin Li        @param machines: A list of machines to call function(machine) on.
687*9c5db199SXin Li        @param timeout: Seconds after which the function call should timeout.
688*9c5db199SXin Li
689*9c5db199SXin Li        @returns A list of machines on which function(machine) returned
690*9c5db199SXin Li                without raising an exception.
691*9c5db199SXin Li        """
692*9c5db199SXin Li        results = self.parallel_simple(function, machines, timeout=timeout,
693*9c5db199SXin Li                                       return_results=True)
694*9c5db199SXin Li        success_machines = []
695*9c5db199SXin Li        for result, machine in zip(results, machines):
696*9c5db199SXin Li            if not isinstance(result, Exception):
697*9c5db199SXin Li                success_machines.append(machine)
698*9c5db199SXin Li        return success_machines
699*9c5db199SXin Li
700*9c5db199SXin Li
701*9c5db199SXin Li    def record_skipped_test(self, skipped_test, message=None):
702*9c5db199SXin Li        """Insert a failure record into status.log for this test."""
703*9c5db199SXin Li        msg = message
704*9c5db199SXin Li        if msg is None:
705*9c5db199SXin Li            msg = 'No valid machines found for test %s.' % skipped_test
706*9c5db199SXin Li        logging.info(msg)
707*9c5db199SXin Li        self.record('START', None, skipped_test.test_name)
708*9c5db199SXin Li        self.record('INFO', None, skipped_test.test_name, msg)
709*9c5db199SXin Li        self.record('END TEST_NA', None, skipped_test.test_name, msg)
710*9c5db199SXin Li
711*9c5db199SXin Li
712*9c5db199SXin Li    def _has_failed_tests(self):
713*9c5db199SXin Li        """Parse status log for failed tests.
714*9c5db199SXin Li
715*9c5db199SXin Li        This checks the current working directory and is intended only for use
716*9c5db199SXin Li        by the run() method.
717*9c5db199SXin Li
718*9c5db199SXin Li        @return boolean
719*9c5db199SXin Li        """
720*9c5db199SXin Li        path = os.getcwd()
721*9c5db199SXin Li
722*9c5db199SXin Li        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
723*9c5db199SXin Li        # make code reuse plausible.
724*9c5db199SXin Li        job_keyval = tko_models.job.read_keyval(path)
725*9c5db199SXin Li        status_version = job_keyval.get("status_version", 0)
726*9c5db199SXin Li
727*9c5db199SXin Li        # parse out the job
728*9c5db199SXin Li        parser = parser_lib.parser(status_version)
729*9c5db199SXin Li        job = parser.make_job(path)
730*9c5db199SXin Li        status_log = os.path.join(path, "status.log")
731*9c5db199SXin Li        if not os.path.exists(status_log):
732*9c5db199SXin Li            status_log = os.path.join(path, "status")
733*9c5db199SXin Li        if not os.path.exists(status_log):
734*9c5db199SXin Li            logging.warning("! Unable to parse job, no status file")
735*9c5db199SXin Li            return True
736*9c5db199SXin Li
737*9c5db199SXin Li        # parse the status logs
738*9c5db199SXin Li        status_lines = open(status_log).readlines()
739*9c5db199SXin Li        parser.start(job)
740*9c5db199SXin Li        tests = parser.end(status_lines)
741*9c5db199SXin Li
742*9c5db199SXin Li        # parser.end can return the same object multiple times, so filter out
743*9c5db199SXin Li        # dups
744*9c5db199SXin Li        job.tests = []
745*9c5db199SXin Li        already_added = set()
746*9c5db199SXin Li        for test in tests:
747*9c5db199SXin Li            if test not in already_added:
748*9c5db199SXin Li                already_added.add(test)
749*9c5db199SXin Li                job.tests.append(test)
750*9c5db199SXin Li
751*9c5db199SXin Li        failed = False
752*9c5db199SXin Li        for test in job.tests:
753*9c5db199SXin Li            # The current job is still running and shouldn't count as failed.
754*9c5db199SXin Li            # The parser will fail to parse the exit status of the job since it
755*9c5db199SXin Li            # hasn't exited yet (this running right now is the job).
756*9c5db199SXin Li            failed = failed or (test.status != 'GOOD'
757*9c5db199SXin Li                                and not _is_current_server_job(test))
758*9c5db199SXin Li        return failed
759*9c5db199SXin Li
760*9c5db199SXin Li
761*9c5db199SXin Li    def _collect_crashes(self, namespace, collect_crashinfo):
762*9c5db199SXin Li        """Collect crashes.
763*9c5db199SXin Li
764*9c5db199SXin Li        @param namespace: namespace dict.
765*9c5db199SXin Li        @param collect_crashinfo: whether to collect crashinfo in addition to
766*9c5db199SXin Li                dumps
767*9c5db199SXin Li        """
768*9c5db199SXin Li        if collect_crashinfo:
769*9c5db199SXin Li            # includes crashdumps
770*9c5db199SXin Li            crash_control_file = CRASHINFO_CONTROL_FILE
771*9c5db199SXin Li        else:
772*9c5db199SXin Li            crash_control_file = CRASHDUMPS_CONTROL_FILE
773*9c5db199SXin Li        self._execute_code(crash_control_file, namespace)
774*9c5db199SXin Li
775*9c5db199SXin Li
776*9c5db199SXin Li    _USE_TEMP_DIR = object()
777*9c5db199SXin Li    def run(self, collect_crashdumps=True, namespace={}, control=None,
778*9c5db199SXin Li            control_file_dir=None, verify_job_repo_url=False,
779*9c5db199SXin Li            only_collect_crashinfo=False, skip_crash_collection=False,
780*9c5db199SXin Li            job_labels='', use_packaging=True):
781*9c5db199SXin Li        # for a normal job, make sure the uncollected logs file exists
782*9c5db199SXin Li        # for a crashinfo-only run it should already exist, bail out otherwise
783*9c5db199SXin Li        created_uncollected_logs = False
784*9c5db199SXin Li        logging.info("I am PID %s", os.getpid())
785*9c5db199SXin Li        if self.resultdir and not os.path.exists(self._uncollected_log_file):
786*9c5db199SXin Li            if only_collect_crashinfo:
787*9c5db199SXin Li                # if this is a crashinfo-only run, and there were no existing
788*9c5db199SXin Li                # uncollected logs, just bail out early
789*9c5db199SXin Li                logging.info("No existing uncollected logs, "
790*9c5db199SXin Li                             "skipping crashinfo collection")
791*9c5db199SXin Li                return
792*9c5db199SXin Li            else:
793*9c5db199SXin Li                log_file = open(self._uncollected_log_file, "wb")
794*9c5db199SXin Li                pickle.dump([], log_file)
795*9c5db199SXin Li                log_file.close()
796*9c5db199SXin Li                created_uncollected_logs = True
797*9c5db199SXin Li
798*9c5db199SXin Li        # use a copy so changes don't affect the original dictionary
799*9c5db199SXin Li        namespace = namespace.copy()
800*9c5db199SXin Li        machines = self.machines
801*9c5db199SXin Li        if control is None:
802*9c5db199SXin Li            if self.control is None:
803*9c5db199SXin Li                control = ''
804*9c5db199SXin Li            elif self._use_client_trampoline:
805*9c5db199SXin Li                # Some tests must be loaded and staged before they can be run,
806*9c5db199SXin Li                # see crbug.com/883403#c42 and #c46 for details.
807*9c5db199SXin Li                control = self._load_control_file(
808*9c5db199SXin Li                        CLIENT_TRAMPOLINE_CONTROL_FILE)
809*9c5db199SXin Li                # repr of a string is safe for eval.
810*9c5db199SXin Li                control = (('trampoline_testname = %r\n' % str(self.control))
811*9c5db199SXin Li                           + control)
812*9c5db199SXin Li            else:
813*9c5db199SXin Li                control = self._load_control_file(self.control)
814*9c5db199SXin Li        if control_file_dir is None:
815*9c5db199SXin Li            control_file_dir = self.resultdir
816*9c5db199SXin Li
817*9c5db199SXin Li        self.aborted = False
818*9c5db199SXin Li        namespace.update(self._make_namespace())
819*9c5db199SXin Li        namespace.update({
820*9c5db199SXin Li                'args': self.args,
821*9c5db199SXin Li                'job_labels': job_labels,
822*9c5db199SXin Li                'gtest_runner': site_gtest_runner.gtest_runner(),
823*9c5db199SXin Li        })
824*9c5db199SXin Li        test_start_time = int(time.time())
825*9c5db199SXin Li
826*9c5db199SXin Li        if self.resultdir:
827*9c5db199SXin Li            os.chdir(self.resultdir)
828*9c5db199SXin Li            # touch status.log so that the parser knows a job is running here
829*9c5db199SXin Li            open(self.get_status_log_path(), 'a').close()
830*9c5db199SXin Li            self.enable_external_logging()
831*9c5db199SXin Li
832*9c5db199SXin Li        collect_crashinfo = True
833*9c5db199SXin Li        temp_control_file_dir = None
834*9c5db199SXin Li        try:
835*9c5db199SXin Li            try:
836*9c5db199SXin Li                if not self.fast:
837*9c5db199SXin Li                    with metrics.SecondsTimer(
838*9c5db199SXin Li                            'chromeos/autotest/job/get_network_stats',
839*9c5db199SXin Li                            fields = {'stage': 'start'}):
840*9c5db199SXin Li                        namespace['network_stats_label'] = 'at-start'
841*9c5db199SXin Li                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
842*9c5db199SXin Li                                           namespace)
843*9c5db199SXin Li
844*9c5db199SXin Li                if only_collect_crashinfo:
845*9c5db199SXin Li                    return
846*9c5db199SXin Li
847*9c5db199SXin Li                # If the verify_job_repo_url option is set but we're unable
848*9c5db199SXin Li                # to actually verify that the job_repo_url contains the autotest
849*9c5db199SXin Li                # package, this job will fail.
850*9c5db199SXin Li                if verify_job_repo_url:
851*9c5db199SXin Li                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
852*9c5db199SXin Li                                       namespace)
853*9c5db199SXin Li                else:
854*9c5db199SXin Li                    logging.warning('Not checking if job_repo_url contains '
855*9c5db199SXin Li                                    'autotest packages on %s', machines)
856*9c5db199SXin Li
857*9c5db199SXin Li                # determine the dir to write the control files to
858*9c5db199SXin Li                cfd_specified = (control_file_dir
859*9c5db199SXin Li                                 and control_file_dir is not self._USE_TEMP_DIR)
860*9c5db199SXin Li                if cfd_specified:
861*9c5db199SXin Li                    temp_control_file_dir = None
862*9c5db199SXin Li                else:
863*9c5db199SXin Li                    temp_control_file_dir = tempfile.mkdtemp(
864*9c5db199SXin Li                        suffix='temp_control_file_dir')
865*9c5db199SXin Li                    control_file_dir = temp_control_file_dir
866*9c5db199SXin Li                server_control_file = os.path.join(control_file_dir,
867*9c5db199SXin Li                                                   self._control_filename)
868*9c5db199SXin Li                client_control_file = os.path.join(control_file_dir,
869*9c5db199SXin Li                                                   CLIENT_CONTROL_FILENAME)
870*9c5db199SXin Li                if self._client:
871*9c5db199SXin Li                    namespace['control'] = control
872*9c5db199SXin Li                    utils.open_write_close(client_control_file, control)
873*9c5db199SXin Li                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
874*9c5db199SXin Li                                    server_control_file)
875*9c5db199SXin Li                else:
876*9c5db199SXin Li                    utils.open_write_close(server_control_file, control)
877*9c5db199SXin Li
878*9c5db199SXin Li                sync_dir = self._offload_dir_target_path()
879*9c5db199SXin Li                if self._sync_offload_dir:
880*9c5db199SXin Li                    logging.info("Preparing synchronous offload dir")
881*9c5db199SXin Li                    self.create_marker_file()
882*9c5db199SXin Li                    logging.info("Offload dir and marker file ready")
883*9c5db199SXin Li                    logging.debug("Results dir is %s", self.resultdir)
884*9c5db199SXin Li                    logging.debug("Synchronous offload dir is %s", sync_dir)
885*9c5db199SXin Li                logging.info("Processing control file")
886*9c5db199SXin Li                if self._companion_hosts:
887*9c5db199SXin Li                    namespace['companion_hosts'] = self._companion_hosts
888*9c5db199SXin Li                if self._dut_servers:
889*9c5db199SXin Li                    namespace['dut_servers'] = self._dut_servers
890*9c5db199SXin Li                namespace['use_packaging'] = use_packaging
891*9c5db199SXin Li                namespace['synchronous_offload_dir'] = sync_dir
892*9c5db199SXin Li                namespace['extended_timeout'] = self.extended_timeout
893*9c5db199SXin Li                namespace['is_cft'] = self._is_cft
894*9c5db199SXin Li                os.environ[OFFLOAD_ENVVAR] = sync_dir
895*9c5db199SXin Li                self._execute_code(server_control_file, namespace)
896*9c5db199SXin Li                logging.info("Finished processing control file")
897*9c5db199SXin Li                self._maybe_retrieve_client_offload_dirs()
898*9c5db199SXin Li                # If no device error occurred, no need to collect crashinfo.
899*9c5db199SXin Li                collect_crashinfo = self.failed_with_device_error
900*9c5db199SXin Li            except Exception as e:
901*9c5db199SXin Li                try:
902*9c5db199SXin Li                    logging.exception(
903*9c5db199SXin Li                            'Exception escaped control file, job aborting:')
904*9c5db199SXin Li                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
905*9c5db199SXin Li                                    ' ', str(e))
906*9c5db199SXin Li                    self.record('INFO', None, None, str(e),
907*9c5db199SXin Li                                {'job_abort_reason': reason})
908*9c5db199SXin Li                except:
909*9c5db199SXin Li                    pass # don't let logging exceptions here interfere
910*9c5db199SXin Li                raise
911*9c5db199SXin Li        finally:
912*9c5db199SXin Li            if temp_control_file_dir:
913*9c5db199SXin Li                # Clean up temp directory used for copies of the control files
914*9c5db199SXin Li                try:
915*9c5db199SXin Li                    shutil.rmtree(temp_control_file_dir)
916*9c5db199SXin Li                except Exception as e:
917*9c5db199SXin Li                    logging.warning('Could not remove temp directory %s: %s',
918*9c5db199SXin Li                                 temp_control_file_dir, e)
919*9c5db199SXin Li
920*9c5db199SXin Li            if machines and (collect_crashdumps or collect_crashinfo):
921*9c5db199SXin Li                if skip_crash_collection or self.fast:
922*9c5db199SXin Li                    logging.info('Skipping crash dump/info collection '
923*9c5db199SXin Li                                 'as requested.')
924*9c5db199SXin Li                else:
925*9c5db199SXin Li                    with metrics.SecondsTimer(
926*9c5db199SXin Li                            'chromeos/autotest/job/collect_crashinfo'):
927*9c5db199SXin Li                        namespace['test_start_time'] = test_start_time
928*9c5db199SXin Li                        # Remove crash files for passing tests.
929*9c5db199SXin Li                        # TODO(ayatane): Tests that create crash files should be
930*9c5db199SXin Li                        # reported.
931*9c5db199SXin Li                        namespace['has_failed_tests'] = self._has_failed_tests()
932*9c5db199SXin Li                        self._collect_crashes(namespace, collect_crashinfo)
933*9c5db199SXin Li            self.disable_external_logging()
934*9c5db199SXin Li            if self._uncollected_log_file and created_uncollected_logs:
935*9c5db199SXin Li                os.remove(self._uncollected_log_file)
936*9c5db199SXin Li
937*9c5db199SXin Li            if not self.fast:
938*9c5db199SXin Li                with metrics.SecondsTimer(
939*9c5db199SXin Li                        'chromeos/autotest/job/get_network_stats',
940*9c5db199SXin Li                        fields = {'stage': 'end'}):
941*9c5db199SXin Li                    namespace['network_stats_label'] = 'at-end'
942*9c5db199SXin Li                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
943*9c5db199SXin Li                                       namespace)
944*9c5db199SXin Li
945*9c5db199SXin Li    def _server_offload_dir_path(self):
946*9c5db199SXin Li        return os.path.join(self.resultdir, self._sync_offload_dir)
947*9c5db199SXin Li
948*9c5db199SXin Li    def _offload_dir_target_path(self):
949*9c5db199SXin Li        if not self._sync_offload_dir:
950*9c5db199SXin Li            return ''
951*9c5db199SXin Li        if self._client:
952*9c5db199SXin Li            return os.path.join(DUT_STATEFUL_PATH, self._sync_offload_dir)
953*9c5db199SXin Li        return os.path.join(self.resultdir, self._sync_offload_dir)
954*9c5db199SXin Li
955*9c5db199SXin Li    def _maybe_retrieve_client_offload_dirs(self):
956*9c5db199SXin Li        if not(self._sync_offload_dir and self._client):
957*9c5db199SXin Li            logging.info("No client dir to retrieve.")
958*9c5db199SXin Li            return ''
959*9c5db199SXin Li        logging.info("Retrieving synchronous offload dir from client")
960*9c5db199SXin Li        server_path = self._server_offload_dir_path()
961*9c5db199SXin Li        client_path = self._offload_dir_target_path()
962*9c5db199SXin Li        def serial(machine):
963*9c5db199SXin Li            host = hosts.create_host(machine)
964*9c5db199SXin Li            server_subpath = os.path.join(server_path, host.hostname)
965*9c5db199SXin Li            # Empty final piece ensures that get_file gets a trailing slash,
966*9c5db199SXin Li            #  which makes it copy dir contents rather than the dir itself.
967*9c5db199SXin Li            client_subpath = os.path.join(client_path, '')
968*9c5db199SXin Li            logging.debug("Client dir to retrieve is %s", client_subpath)
969*9c5db199SXin Li            os.makedirs(server_subpath)
970*9c5db199SXin Li            host.get_file(client_subpath, server_subpath)
971*9c5db199SXin Li        self.parallel_simple(serial, self.machines)
972*9c5db199SXin Li        logging.debug("Synchronous offload dir retrieved to %s", server_path)
973*9c5db199SXin Li        return server_path
974*9c5db199SXin Li
975*9c5db199SXin Li    def _create_client_offload_dirs(self):
976*9c5db199SXin Li        marker_string = "client %s%s" % (
977*9c5db199SXin Li            "in SSP " if utils.is_in_container() else "",
978*9c5db199SXin Li            str(datetime.utcnow())
979*9c5db199SXin Li        )
980*9c5db199SXin Li        offload_path = self._offload_dir_target_path()
981*9c5db199SXin Li        _, file_path = tempfile.mkstemp()
982*9c5db199SXin Li        def serial(machine):
983*9c5db199SXin Li            host = hosts.create_host(machine)
984*9c5db199SXin Li            marker_path = os.path.join(offload_path, "sync_offloads_marker")
985*9c5db199SXin Li            host.run(("mkdir -p %s" % offload_path), ignore_status=False)
986*9c5db199SXin Li            host.send_file(file_path, marker_path)
987*9c5db199SXin Li
988*9c5db199SXin Li        try:
989*9c5db199SXin Li            utils.open_write_close(file_path, marker_string)
990*9c5db199SXin Li            self.parallel_simple(serial, self.machines)
991*9c5db199SXin Li        finally:
992*9c5db199SXin Li            os.remove(file_path)
993*9c5db199SXin Li
994*9c5db199SXin Li
995*9c5db199SXin Li    def create_marker_file(self):
996*9c5db199SXin Li        """Create a marker file in the leaf task's synchronous offload dir.
997*9c5db199SXin Li
998*9c5db199SXin Li        This ensures that we will get some results offloaded if the test fails
999*9c5db199SXin Li        to create output properly, distinguishing offload errs from test errs.
1000*9c5db199SXin Li        @obj_param _client: Boolean, whether the control file is client-side.
1001*9c5db199SXin Li        @obj_param _sync_offload_dir: rel. path from results dir to offload dir.
1002*9c5db199SXin Li
1003*9c5db199SXin Li        @returns: path to offload dir on the machine of the leaf task
1004*9c5db199SXin Li        """
1005*9c5db199SXin Li        # Make the server-side directory regardless
1006*9c5db199SXin Li        try:
1007*9c5db199SXin Li            # 2.7 makedirs doesn't have an option for pre-existing directories
1008*9c5db199SXin Li            os.makedirs(self._server_offload_dir_path())
1009*9c5db199SXin Li        except OSError as e:
1010*9c5db199SXin Li            if e.errno != errno.EEXIST:
1011*9c5db199SXin Li                raise
1012*9c5db199SXin Li        if not self._client:
1013*9c5db199SXin Li            offload_path = self._offload_dir_target_path()
1014*9c5db199SXin Li            marker_string = "server %s%s" % (
1015*9c5db199SXin Li                "in SSP " if utils.is_in_container() else "",
1016*9c5db199SXin Li                str(datetime.utcnow())
1017*9c5db199SXin Li            )
1018*9c5db199SXin Li            utils.open_write_close(
1019*9c5db199SXin Li                os.path.join(offload_path, "sync_offloads_marker"),
1020*9c5db199SXin Li                marker_string
1021*9c5db199SXin Li            )
1022*9c5db199SXin Li            return offload_path
1023*9c5db199SXin Li        return self._create_client_offload_dirs()
1024*9c5db199SXin Li
1025*9c5db199SXin Li    def run_test(self, url, *args, **dargs):
1026*9c5db199SXin Li        """
1027*9c5db199SXin Li        Summon a test object and run it.
1028*9c5db199SXin Li
1029*9c5db199SXin Li        tag
1030*9c5db199SXin Li                tag to add to testname
1031*9c5db199SXin Li        url
1032*9c5db199SXin Li                url of the test to run
1033*9c5db199SXin Li        """
1034*9c5db199SXin Li        if self._disable_sysinfo:
1035*9c5db199SXin Li            dargs['disable_sysinfo'] = True
1036*9c5db199SXin Li
1037*9c5db199SXin Li        group, testname = self.pkgmgr.get_package_name(url, 'test')
1038*9c5db199SXin Li        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
1039*9c5db199SXin Li        outputdir = self._make_test_outputdir(subdir)
1040*9c5db199SXin Li
1041*9c5db199SXin Li        def group_func():
1042*9c5db199SXin Li            try:
1043*9c5db199SXin Li                test.runtest(self, url, tag, args, dargs)
1044*9c5db199SXin Li            except error.TestBaseException as e:
1045*9c5db199SXin Li                self.record(e.exit_status, subdir, testname, str(e))
1046*9c5db199SXin Li                raise
1047*9c5db199SXin Li            except Exception as e:
1048*9c5db199SXin Li                info = str(e) + "\n" + traceback.format_exc()
1049*9c5db199SXin Li                self.record('FAIL', subdir, testname, info)
1050*9c5db199SXin Li                raise
1051*9c5db199SXin Li            else:
1052*9c5db199SXin Li                self.record('GOOD', subdir, testname, 'completed successfully')
1053*9c5db199SXin Li
1054*9c5db199SXin Li        try:
1055*9c5db199SXin Li            result = self._run_group(testname, subdir, group_func)
1056*9c5db199SXin Li        except error.TestBaseException as e:
1057*9c5db199SXin Li            return False
1058*9c5db199SXin Li        else:
1059*9c5db199SXin Li            return True
1060*9c5db199SXin Li
1061*9c5db199SXin Li
1062*9c5db199SXin Li    def _run_group(self, name, subdir, function, *args, **dargs):
1063*9c5db199SXin Li        """Underlying method for running something inside of a group."""
1064*9c5db199SXin Li        result, exc_info = None, None
1065*9c5db199SXin Li        try:
1066*9c5db199SXin Li            self.record('START', subdir, name)
1067*9c5db199SXin Li            result = function(*args, **dargs)
1068*9c5db199SXin Li        except error.TestBaseException as e:
1069*9c5db199SXin Li            self.record("END %s" % e.exit_status, subdir, name)
1070*9c5db199SXin Li            raise
1071*9c5db199SXin Li        except Exception as e:
1072*9c5db199SXin Li            err_msg = str(e) + '\n'
1073*9c5db199SXin Li            err_msg += traceback.format_exc()
1074*9c5db199SXin Li            self.record('END ABORT', subdir, name, err_msg)
1075*9c5db199SXin Li            raise error.JobError(name + ' failed\n' + traceback.format_exc())
1076*9c5db199SXin Li        else:
1077*9c5db199SXin Li            self.record('END GOOD', subdir, name)
1078*9c5db199SXin Li        finally:
1079*9c5db199SXin Li            for hook in self._post_run_hooks:
1080*9c5db199SXin Li                hook()
1081*9c5db199SXin Li
1082*9c5db199SXin Li        return result
1083*9c5db199SXin Li
1084*9c5db199SXin Li
1085*9c5db199SXin Li    def run_group(self, function, *args, **dargs):
1086*9c5db199SXin Li        """\
1087*9c5db199SXin Li        @param function: subroutine to run
1088*9c5db199SXin Li        @returns: (result, exc_info). When the call succeeds, result contains
1089*9c5db199SXin Li                the return value of |function| and exc_info is None. If
1090*9c5db199SXin Li                |function| raises an exception, exc_info contains the tuple
1091*9c5db199SXin Li                returned by sys.exc_info(), and result is None.
1092*9c5db199SXin Li        """
1093*9c5db199SXin Li
1094*9c5db199SXin Li        name = function.__name__
1095*9c5db199SXin Li        # Allow the tag for the group to be specified.
1096*9c5db199SXin Li        tag = dargs.pop('tag', None)
1097*9c5db199SXin Li        if tag:
1098*9c5db199SXin Li            name = tag
1099*9c5db199SXin Li
1100*9c5db199SXin Li        try:
1101*9c5db199SXin Li            result = self._run_group(name, None, function, *args, **dargs)[0]
1102*9c5db199SXin Li        except error.TestBaseException:
1103*9c5db199SXin Li            return None, sys.exc_info()
1104*9c5db199SXin Li        return result, None
1105*9c5db199SXin Li
1106*9c5db199SXin Li
1107*9c5db199SXin Li    def run_op(self, op, op_func, get_kernel_func):
1108*9c5db199SXin Li        """\
1109*9c5db199SXin Li        A specialization of run_group meant specifically for handling
1110*9c5db199SXin Li        management operation. Includes support for capturing the kernel version
1111*9c5db199SXin Li        after the operation.
1112*9c5db199SXin Li
1113*9c5db199SXin Li        Args:
1114*9c5db199SXin Li           op: name of the operation.
1115*9c5db199SXin Li           op_func: a function that carries out the operation (reboot, suspend)
1116*9c5db199SXin Li           get_kernel_func: a function that returns a string
1117*9c5db199SXin Li                            representing the kernel version.
1118*9c5db199SXin Li        """
1119*9c5db199SXin Li        try:
1120*9c5db199SXin Li            self.record('START', None, op)
1121*9c5db199SXin Li            op_func()
1122*9c5db199SXin Li        except Exception as e:
1123*9c5db199SXin Li            err_msg = str(e) + '\n' + traceback.format_exc()
1124*9c5db199SXin Li            self.record('END FAIL', None, op, err_msg)
1125*9c5db199SXin Li            raise
1126*9c5db199SXin Li        else:
1127*9c5db199SXin Li            kernel = get_kernel_func()
1128*9c5db199SXin Li            self.record('END GOOD', None, op,
1129*9c5db199SXin Li                        optional_fields={"kernel": kernel})
1130*9c5db199SXin Li
1131*9c5db199SXin Li
1132*9c5db199SXin Li    def run_control(self, path):
1133*9c5db199SXin Li        """Execute a control file found at path (relative to the autotest
1134*9c5db199SXin Li        path). Intended for executing a control file within a control file,
1135*9c5db199SXin Li        not for running the top-level job control file."""
1136*9c5db199SXin Li        path = os.path.join(self.autodir, path)
1137*9c5db199SXin Li        control_file = self._load_control_file(path)
1138*9c5db199SXin Li        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
1139*9c5db199SXin Li
1140*9c5db199SXin Li
1141*9c5db199SXin Li    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1142*9c5db199SXin Li        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1143*9c5db199SXin Li                                   on_every_test)
1144*9c5db199SXin Li
1145*9c5db199SXin Li
1146*9c5db199SXin Li    def add_sysinfo_logfile(self, file, on_every_test=False):
1147*9c5db199SXin Li        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1148*9c5db199SXin Li
1149*9c5db199SXin Li
1150*9c5db199SXin Li    def _add_sysinfo_loggable(self, loggable, on_every_test):
1151*9c5db199SXin Li        if on_every_test:
1152*9c5db199SXin Li            self.sysinfo.test_loggables.add(loggable)
1153*9c5db199SXin Li        else:
1154*9c5db199SXin Li            self.sysinfo.boot_loggables.add(loggable)
1155*9c5db199SXin Li
1156*9c5db199SXin Li
1157*9c5db199SXin Li    def _read_warnings(self):
1158*9c5db199SXin Li        """Poll all the warning loggers and extract any new warnings that have
1159*9c5db199SXin Li        been logged. If the warnings belong to a category that is currently
1160*9c5db199SXin Li        disabled, this method will discard them and they will no longer be
1161*9c5db199SXin Li        retrievable.
1162*9c5db199SXin Li
1163*9c5db199SXin Li        Returns a list of (timestamp, message) tuples, where timestamp is an
1164*9c5db199SXin Li        integer epoch timestamp."""
1165*9c5db199SXin Li        warnings = []
1166*9c5db199SXin Li        while True:
1167*9c5db199SXin Li            # pull in a line of output from every logger that has
1168*9c5db199SXin Li            # output ready to be read
1169*9c5db199SXin Li            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
1170*9c5db199SXin Li            closed_loggers = set()
1171*9c5db199SXin Li            for logger in loggers:
1172*9c5db199SXin Li                line = logger.readline()
1173*9c5db199SXin Li                # record any broken pipes (aka line == empty)
1174*9c5db199SXin Li                if len(line) == 0:
1175*9c5db199SXin Li                    closed_loggers.add(logger)
1176*9c5db199SXin Li                    continue
1177*9c5db199SXin Li                # parse out the warning
1178*9c5db199SXin Li                timestamp, msgtype, msg = line.split('\t', 2)
1179*9c5db199SXin Li                timestamp = int(timestamp)
1180*9c5db199SXin Li                # if the warning is valid, add it to the results
1181*9c5db199SXin Li                if self.warning_manager.is_valid(timestamp, msgtype):
1182*9c5db199SXin Li                    warnings.append((timestamp, msg.strip()))
1183*9c5db199SXin Li
1184*9c5db199SXin Li            # stop listening to loggers that are closed
1185*9c5db199SXin Li            self.warning_loggers -= closed_loggers
1186*9c5db199SXin Li
1187*9c5db199SXin Li            # stop if none of the loggers have any output left
1188*9c5db199SXin Li            if not loggers:
1189*9c5db199SXin Li                break
1190*9c5db199SXin Li
1191*9c5db199SXin Li        # sort into timestamp order
1192*9c5db199SXin Li        warnings.sort()
1193*9c5db199SXin Li        return warnings
1194*9c5db199SXin Li
1195*9c5db199SXin Li
1196*9c5db199SXin Li    def _unique_subdirectory(self, base_subdirectory_name):
1197*9c5db199SXin Li        """Compute a unique results subdirectory based on the given name.
1198*9c5db199SXin Li
1199*9c5db199SXin Li        Appends base_subdirectory_name with a number as necessary to find a
1200*9c5db199SXin Li        directory name that doesn't already exist.
1201*9c5db199SXin Li        """
1202*9c5db199SXin Li        subdirectory = base_subdirectory_name
1203*9c5db199SXin Li        counter = 1
1204*9c5db199SXin Li        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
1205*9c5db199SXin Li            subdirectory = base_subdirectory_name + '.' + str(counter)
1206*9c5db199SXin Li            counter += 1
1207*9c5db199SXin Li        return subdirectory
1208*9c5db199SXin Li
1209*9c5db199SXin Li
1210*9c5db199SXin Li    def get_record_context(self):
1211*9c5db199SXin Li        """Returns an object representing the current job.record context.
1212*9c5db199SXin Li
1213*9c5db199SXin Li        The object returned is an opaque object with a 0-arg restore method
1214*9c5db199SXin Li        which can be called to restore the job.record context (i.e. indentation)
1215*9c5db199SXin Li        to the current level. The intention is that it should be used when
1216*9c5db199SXin Li        something external which generate job.record calls (e.g. an autotest
1217*9c5db199SXin Li        client) can fail catastrophically and the server job record state
1218*9c5db199SXin Li        needs to be reset to its original "known good" state.
1219*9c5db199SXin Li
1220*9c5db199SXin Li        @return: A context object with a 0-arg restore() method."""
1221*9c5db199SXin Li        return self._indenter.get_context()
1222*9c5db199SXin Li
1223*9c5db199SXin Li
1224*9c5db199SXin Li    def record_summary(self, status_code, test_name, reason='', attributes=None,
1225*9c5db199SXin Li                       distinguishing_attributes=(), child_test_ids=None):
1226*9c5db199SXin Li        """Record a summary test result.
1227*9c5db199SXin Li
1228*9c5db199SXin Li        @param status_code: status code string, see
1229*9c5db199SXin Li                common_lib.log.is_valid_status()
1230*9c5db199SXin Li        @param test_name: name of the test
1231*9c5db199SXin Li        @param reason: (optional) string providing detailed reason for test
1232*9c5db199SXin Li                outcome
1233*9c5db199SXin Li        @param attributes: (optional) dict of string keyvals to associate with
1234*9c5db199SXin Li                this result
1235*9c5db199SXin Li        @param distinguishing_attributes: (optional) list of attribute names
1236*9c5db199SXin Li                that should be used to distinguish identically-named test
1237*9c5db199SXin Li                results.  These attributes should be present in the attributes
1238*9c5db199SXin Li                parameter.  This is used to generate user-friendly subdirectory
1239*9c5db199SXin Li                names.
1240*9c5db199SXin Li        @param child_test_ids: (optional) list of test indices for test results
1241*9c5db199SXin Li                used in generating this result.
1242*9c5db199SXin Li        """
1243*9c5db199SXin Li        subdirectory_name_parts = [test_name]
1244*9c5db199SXin Li        for attribute in distinguishing_attributes:
1245*9c5db199SXin Li            assert attributes
1246*9c5db199SXin Li            assert attribute in attributes, '%s not in %s' % (attribute,
1247*9c5db199SXin Li                                                              attributes)
1248*9c5db199SXin Li            subdirectory_name_parts.append(attributes[attribute])
1249*9c5db199SXin Li        base_subdirectory_name = '.'.join(subdirectory_name_parts)
1250*9c5db199SXin Li
1251*9c5db199SXin Li        subdirectory = self._unique_subdirectory(base_subdirectory_name)
1252*9c5db199SXin Li        subdirectory_path = os.path.join(self.resultdir, subdirectory)
1253*9c5db199SXin Li        os.mkdir(subdirectory_path)
1254*9c5db199SXin Li
1255*9c5db199SXin Li        self.record(status_code, subdirectory, test_name,
1256*9c5db199SXin Li                    status=reason, optional_fields={'is_summary': True})
1257*9c5db199SXin Li
1258*9c5db199SXin Li        if attributes:
1259*9c5db199SXin Li            utils.write_keyval(subdirectory_path, attributes)
1260*9c5db199SXin Li
1261*9c5db199SXin Li        if child_test_ids:
1262*9c5db199SXin Li            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
1263*9c5db199SXin Li            summary_data = {'child_test_ids': ids_string}
1264*9c5db199SXin Li            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
1265*9c5db199SXin Li                               summary_data)
1266*9c5db199SXin Li
1267*9c5db199SXin Li
1268*9c5db199SXin Li    def add_post_run_hook(self, hook):
1269*9c5db199SXin Li        """
1270*9c5db199SXin Li        Registers a hook to run after the main job function.
1271*9c5db199SXin Li
1272*9c5db199SXin Li        This provides a mechanism by which tests that perform multiple tests of
1273*9c5db199SXin Li        their own can write additional top-level results to the TKO status.log
1274*9c5db199SXin Li        file.
1275*9c5db199SXin Li
1276*9c5db199SXin Li        @param hook: Function to invoke (without any args) after the main job
1277*9c5db199SXin Li            function completes and the job status is logged.
1278*9c5db199SXin Li        """
1279*9c5db199SXin Li        self._post_run_hooks.append(hook)
1280*9c5db199SXin Li
1281*9c5db199SXin Li
1282*9c5db199SXin Li    def disable_warnings(self, warning_type):
1283*9c5db199SXin Li        self.warning_manager.disable_warnings(warning_type)
1284*9c5db199SXin Li        self.record("INFO", None, None,
1285*9c5db199SXin Li                    "disabling %s warnings" % warning_type,
1286*9c5db199SXin Li                    {"warnings.disable": warning_type})
1287*9c5db199SXin Li
1288*9c5db199SXin Li
1289*9c5db199SXin Li    def enable_warnings(self, warning_type):
1290*9c5db199SXin Li        self.warning_manager.enable_warnings(warning_type)
1291*9c5db199SXin Li        self.record("INFO", None, None,
1292*9c5db199SXin Li                    "enabling %s warnings" % warning_type,
1293*9c5db199SXin Li                    {"warnings.enable": warning_type})
1294*9c5db199SXin Li
1295*9c5db199SXin Li
1296*9c5db199SXin Li    def get_status_log_path(self, subdir=None):
1297*9c5db199SXin Li        """Return the path to the job status log.
1298*9c5db199SXin Li
1299*9c5db199SXin Li        @param subdir - Optional paramter indicating that you want the path
1300*9c5db199SXin Li            to a subdirectory status log.
1301*9c5db199SXin Li
1302*9c5db199SXin Li        @returns The path where the status log should be.
1303*9c5db199SXin Li        """
1304*9c5db199SXin Li        if self.resultdir:
1305*9c5db199SXin Li            if subdir:
1306*9c5db199SXin Li                return os.path.join(self.resultdir, subdir, "status.log")
1307*9c5db199SXin Li            else:
1308*9c5db199SXin Li                return os.path.join(self.resultdir, "status.log")
1309*9c5db199SXin Li        else:
1310*9c5db199SXin Li            return None
1311*9c5db199SXin Li
1312*9c5db199SXin Li
1313*9c5db199SXin Li    def _update_uncollected_logs_list(self, update_func):
1314*9c5db199SXin Li        """Updates the uncollected logs list in a multi-process safe manner.
1315*9c5db199SXin Li
1316*9c5db199SXin Li        @param update_func - a function that updates the list of uncollected
1317*9c5db199SXin Li            logs. Should take one parameter, the list to be updated.
1318*9c5db199SXin Li        """
1319*9c5db199SXin Li        # Skip log collection if file _uncollected_log_file does not exist.
1320*9c5db199SXin Li        if not (self._uncollected_log_file and
1321*9c5db199SXin Li                os.path.exists(self._uncollected_log_file)):
1322*9c5db199SXin Li            return
1323*9c5db199SXin Li        if self._uncollected_log_file:
1324*9c5db199SXin Li            log_file = open(self._uncollected_log_file, "rb+")
1325*9c5db199SXin Li            fcntl.flock(log_file, fcntl.LOCK_EX)
1326*9c5db199SXin Li        try:
1327*9c5db199SXin Li            uncollected_logs = pickle.load(log_file)
1328*9c5db199SXin Li            update_func(uncollected_logs)
1329*9c5db199SXin Li            log_file.seek(0)
1330*9c5db199SXin Li            log_file.truncate()
1331*9c5db199SXin Li            pickle.dump(uncollected_logs, log_file)
1332*9c5db199SXin Li            log_file.flush()
1333*9c5db199SXin Li        finally:
1334*9c5db199SXin Li            fcntl.flock(log_file, fcntl.LOCK_UN)
1335*9c5db199SXin Li            log_file.close()
1336*9c5db199SXin Li
1337*9c5db199SXin Li
1338*9c5db199SXin Li    def add_client_log(self, hostname, remote_path, local_path):
1339*9c5db199SXin Li        """Adds a new set of client logs to the list of uncollected logs,
1340*9c5db199SXin Li        to allow for future log recovery.
1341*9c5db199SXin Li
1342*9c5db199SXin Li        @param host - the hostname of the machine holding the logs
1343*9c5db199SXin Li        @param remote_path - the directory on the remote machine holding logs
1344*9c5db199SXin Li        @param local_path - the local directory to copy the logs into
1345*9c5db199SXin Li        """
1346*9c5db199SXin Li        def update_func(logs_list):
1347*9c5db199SXin Li            logs_list.append((hostname, remote_path, local_path))
1348*9c5db199SXin Li        self._update_uncollected_logs_list(update_func)
1349*9c5db199SXin Li
1350*9c5db199SXin Li
1351*9c5db199SXin Li    def remove_client_log(self, hostname, remote_path, local_path):
1352*9c5db199SXin Li        """Removes a set of client logs from the list of uncollected logs,
1353*9c5db199SXin Li        to allow for future log recovery.
1354*9c5db199SXin Li
1355*9c5db199SXin Li        @param host - the hostname of the machine holding the logs
1356*9c5db199SXin Li        @param remote_path - the directory on the remote machine holding logs
1357*9c5db199SXin Li        @param local_path - the local directory to copy the logs into
1358*9c5db199SXin Li        """
1359*9c5db199SXin Li        def update_func(logs_list):
1360*9c5db199SXin Li            logs_list.remove((hostname, remote_path, local_path))
1361*9c5db199SXin Li        self._update_uncollected_logs_list(update_func)
1362*9c5db199SXin Li
1363*9c5db199SXin Li
1364*9c5db199SXin Li    def get_client_logs(self):
1365*9c5db199SXin Li        """Retrieves the list of uncollected logs, if it exists.
1366*9c5db199SXin Li
1367*9c5db199SXin Li        @returns A list of (host, remote_path, local_path) tuples. Returns
1368*9c5db199SXin Li                 an empty list if no uncollected logs file exists.
1369*9c5db199SXin Li        """
1370*9c5db199SXin Li        log_exists = (self._uncollected_log_file and
1371*9c5db199SXin Li                      os.path.exists(self._uncollected_log_file))
1372*9c5db199SXin Li        if log_exists:
1373*9c5db199SXin Li            return pickle.load(open(self._uncollected_log_file))
1374*9c5db199SXin Li        else:
1375*9c5db199SXin Li            return []
1376*9c5db199SXin Li
1377*9c5db199SXin Li
1378*9c5db199SXin Li    def _fill_server_control_namespace(self, namespace, protect=True):
1379*9c5db199SXin Li        """
1380*9c5db199SXin Li        Prepare a namespace to be used when executing server control files.
1381*9c5db199SXin Li
1382*9c5db199SXin Li        This sets up the control file API by importing modules and making them
1383*9c5db199SXin Li        available under the appropriate names within namespace.
1384*9c5db199SXin Li
1385*9c5db199SXin Li        For use by _execute_code().
1386*9c5db199SXin Li
1387*9c5db199SXin Li        Args:
1388*9c5db199SXin Li          namespace: The namespace dictionary to fill in.
1389*9c5db199SXin Li          protect: Boolean.  If True (the default) any operation that would
1390*9c5db199SXin Li              clobber an existing entry in namespace will cause an error.
1391*9c5db199SXin Li        Raises:
1392*9c5db199SXin Li          error.AutoservError: When a name would be clobbered by import.
1393*9c5db199SXin Li        """
1394*9c5db199SXin Li        def _import_names(module_name, names=()):
1395*9c5db199SXin Li            """
1396*9c5db199SXin Li            Import a module and assign named attributes into namespace.
1397*9c5db199SXin Li
1398*9c5db199SXin Li            Args:
1399*9c5db199SXin Li                module_name: The string module name.
1400*9c5db199SXin Li                names: A limiting list of names to import from module_name.  If
1401*9c5db199SXin Li                    empty (the default), all names are imported from the module
1402*9c5db199SXin Li                    similar to a "from foo.bar import *" statement.
1403*9c5db199SXin Li            Raises:
1404*9c5db199SXin Li                error.AutoservError: When a name being imported would clobber
1405*9c5db199SXin Li                    a name already in namespace.
1406*9c5db199SXin Li            """
1407*9c5db199SXin Li            module = __import__(module_name, {}, {}, names)
1408*9c5db199SXin Li
1409*9c5db199SXin Li            # No names supplied?  Import * from the lowest level module.
1410*9c5db199SXin Li            # (Ugh, why do I have to implement this part myself?)
1411*9c5db199SXin Li            if not names:
1412*9c5db199SXin Li                for submodule_name in module_name.split('.')[1:]:
1413*9c5db199SXin Li                    module = getattr(module, submodule_name)
1414*9c5db199SXin Li                if hasattr(module, '__all__'):
1415*9c5db199SXin Li                    names = getattr(module, '__all__')
1416*9c5db199SXin Li                else:
1417*9c5db199SXin Li                    names = dir(module)
1418*9c5db199SXin Li
1419*9c5db199SXin Li            # Install each name into namespace, checking to make sure it
1420*9c5db199SXin Li            # doesn't override anything that already exists.
1421*9c5db199SXin Li            for name in names:
1422*9c5db199SXin Li                # Check for conflicts to help prevent future problems.
1423*9c5db199SXin Li                if name in namespace and protect:
1424*9c5db199SXin Li                    if namespace[name] is not getattr(module, name):
1425*9c5db199SXin Li                        raise error.AutoservError('importing name '
1426*9c5db199SXin Li                                '%s from %s %r would override %r' %
1427*9c5db199SXin Li                                (name, module_name, getattr(module, name),
1428*9c5db199SXin Li                                 namespace[name]))
1429*9c5db199SXin Li                    else:
1430*9c5db199SXin Li                        # Encourage cleanliness and the use of __all__ for a
1431*9c5db199SXin Li                        # more concrete API with less surprises on '*' imports.
1432*9c5db199SXin Li                        warnings.warn('%s (%r) being imported from %s for use '
1433*9c5db199SXin Li                                      'in server control files is not the '
1434*9c5db199SXin Li                                      'first occurrence of that import.' %
1435*9c5db199SXin Li                                      (name, namespace[name], module_name))
1436*9c5db199SXin Li
1437*9c5db199SXin Li                namespace[name] = getattr(module, name)
1438*9c5db199SXin Li
1439*9c5db199SXin Li
1440*9c5db199SXin Li        # This is the equivalent of prepending a bunch of import statements to
1441*9c5db199SXin Li        # the front of the control script.
1442*9c5db199SXin Li        namespace.update(os=os, sys=sys, logging=logging)
1443*9c5db199SXin Li        _import_names('autotest_lib.server',
1444*9c5db199SXin Li                ('hosts', 'autotest', 'standalone_profiler'))
1445*9c5db199SXin Li        _import_names('autotest_lib.server.subcommand',
1446*9c5db199SXin Li                      ('parallel', 'parallel_simple', 'subcommand'))
1447*9c5db199SXin Li        _import_names('autotest_lib.server.utils',
1448*9c5db199SXin Li                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
1449*9c5db199SXin Li        _import_names('autotest_lib.client.common_lib.error')
1450*9c5db199SXin Li        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
1451*9c5db199SXin Li
1452*9c5db199SXin Li        # Inject ourself as the job object into other classes within the API.
1453*9c5db199SXin Li        # (Yuck, this injection is a gross thing be part of a public API. -gps)
1454*9c5db199SXin Li        #
1455*9c5db199SXin Li        # XXX Autotest does not appear to use .job.  Who does?
1456*9c5db199SXin Li        namespace['autotest'].Autotest.job = self
1457*9c5db199SXin Li        # server.hosts.base_classes.Host uses .job.
1458*9c5db199SXin Li        namespace['hosts'].Host.job = self
1459*9c5db199SXin Li        namespace['hosts'].factory.ssh_user = self._ssh_user
1460*9c5db199SXin Li        namespace['hosts'].factory.ssh_port = self._ssh_port
1461*9c5db199SXin Li        namespace['hosts'].factory.ssh_pass = self._ssh_pass
1462*9c5db199SXin Li        namespace['hosts'].factory.ssh_verbosity_flag = (
1463*9c5db199SXin Li                self._ssh_verbosity_flag)
1464*9c5db199SXin Li        namespace['hosts'].factory.ssh_options = self._ssh_options
1465*9c5db199SXin Li
1466*9c5db199SXin Li
1467*9c5db199SXin Li    def _execute_code(self, code_file, namespace, protect=True):
1468*9c5db199SXin Li        """
1469*9c5db199SXin Li        Execute code using a copy of namespace as a server control script.
1470*9c5db199SXin Li
1471*9c5db199SXin Li        Unless protect_namespace is explicitly set to False, the dict will not
1472*9c5db199SXin Li        be modified.
1473*9c5db199SXin Li
1474*9c5db199SXin Li        Args:
1475*9c5db199SXin Li          code_file: The filename of the control file to execute.
1476*9c5db199SXin Li          namespace: A dict containing names to make available during execution.
1477*9c5db199SXin Li          protect: Boolean.  If True (the default) a copy of the namespace dict
1478*9c5db199SXin Li              is used during execution to prevent the code from modifying its
1479*9c5db199SXin Li              contents outside of this function.  If False the raw dict is
1480*9c5db199SXin Li              passed in and modifications will be allowed.
1481*9c5db199SXin Li        """
1482*9c5db199SXin Li        if protect:
1483*9c5db199SXin Li            namespace = namespace.copy()
1484*9c5db199SXin Li        self._fill_server_control_namespace(namespace, protect=protect)
1485*9c5db199SXin Li        # TODO: Simplify and get rid of the special cases for only 1 machine.
1486*9c5db199SXin Li        if len(self.machines) > 1:
1487*9c5db199SXin Li            machines_text = '\n'.join(self.machines) + '\n'
1488*9c5db199SXin Li            # Only rewrite the file if it does not match our machine list.
1489*9c5db199SXin Li            try:
1490*9c5db199SXin Li                machines_f = open(MACHINES_FILENAME, 'r')
1491*9c5db199SXin Li                existing_machines_text = machines_f.read()
1492*9c5db199SXin Li                machines_f.close()
1493*9c5db199SXin Li            except EnvironmentError:
1494*9c5db199SXin Li                existing_machines_text = None
1495*9c5db199SXin Li            if machines_text != existing_machines_text:
1496*9c5db199SXin Li                utils.open_write_close(MACHINES_FILENAME, machines_text)
1497*9c5db199SXin Li        seven.exec_file(code_file, locals_=namespace, globals_=namespace)
1498*9c5db199SXin Li
1499*9c5db199SXin Li
1500*9c5db199SXin Li    def preprocess_client_state(self):
1501*9c5db199SXin Li        """
1502*9c5db199SXin Li        Produce a state file for initializing the state of a client job.
1503*9c5db199SXin Li
1504*9c5db199SXin Li        Creates a new client state file with all the current server state, as
1505*9c5db199SXin Li        well as some pre-set client state.
1506*9c5db199SXin Li
1507*9c5db199SXin Li        @returns The path of the file the state was written into.
1508*9c5db199SXin Li        """
1509*9c5db199SXin Li        # initialize the sysinfo state
1510*9c5db199SXin Li        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1511*9c5db199SXin Li
1512*9c5db199SXin Li        # dump the state out to a tempfile
1513*9c5db199SXin Li        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1514*9c5db199SXin Li        os.close(fd)
1515*9c5db199SXin Li
1516*9c5db199SXin Li        # write_to_file doesn't need locking, we exclusively own file_path
1517*9c5db199SXin Li        self._state.write_to_file(file_path)
1518*9c5db199SXin Li        return file_path
1519*9c5db199SXin Li
1520*9c5db199SXin Li
1521*9c5db199SXin Li    def postprocess_client_state(self, state_path):
1522*9c5db199SXin Li        """
1523*9c5db199SXin Li        Update the state of this job with the state from a client job.
1524*9c5db199SXin Li
1525*9c5db199SXin Li        Updates the state of the server side of a job with the final state
1526*9c5db199SXin Li        of a client job that was run. Updates the non-client-specific state,
1527*9c5db199SXin Li        pulls in some specific bits from the client-specific state, and then
1528*9c5db199SXin Li        discards the rest. Removes the state file afterwards
1529*9c5db199SXin Li
1530*9c5db199SXin Li        @param state_file A path to the state file from the client.
1531*9c5db199SXin Li        """
1532*9c5db199SXin Li        # update the on-disk state
1533*9c5db199SXin Li        try:
1534*9c5db199SXin Li            self._state.read_from_file(state_path)
1535*9c5db199SXin Li            os.remove(state_path)
1536*9c5db199SXin Li        except OSError as e:
1537*9c5db199SXin Li            # ignore file-not-found errors
1538*9c5db199SXin Li            if e.errno != errno.ENOENT:
1539*9c5db199SXin Li                raise
1540*9c5db199SXin Li            else:
1541*9c5db199SXin Li                logging.debug('Client state file %s not found', state_path)
1542*9c5db199SXin Li
1543*9c5db199SXin Li        # update the sysinfo state
1544*9c5db199SXin Li        if self._state.has('client', 'sysinfo'):
1545*9c5db199SXin Li            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
1546*9c5db199SXin Li
1547*9c5db199SXin Li        # drop all the client-specific state
1548*9c5db199SXin Li        self._state.discard_namespace('client')
1549*9c5db199SXin Li
1550*9c5db199SXin Li
1551*9c5db199SXin Li    def clear_all_known_hosts(self):
1552*9c5db199SXin Li        """Clears known hosts files for all AbstractSSHHosts."""
1553*9c5db199SXin Li        for host in self.hosts:
1554*9c5db199SXin Li            if isinstance(host, abstract_ssh.AbstractSSHHost):
1555*9c5db199SXin Li                host.clear_known_hosts()
1556*9c5db199SXin Li
1557*9c5db199SXin Li
1558*9c5db199SXin Li    def close(self):
1559*9c5db199SXin Li        """Closes this job's operation."""
1560*9c5db199SXin Li
1561*9c5db199SXin Li        # Use shallow copy, because host.close() internally discards itself.
1562*9c5db199SXin Li        for host in list(self.hosts):
1563*9c5db199SXin Li            host.close()
1564*9c5db199SXin Li        assert not self.hosts
1565*9c5db199SXin Li        self._connection_pool.shutdown()
1566*9c5db199SXin Li
1567*9c5db199SXin Li
1568*9c5db199SXin Li    def _get_job_data(self):
1569*9c5db199SXin Li        """Add custom data to the job keyval info.
1570*9c5db199SXin Li
1571*9c5db199SXin Li        When multiple machines are used in a job, change the hostname to
1572*9c5db199SXin Li        the platform of the first machine instead of machine1,machine2,...  This
1573*9c5db199SXin Li        makes the job reports easier to read and keeps the tko_machines table from
1574*9c5db199SXin Li        growing too large.
1575*9c5db199SXin Li
1576*9c5db199SXin Li        Returns:
1577*9c5db199SXin Li            keyval dictionary with new hostname value, or empty dictionary.
1578*9c5db199SXin Li        """
1579*9c5db199SXin Li        job_data = {}
1580*9c5db199SXin Li        # Only modify hostname on multimachine jobs. Assume all host have the same
1581*9c5db199SXin Li        # platform.
1582*9c5db199SXin Li        if len(self.machines) > 1:
1583*9c5db199SXin Li            # Search through machines for first machine with a platform.
1584*9c5db199SXin Li            for host in self.machines:
1585*9c5db199SXin Li                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
1586*9c5db199SXin Li                keyvals = utils.read_keyval(keyval_path)
1587*9c5db199SXin Li                host_plat = keyvals.get('platform', None)
1588*9c5db199SXin Li                if not host_plat:
1589*9c5db199SXin Li                    continue
1590*9c5db199SXin Li                job_data['hostname'] = host_plat
1591*9c5db199SXin Li                break
1592*9c5db199SXin Li        return job_data
1593*9c5db199SXin Li
1594*9c5db199SXin Li
1595*9c5db199SXin Liclass warning_manager(object):
1596*9c5db199SXin Li    """Class for controlling warning logs. Manages the enabling and disabling
1597*9c5db199SXin Li    of warnings."""
1598*9c5db199SXin Li    def __init__(self):
1599*9c5db199SXin Li        # a map of warning types to a list of disabled time intervals
1600*9c5db199SXin Li        self.disabled_warnings = {}
1601*9c5db199SXin Li
1602*9c5db199SXin Li
1603*9c5db199SXin Li    def is_valid(self, timestamp, warning_type):
1604*9c5db199SXin Li        """Indicates if a warning (based on the time it occured and its type)
1605*9c5db199SXin Li        is a valid warning. A warning is considered "invalid" if this type of
1606*9c5db199SXin Li        warning was marked as "disabled" at the time the warning occured."""
1607*9c5db199SXin Li        disabled_intervals = self.disabled_warnings.get(warning_type, [])
1608*9c5db199SXin Li        for start, end in disabled_intervals:
1609*9c5db199SXin Li            if timestamp >= start and (end is None or timestamp < end):
1610*9c5db199SXin Li                return False
1611*9c5db199SXin Li        return True
1612*9c5db199SXin Li
1613*9c5db199SXin Li
1614*9c5db199SXin Li    def disable_warnings(self, warning_type, current_time_func=time.time):
1615*9c5db199SXin Li        """As of now, disables all further warnings of this type."""
1616*9c5db199SXin Li        intervals = self.disabled_warnings.setdefault(warning_type, [])
1617*9c5db199SXin Li        if not intervals or intervals[-1][1] is not None:
1618*9c5db199SXin Li            intervals.append((int(current_time_func()), None))
1619*9c5db199SXin Li
1620*9c5db199SXin Li
1621*9c5db199SXin Li    def enable_warnings(self, warning_type, current_time_func=time.time):
1622*9c5db199SXin Li        """As of now, enables all further warnings of this type."""
1623*9c5db199SXin Li        intervals = self.disabled_warnings.get(warning_type, [])
1624*9c5db199SXin Li        if intervals and intervals[-1][1] is None:
1625*9c5db199SXin Li            intervals[-1] = (intervals[-1][0], int(current_time_func()))
1626*9c5db199SXin Li
1627*9c5db199SXin Li
1628*9c5db199SXin Lidef _is_current_server_job(test):
1629*9c5db199SXin Li    """Return True if parsed test is the currently running job.
1630*9c5db199SXin Li
1631*9c5db199SXin Li    @param test: test instance from tko parser.
1632*9c5db199SXin Li    """
1633*9c5db199SXin Li    return test.testname == 'SERVER_JOB'
1634*9c5db199SXin Li
1635*9c5db199SXin Li
1636*9c5db199SXin Lidef _create_afe_host(hostname):
1637*9c5db199SXin Li    """Create an afe_host object backed by the AFE.
1638*9c5db199SXin Li
1639*9c5db199SXin Li    @param hostname: Name of the host for which we want the Host object.
1640*9c5db199SXin Li    @returns: An object of type frontend.AFE
1641*9c5db199SXin Li    """
1642*9c5db199SXin Li    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
1643*9c5db199SXin Li    hosts = afe.get_hosts(hostname=hostname)
1644*9c5db199SXin Li    if not hosts:
1645*9c5db199SXin Li        raise error.AutoservError('No hosts named %s found' % hostname)
1646*9c5db199SXin Li
1647*9c5db199SXin Li    return hosts[0]
1648*9c5db199SXin Li
1649*9c5db199SXin Li
1650*9c5db199SXin Lidef _create_file_backed_host_info_store(store_dir, hostname):
1651*9c5db199SXin Li    """Create a CachingHostInfoStore backed by an existing file.
1652*9c5db199SXin Li
1653*9c5db199SXin Li    @param store_dir: A directory to contain store backing files.
1654*9c5db199SXin Li    @param hostname: Name of the host for which we want the store.
1655*9c5db199SXin Li
1656*9c5db199SXin Li    @returns: An object of type CachingHostInfoStore.
1657*9c5db199SXin Li    """
1658*9c5db199SXin Li    backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
1659*9c5db199SXin Li    if not os.path.isfile(backing_file_path):
1660*9c5db199SXin Li        raise error.AutoservError(
1661*9c5db199SXin Li                'Requested FileStore but no backing file at %s'
1662*9c5db199SXin Li                % backing_file_path
1663*9c5db199SXin Li        )
1664*9c5db199SXin Li    return file_store.FileStore(backing_file_path)
1665*9c5db199SXin Li
1666*9c5db199SXin Li
1667*9c5db199SXin Lidef _create_afe_backed_host_info_store(store_dir, hostname):
1668*9c5db199SXin Li    """Create a CachingHostInfoStore backed by the AFE.
1669*9c5db199SXin Li
1670*9c5db199SXin Li    @param store_dir: A directory to contain store backing files.
1671*9c5db199SXin Li    @param hostname: Name of the host for which we want the store.
1672*9c5db199SXin Li
1673*9c5db199SXin Li    @returns: An object of type CachingHostInfoStore.
1674*9c5db199SXin Li    """
1675*9c5db199SXin Li    primary_store = afe_store.AfeStore(hostname)
1676*9c5db199SXin Li    try:
1677*9c5db199SXin Li        primary_store.get(force_refresh=True)
1678*9c5db199SXin Li    except host_info.StoreError:
1679*9c5db199SXin Li        raise error.AutoservError(
1680*9c5db199SXin Li                'Could not obtain HostInfo for hostname %s' % hostname)
1681*9c5db199SXin Li    # Since the store wasn't initialized external to autoserv, we must
1682*9c5db199SXin Li    # ensure that the store we create is unique within store_dir.
1683*9c5db199SXin Li    backing_file_path = os.path.join(
1684*9c5db199SXin Li            _make_unique_subdir(store_dir),
1685*9c5db199SXin Li            '%s.store' % hostname,
1686*9c5db199SXin Li    )
1687*9c5db199SXin Li    logging.info('Shadowing AFE store with a FileStore at %s',
1688*9c5db199SXin Li                 backing_file_path)
1689*9c5db199SXin Li    shadow_store = file_store.FileStore(backing_file_path)
1690*9c5db199SXin Li    return shadowing_store.ShadowingStore(primary_store, shadow_store)
1691*9c5db199SXin Li
1692*9c5db199SXin Li
1693*9c5db199SXin Lidef _make_unique_subdir(workdir):
1694*9c5db199SXin Li    """Creates a new subdir within workdir and returns the path to it."""
1695*9c5db199SXin Li    store_dir = os.path.join(workdir, 'dir_%s' % uuid.uuid4())
1696*9c5db199SXin Li    _make_dirs_if_needed(store_dir)
1697*9c5db199SXin Li    return store_dir
1698*9c5db199SXin Li
1699*9c5db199SXin Li
1700*9c5db199SXin Lidef _make_dirs_if_needed(path):
1701*9c5db199SXin Li    """os.makedirs, but ignores failure because the leaf directory exists"""
1702*9c5db199SXin Li    try:
1703*9c5db199SXin Li        os.makedirs(path)
1704*9c5db199SXin Li    except OSError as e:
1705*9c5db199SXin Li        if e.errno != errno.EEXIST:
1706*9c5db199SXin Li            raise
1707