import abc
import datetime
import glob
import json
import logging
import os
import re
import shutil
import six

import common

from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


# Matches a special-task result path such as
# .../hosts/chromeos1-rack5-host6/1343-cleanup; group 1 is the task id.
# Raw string so '\d' is a regex escape, not a (deprecated) string escape.
SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'

def is_job_expired(age_limit, timestamp):
    """Check whether a job timestamp is older than an age limit.

    @param age_limit: Minimum age, measured in days.  If the value is
                      not positive, the job is always expired.
    @param timestamp: Timestamp of the job whose age we are checking.
                      The format must match time_utils.TIME_FMT.

    @returns True if the job is old enough to be expired.
    """
    if age_limit <= 0:
        return True
    job_time = time_utils.time_string_to_datetime(timestamp)
    expiration = job_time + datetime.timedelta(days=age_limit)
    return datetime.datetime.now() >= expiration


def get_job_id_or_task_id(result_dir):
    """Extract job id or special task id from result_dir

    @param result_dir: path to the result dir.
            For test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: str representing the job id or task id. Returns None if fail
              to parse job or task id from the result_dir.
    """
    if not result_dir:
        return None
    result_dir = os.path.abspath(result_dir)
    # Result folder for job running inside container has only job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last pattern of number-text. This avoids
    # issue with path like 123-results/456-debug_user, in which 456 is the real
    # job ID.
    m_job = re.findall(r'.*/(\d+)-[^/]+', result_dir)
    if m_job:
        return m_job[-1]
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return m_special_task.group(1)
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return m_ssp_job_pattern.group(1)
    return _get_swarming_run_id(result_dir)


def _get_swarming_run_id(path):
    """Extract the Swarming run_id for a Skylab task from the result path.

    @param path: Absolute path of a result directory.

    @returns: str run_id, or None if `path` is not a swarming result dir.
    """
    # Legacy swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4311
    # In particular, the ending digit is never 0
    m_legacy_path = re.match(r'.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path)
    if m_legacy_path:
        return m_legacy_path.group(1)
    # New style swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4310/1
    # - Results are one directory deeper.
    # - Ending digit of first directory is always 0.
    m_path = re.match(r'.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path)
    if m_path:
        return m_path.group(1) + m_path.group(2)
    return None


class _JobDirectory(six.with_metaclass(abc.ABCMeta, object)):
    """State associated with a job to be offloaded.

    The full life-cycle of a job (including failure events that
    normally don't occur) looks like this:
      1. The job's results directory is discovered by
         `get_job_directories()`, and a job instance is created for it.
      2. Calls to `offload()` have no effect so long as the job
         isn't complete in the database and the job isn't expired
         according to the `age_limit` parameter.
      3. Eventually, the job is both finished and expired.  The next
         call to `offload()` makes the first attempt to offload the
         directory to GS.  Offload is attempted, but fails to complete
         (e.g. because of a GS problem).
      4. Finally, a call to `offload()` succeeds, and the directory no
         longer exists.  Now `is_offloaded()` is true, so the job
         instance is deleted, and future failures will not mention this
         directory any more.

    Only steps 1. and 4. are guaranteed to occur.  The others depend
    on the timing of calls to `offload()`, and on the reliability of
    the actual offload process.

    """

    GLOB_PATTERN = None  # must be redefined in subclass

    def __init__(self, resultsdir):
        self.dirname = resultsdir
        self._id = get_job_id_or_task_id(resultsdir)
        self.offload_count = 0
        self.first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`.  Otherwise, return a timestamp for the job.  The
        timestamp is to be used to determine expiration in
        `is_job_expired()`.

        @return Return `None` if the job is still running; otherwise
                return a string with a timestamp in the appropriate
                format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this special task.

        @returns True/False if there is anything left to offload.
        """
        # Default support is to still offload the directory.
        return True


NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""


class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True/False if there is anything left to offload.
        """
        # Go through the gs_offloader instructions file for each test in this job.
        for path in glob.glob(
                os.path.join(self.dirname, '*',
                             constants.GS_OFFLOADER_INSTRUCTIONS)):
            with open(path, 'r') as f:
                gs_off_instructions = json.load(f)
            if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
                dirname = os.path.dirname(path)
                _remove_log_directory_contents(dirname)

        # Finally check if there's anything left to offload.
        if os.path.exists(self.dirname) and not os.listdir(self.dirname):
            shutil.rmtree(self.dirname)
            return False
        return True

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the latest hqe finished_on time. If the finished_on times
                 are null returns the job's created_on time.
        """
        entry = _cached_afe().get_jobs(id=self._id, finished=True)
        if not entry:
            return None
        hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False,
                                                    job_id=self._id)
        if not hqes:
            return entry[0].created_on
        # While most Jobs have 1 HQE, some can have multiple, so check them all.
        return max([hqe.finished_on for hqe in hqes])


def _remove_log_directory_contents(dirpath):
    """Remove log directory contents.

    Leave a note explaining what has happened to the logs.

    @param dirpath: Path to log directory.
    """
    shutil.rmtree(dirpath)
    os.mkdir(dirpath)
    breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt')
    with open(breadcrumb_name, 'w') as f:
        f.write(NO_OFFLOAD_README)


class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    def __init__(self, resultsdir):
        super(SpecialJobDirectory, self).__init__(resultsdir)

    def get_timestamp_if_finished(self):
        entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True)
        return entry[0].time_finished if entry else None


def _find_results_dir(dirname):
    """Return all directories under `dirname` holding an offload marker.

    @param dirname: Directory tree to search.

    @returns: list of directories containing an _OFFLOAD_MARKER file.
    """
    subdirs = []
    for root, _, files in os.walk(dirname, topdown=True):
        if _OFFLOAD_MARKER in files:
            subdirs.append(root)
    return subdirs


_OFFLOAD_MARKER = ".ready_for_offload"
_marker_parse_error_metric = metrics.Counter(
        'chromeos/autotest/gs_offloader/offload_marker_parse_errors',
        description='Errors parsing the offload marker file')


class SwarmingJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for Skylab swarming jobs."""

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        # Legacy swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4311
        # In particular, the ending digit is never 0
        jobdirs = [
                d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]')
                if os.path.isdir(d)
        ]
        # New style swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4310/1
        # - Results are one directory deeper.
        # - Ending digit of first directory is always 0.
        new_style_topdir = [
                d for d in glob.glob('swarming-[0-9a-f]*0') if os.path.isdir(d)
        ]
        # When there are multiple tests run in one test_runner build,
        # the results will be one level deeper with the test_id
        # as one further subdirectory.
        # Example: .../results/swarming-3e4391423c3a4310/1/test_id
        for topdir in new_style_topdir:
            for d in glob.glob('%s/[1-9a-f]*' % topdir):
                subdirs = _find_results_dir(d)
                jobdirs += subdirs

        return jobdirs

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the timestamp recorded in the offload marker file, or
                 None if the marker is absent/unreadable (job still
                 running) or cannot be parsed.
        """
        marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER)
        try:
            with open(marker_path) as f:
                ts_string = f.read().strip()
        except (IOError, OSError):
            # Missing or unreadable marker means the job has not finished;
            # a bare `except:` here would also swallow KeyboardInterrupt.
            return None
        try:
            ts = int(ts_string)
            return time_utils.epoch_time_to_date_string(ts)
        except ValueError as e:
            logging.debug('Error parsing %s for %s: %s', _OFFLOAD_MARKER,
                          self.dirname, e)
            _marker_parse_error_metric.increment()
            return None


_AFE = None
def _cached_afe():
    """Return a lazily-constructed, process-wide RetryingAFE instance."""
    global _AFE
    if _AFE is None:
        _AFE = frontend_wrappers.RetryingAFE()
    return _AFE