xref: /aosp_15_r20/external/autotest/site_utils/job_directories.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1import abc
2import datetime
3import glob
4import json
5import logging
6import os
7import re
8import shutil
9import six
10
11import common
12
13from autotest_lib.client.common_lib import time_utils
14from autotest_lib.client.common_lib import utils
15from autotest_lib.server.cros.dynamic_suite import constants
16from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
17
18try:
19    from autotest_lib.utils.frozen_chromite.lib import metrics
20except ImportError:
21    metrics = utils.metrics_mock
22
23
# Matches special-task result directories such as
#   .../hosts/chromeos1-rack5-host6/1343-cleanup
# Group 1 captures the task id.  A raw string is used so that '\d' is not
# treated as an (invalid) string escape sequence.
SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'
25
def is_job_expired(age_limit, timestamp):
    """Decide whether a job's timestamp is beyond an age limit.

    @param age_limit: Minimum age in days.  A non-positive value means
                      every job counts as expired.
    @param timestamp: Timestamp of the job whose age we are checking,
                      formatted per time_utils.TIME_FMT.

    @returns True when the job is old enough to be considered expired.
    """
    if age_limit > 0:
        started = time_utils.time_string_to_datetime(timestamp)
        cutoff = started + datetime.timedelta(days=age_limit)
        return datetime.datetime.now() >= cutoff
    # Non-positive limits expire everything unconditionally.
    return True
41
42
def get_job_id_or_task_id(result_dir):
    """Extract job id or special task id from result_dir

    @param result_dir: path to the result dir.
            For test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: str representing the job id or task id. Returns None if fail
        to parse job or task id from the result_dir.
    """
    if not result_dir:
        # Explicit None keeps the return consistent with the docstring.
        return None
    result_dir = os.path.abspath(result_dir)
    # Result folder for job running inside container has only job id.
    # Raw strings avoid invalid '\d' escape warnings.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last pattern of number-text. This avoids
    # issue with path like 123-results/456-debug_user, in which 456 is the real
    # job ID.
    m_job = re.findall(r'.*/(\d+)-[^/]+', result_dir)
    if m_job:
        return m_job[-1]
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return m_special_task.group(1)
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return m_ssp_job_pattern.group(1)
    # Fall back to Skylab swarming result layouts.
    return _get_swarming_run_id(result_dir)
74
75
def _get_swarming_run_id(path):
    """Extract the Swarming run_id for a Skylab task from the result path.

    Handles both layouts:
      - legacy:   .../results/swarming-3e4391423c3a4311 (never ends in 0);
        the run_id is the hex suffix itself.
      - new style: .../results/swarming-3e4391423c3a4310/1; the first
        directory always ends in 0 and the run_id is rebuilt by joining
        the hex prefix with the trailing single-digit directory.
    """
    legacy_match = re.match('.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path)
    if legacy_match is not None:
        return legacy_match.group(1)
    new_style_match = re.match('.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$',
                               path)
    if new_style_match is None:
        return None
    return new_style_match.group(1) + new_style_match.group(2)
92
93
class _JobDirectory(six.with_metaclass(abc.ABCMeta, object)):
    """State associated with a job to be offloaded.

    A job directory moves through this life cycle (including failure
    events that normally don't occur):
      1. `get_job_directories()` discovers the results directory and a
         job instance is created for it.
      2. While the job is neither complete in the database nor expired
         per the `age_limit` parameter, calls to `offload()` do nothing.
      3. Once the job is both finished and expired, the next `offload()`
         call makes the first attempt to copy the directory to GS; an
         attempt may fail to complete (e.g. because of a GS problem).
      4. Eventually an `offload()` call succeeds and the directory no
         longer exists.  `is_offloaded()` is then true, the job instance
         is deleted, and future failures will not mention this directory
         any more.

    Only steps 1 and 4 are guaranteed to occur.  The others depend on
    the timing of calls to `offload()` and on the reliability of the
    actual offload process.
    """

    # Subclasses must redefine this with a glob matching their results
    # directories.
    GLOB_PATTERN = None

    def __init__(self, resultsdir):
        self.dirname = resultsdir
        self._id = get_job_id_or_task_id(resultsdir)
        self.offload_count = 0
        self.first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        candidates = glob.glob(cls.GLOB_PATTERN)
        return [path for path in candidates if os.path.isdir(path)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`.  Otherwise return a timestamp string to be used for
        expiration checks in `is_job_expired()`.

        @return `None` while the job is still running; otherwise a
                string with a timestamp in the appropriate format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this special task.

        @returns True/False if there is anything left to offload.
        """
        # By default the directory should still be offloaded.
        return True
153
154
# Breadcrumb text written into an emptied log directory by
# _remove_log_directory_contents() to explain where the logs went.
NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""
157
158
class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True/False if there is anything left to offload.
        """
        # Each test in this job may carry its own instructions file.
        instruction_glob = os.path.join(self.dirname, '*',
                                        constants.GS_OFFLOADER_INSTRUCTIONS)
        for instruction_path in glob.glob(instruction_glob):
            with open(instruction_path, 'r') as f:
                instructions = json.load(f)
            if instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
                _remove_log_directory_contents(
                        os.path.dirname(instruction_path))

        # If nothing remains under the job directory, remove it too and
        # report that there is nothing left to offload.
        if os.path.exists(self.dirname) and not os.listdir(self.dirname):
            shutil.rmtree(self.dirname)
            return False
        return True

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the latest hqe finished_on time. If the finished_on times are null
                 returns the job's created_on time.
        """
        afe = _cached_afe()
        jobs = afe.get_jobs(id=self._id, finished=True)
        if not jobs:
            return None
        hqes = afe.get_host_queue_entries(finished_on__isnull=False,
                                          job_id=self._id)
        # While most Jobs have 1 HQE, some can have multiple, so check them all.
        if hqes:
            return max(hqe.finished_on for hqe in hqes)
        return jobs[0].created_on
200
201
def _remove_log_directory_contents(dirpath):
    """Delete everything under a log directory.

    A short README is left behind explaining what happened to the logs.

    @param dirpath: Path to log directory.
    """
    # Recreating the directory empty is simpler than removing each entry.
    shutil.rmtree(dirpath)
    os.mkdir(dirpath)
    readme_path = os.path.join(dirpath, 'logs-removed-readme.txt')
    with open(readme_path, 'w') as f:
        f.write(NO_OFFLOAD_README)
214
215
class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    # NOTE: the former __init__ only delegated to super() with identical
    # arguments, so it was dead code and has been removed.

    def get_timestamp_if_finished(self):
        """Return the task's completion time, or None if not complete.

        @returns the special task's time_finished from the AFE when the
                 task is marked complete; None otherwise.
        """
        entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True)
        return entry[0].time_finished if entry else None
227
228
def _find_results_dir(dirname):
    """Return all subdirectories of dirname holding an offload marker.

    Walks the tree rooted at dirname and collects every directory that
    directly contains an _OFFLOAD_MARKER file.
    """
    marked_dirs = []
    for root, _dirs, files in os.walk(dirname, topdown=True):
        # A directory lists each filename at most once, so membership
        # testing is equivalent to scanning every file.
        if _OFFLOAD_MARKER in files:
            marked_dirs.append(root)
    return marked_dirs
236
237
# Name of the marker file dropped into a Skylab results directory when it
# is ready for offload; its contents are an integer epoch timestamp (see
# SwarmingJobDirectory.get_timestamp_if_finished).
_OFFLOAD_MARKER = ".ready_for_offload"
# Counts marker files whose contents could not be parsed as an integer.
_marker_parse_error_metric = metrics.Counter(
    'chromeos/autotest/gs_offloader/offload_marker_parse_errors',
    description='Errors parsing the offload marker file')
242
243
class SwarmingJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for Skylab swarming jobs."""

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        # Legacy swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4311
        # In particular, the ending digit is never 0
        jobdirs = [
                d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]')
                if os.path.isdir(d)
        ]
        # New style swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4310/1
        # - Results are one directory deeper.
        # - Ending digit of first directory is always 0.
        new_style_topdir = [
                d for d in glob.glob('swarming-[0-9a-f]*0') if os.path.isdir(d)
        ]
        # When there are multiple tests run in one test_runner build,
        # the results will be one level deeper with the test_id
        # as one further subdirectory.
        # Example: .../results/swarming-3e4391423c3a4310/1/test_id
        for topdir in new_style_topdir:
            for d in glob.glob('%s/[1-9a-f]*' % topdir):
                jobdirs += _find_results_dir(d)

        return jobdirs

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the timestamp recorded in the offload marker file, or
                 None if the marker is missing, unreadable, or does not
                 contain an integer epoch time.
        """
        marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER)
        try:
            with open(marker_path) as f:
                ts_string = f.read().strip()
        except EnvironmentError:
            # A bare 'except' here previously swallowed every exception,
            # including KeyboardInterrupt/SystemExit.  Only I/O errors
            # mean "the marker is not readable yet".
            return None
        try:
            ts = int(ts_string)
            return time_utils.epoch_time_to_date_string(ts)
        except ValueError as e:
            logging.debug('Error parsing %s for %s: %s', _OFFLOAD_MARKER,
                          self.dirname, e)
            _marker_parse_error_metric.increment()
            return None
295
296
# Process-wide RetryingAFE instance, built lazily on first use.
_AFE = None


def _cached_afe():
    """Return the shared RetryingAFE client, creating it if needed."""
    global _AFE
    if _AFE is not None:
        return _AFE
    _AFE = frontend_wrappers.RetryingAFE()
    return _AFE