import abc
import datetime
import glob
import json
import logging
import os
import re
import shutil
import six

import common

from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'

def is_job_expired(age_limit, timestamp):
    """Check whether a job timestamp is older than an age limit.

    @param age_limit: Minimum age, measured in days.  If the value is
                      not positive, the job is always expired.
    @param timestamp: Timestamp of the job whose age we are checking.
                      The format must match time_utils.TIME_FMT.

    @returns True if the job is old enough to be expired.
    """
    if age_limit <= 0:
        return True
    job_time = time_utils.time_string_to_datetime(timestamp)
    expiration = job_time + datetime.timedelta(days=age_limit)
    return datetime.datetime.now() >= expiration
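
# Illustrative example (not exercised by this module): with age_limit=7 and
# assuming time_utils.TIME_FMT has the form 'YYYY-MM-DD HH:MM:SS', a job with
# timestamp '2020-01-01 00:00:00' is considered expired once the current time
# reaches 2020-01-08 00:00:00 or later:
#     is_job_expired(7, '2020-01-01 00:00:00')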


def get_job_id_or_task_id(result_dir):
    """Extract the job id or special task id from a result directory path.

    @param result_dir: path to the result dir.
            For test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: str representing the job id or task id.  Returns None if the
        job or task id cannot be parsed from the result_dir.
    """
    if not result_dir:
        return None
    result_dir = os.path.abspath(result_dir)
    # Result folder for a job running inside a container has only the job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last pattern of number-text. This avoids
    # issues with paths like 123-results/456-debug_user, in which 456 is the
    # real job ID.
    m_job = re.findall(r'.*/(\d+)-[^/]+', result_dir)
    if m_job:
        return m_job[-1]
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return m_special_task.group(1)
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return m_ssp_job_pattern.group(1)
    return _get_swarming_run_id(result_dir)
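
# Illustrative examples (paths taken from the docstring above):
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   returns '2032', while
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#   returns '1343'.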


def _get_swarming_run_id(path):
    """Extract the Swarming run_id for a Skylab task from the result path."""
    # Legacy swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4311
    # In particular, the ending digit is never 0.
    m_legacy_path = re.match('.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path)
    if m_legacy_path:
        return m_legacy_path.group(1)
    # New style swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4310/1
    # - Results are one directory deeper.
    # - Ending digit of first directory is always 0.
    m_path = re.match('.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path)
    if m_path:
        return m_path.group(1) + m_path.group(2)
    return None
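
# Illustrative examples (directory layouts follow the comments above):
#   .../results/swarming-3e4391423c3a4311    -> run_id '3e4391423c3a4311'
#   .../results/swarming-3e4391423c3a4310/1  -> run_id '3e4391423c3a4311'
# For new style results, the run_id is formed by dropping the trailing 0 of
# the first directory's hex suffix and appending the run-number subdirectory.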


class _JobDirectory(six.with_metaclass(abc.ABCMeta, object)):
    """State associated with a job to be offloaded.

    The full life-cycle of a job (including failure events that
    normally don't occur) looks like this:
      1. The job's results directory is discovered by
         `get_job_directories()`, and a job instance is created for it.
      2. Calls to `offload()` have no effect so long as the job
         isn't complete in the database and the job isn't expired
         according to the `age_limit` parameter.
      3. Eventually, the job is both finished and expired.  The next
         call to `offload()` makes the first attempt to offload the
         directory to GS.  Offload is attempted, but fails to complete
         (e.g. because of a GS problem).
      4. Finally, a call to `offload()` succeeds, and the directory no
         longer exists.  Now `is_offloaded()` is true, so the job
         instance is deleted, and future failures will not mention this
         directory any more.

    Only steps 1. and 4. are guaranteed to occur.  The others depend
    on the timing of calls to `offload()`, and on the reliability of
    the actual offload process.

    """
    GLOB_PATTERN = None  # must be redefined in subclass

    def __init__(self, resultsdir):
        self.dirname = resultsdir
        self._id = get_job_id_or_task_id(resultsdir)
        self.offload_count = 0
        self.first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`.  Otherwise, return a timestamp for the job.  The
        timestamp is to be used to determine expiration in
        `is_job_expired()`.

        @return Return `None` if the job is still running; otherwise
                return a string with a timestamp in the appropriate
                format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True if there is anything left to offload, False otherwise.
        """
        # By default, the directory is still offloaded.
        return True


NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""


class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True if there is anything left to offload, False otherwise.
        """
        # Go through the gs_offloader instructions file for each test in this
        # job.
        for path in glob.glob(
                os.path.join(self.dirname, '*',
                             constants.GS_OFFLOADER_INSTRUCTIONS)):
            with open(path, 'r') as f:
                gs_off_instructions = json.load(f)
            if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
                dirname = os.path.dirname(path)
                _remove_log_directory_contents(dirname)

        # Finally, check if there's anything left to offload.
        if os.path.exists(self.dirname) and not os.listdir(self.dirname):
            shutil.rmtree(self.dirname)
            return False
        return True

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the latest HQE finished_on time.  If no HQE has a
                 finished_on time, returns the job's created_on time.
        """
        entry = _cached_afe().get_jobs(id=self._id, finished=True)
        if not entry:
            return None
        hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False,
                                                    job_id=self._id)
        if not hqes:
            return entry[0].created_on
        # While most jobs have one HQE, some can have multiple, so check them
        # all.
        return max([hqe.finished_on for hqe in hqes])


def _remove_log_directory_contents(dirpath):
    """Remove log directory contents.

    Leave a note explaining what has happened to the logs.

    @param dirpath: Path to log directory.
    """
    shutil.rmtree(dirpath)
    os.mkdir(dirpath)
    breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt')
    with open(breadcrumb_name, 'w') as f:
        f.write(NO_OFFLOAD_README)


class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    def __init__(self, resultsdir):
        super(SpecialJobDirectory, self).__init__(resultsdir)

    def get_timestamp_if_finished(self):
        """Return the task's time_finished, or None if it isn't complete."""
        entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True)
        return entry[0].time_finished if entry else None


def _find_results_dir(dirname):
    """Return subdirectories of dirname that contain an offload marker."""
    subdirs = []
    for root, dirs, files in os.walk(dirname, topdown=True):
        for f in files:
            if f == _OFFLOAD_MARKER:
                subdirs.append(root)
    return subdirs


_OFFLOAD_MARKER = ".ready_for_offload"
_marker_parse_error_metric = metrics.Counter(
    'chromeos/autotest/gs_offloader/offload_marker_parse_errors',
    description='Errors parsing the offload marker file')
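
# Note (illustrative, not a guarantee from this module): the offload marker
# file is expected to hold a single integer epoch timestamp.
# SwarmingJobDirectory.get_timestamp_if_finished below parses it and converts
# it with time_utils.epoch_time_to_date_string, so a marker containing
# '1600000000' maps to the corresponding date string.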


class SwarmingJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for Skylab swarming jobs."""

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        # Legacy swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4311
        # In particular, the ending digit is never 0.
        jobdirs = [
                d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]')
                if os.path.isdir(d)
        ]
        # New style swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4310/1
        # - Results are one directory deeper.
        # - Ending digit of first directory is always 0.
        new_style_topdir = [
                d for d in glob.glob('swarming-[0-9a-f]*0') if os.path.isdir(d)
        ]
        # When there are multiple tests run in one test_runner build,
        # the results will be one level deeper with the test_id
        # as one further subdirectory.
        # Example: .../results/swarming-3e4391423c3a4310/1/test_id
        for topdir in new_style_topdir:
            for d in glob.glob('%s/[1-9a-f]*' % topdir):
                subdirs = _find_results_dir(d)
                jobdirs += subdirs

        return jobdirs

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the timestamp read from the offload marker file, formatted
                 as a date string.  Returns None if the marker is missing or
                 cannot be parsed.
        """
        marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER)
        try:
            with open(marker_path) as f:
                ts_string = f.read().strip()
        except IOError:
            # Most commonly the marker simply does not exist yet.
            return None
        try:
            ts = int(ts_string)
            return time_utils.epoch_time_to_date_string(ts)
        except ValueError as e:
            logging.debug('Error parsing %s for %s: %s', _OFFLOAD_MARKER,
                          self.dirname, e)
            _marker_parse_error_metric.increment()
            return None


_AFE = None


def _cached_afe():
    """Return a lazily-created, module-wide RetryingAFE instance."""
    global _AFE
    if _AFE is None:
        _AFE = frontend_wrappers.RetryingAFE()
    return _AFE
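
# Minimal usage sketch (illustrative only, not taken from any particular
# caller): enumerate candidate directories, then check whether each one is
# finished and old enough to offload.
#
#     for dirname in RegularJobDirectory.get_job_directories():
#         job = RegularJobDirectory(dirname)
#         timestamp = job.get_timestamp_if_finished()
#         if timestamp and is_job_expired(age_limit=7, timestamp=timestamp):
#             pass  # ready to offload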