import abc
import datetime
import glob
import json
import logging
import os
import re
import shutil
import six

import common

from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'


def is_job_expired(age_limit, timestamp):
    """Check whether a job timestamp is older than an age limit.

    @param age_limit: Minimum age, measured in days. If the value is
                      not positive, the job is always expired.
    @param timestamp: Timestamp of the job whose age we are checking.
                      The format must match time_utils.TIME_FMT.

    @returns True if the job is old enough to be expired.
    """
    if age_limit <= 0:
        return True
    job_time = time_utils.time_string_to_datetime(timestamp)
    expiration = job_time + datetime.timedelta(days=age_limit)
    return datetime.datetime.now() >= expiration


def get_job_id_or_task_id(result_dir):
    """Extract the job id or special task id from result_dir.

    @param result_dir: Path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: str representing the job id or task id, or None if neither
        can be parsed from result_dir.
    """
    if not result_dir:
        return None
    result_dir = os.path.abspath(result_dir)
    # The result folder for a job running inside a container has only the
    # job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last number-text component in the path.
    # This avoids issues with paths like 123-results/456-debug_user, in which
    # 456 is the real job ID.
    m_job = re.findall(r'.*/(\d+)-[^/]+', result_dir)
    if m_job:
        return m_job[-1]
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return m_special_task.group(1)
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return m_ssp_job_pattern.group(1)
    return _get_swarming_run_id(result_dir)


def _get_swarming_run_id(path):
    """Extract the Swarming run_id for a Skylab task from the result path."""
    # Legacy swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4311
    # In particular, the ending digit is never 0.
    m_legacy_path = re.match('.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path)
    if m_legacy_path:
        return m_legacy_path.group(1)
    # New style swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4310/1
    # - Results are one directory deeper.
    # - Ending digit of the first directory is always 0.
    m_path = re.match('.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path)
    if m_path:
        return m_path.group(1) + m_path.group(2)
    return None


class _JobDirectory(six.with_metaclass(abc.ABCMeta, object)):
    """State associated with a job to be offloaded.

    The full life-cycle of a job (including failure events that
    normally don't occur) looks like this:
     1. The job's results directory is discovered by
        `get_job_directories()`, and a job instance is created for it.
     2. Calls to `offload()` have no effect so long as the job
        isn't complete in the database and the job isn't expired
        according to the `age_limit` parameter.
     3. Eventually, the job is both finished and expired.  The next
        call to `offload()` makes the first attempt to offload the
        directory to GS.  Offload is attempted, but fails to complete
        (e.g. because of a GS problem).
     4. Finally, a call to `offload()` succeeds, and the directory no
        longer exists.  Now `is_offloaded()` is true, so the job
        instance is deleted, and future failures will not mention this
        directory any more.

    Only steps 1. and 4. are guaranteed to occur.  The others depend
    on the timing of calls to `offload()`, and on the reliability of
    the actual offload process.

    """

    GLOB_PATTERN = None  # Must be redefined in subclass.

    def __init__(self, resultsdir):
        self.dirname = resultsdir
        self._id = get_job_id_or_task_id(resultsdir)
        self.offload_count = 0
        self.first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`.  Otherwise, return a timestamp for the job.  The
        timestamp is to be used to determine expiration in
        `is_job_expired()`.

        @return `None` if the job is still running; otherwise a string
            with a timestamp in the appropriate format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job or task.

        @returns True if there is anything left to offload, False otherwise.
        """
        # Default behavior is to still offload the directory.
        return True


NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""


class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True if there is anything left to offload, False otherwise.
        """
        # Go through the gs_offloader instructions file for each test in
        # this job.
        for path in glob.glob(
                os.path.join(self.dirname, '*',
                             constants.GS_OFFLOADER_INSTRUCTIONS)):
            with open(path, 'r') as f:
                gs_off_instructions = json.load(f)
            if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
                dirname = os.path.dirname(path)
                _remove_log_directory_contents(dirname)

        # Finally, check if there's anything left to offload.
        if os.path.exists(self.dirname) and not os.listdir(self.dirname):
            shutil.rmtree(self.dirname)
            return False
        return True

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns The latest HQE finished_on time, or the job's created_on
                 time if all finished_on times are null.
        """
        entry = _cached_afe().get_jobs(id=self._id, finished=True)
        if not entry:
            return None
        hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False,
                                                    job_id=self._id)
        if not hqes:
            return entry[0].created_on
        # While most jobs have one HQE, some can have multiple, so check
        # them all.
        return max([hqe.finished_on for hqe in hqes])


def _remove_log_directory_contents(dirpath):
    """Remove log directory contents.

    Leave a note explaining what has happened to the logs.

    @param dirpath: Path to log directory.
    """
    shutil.rmtree(dirpath)
    os.mkdir(dirpath)
    breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt')
    with open(breadcrumb_name, 'w') as f:
        f.write(NO_OFFLOAD_README)


class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    def __init__(self, resultsdir):
        super(SpecialJobDirectory, self).__init__(resultsdir)

    def get_timestamp_if_finished(self):
        entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True)
        return entry[0].time_finished if entry else None


def _find_results_dir(dirname):
    """Return subdirectories of `dirname` that contain an offload marker."""
    subdirs = []
    for root, dirs, files in os.walk(dirname, topdown=True):
        for f in files:
            if f == _OFFLOAD_MARKER:
                subdirs.append(root)
    return subdirs


_OFFLOAD_MARKER = ".ready_for_offload"
_marker_parse_error_metric = metrics.Counter(
        'chromeos/autotest/gs_offloader/offload_marker_parse_errors',
        description='Errors parsing the offload marker file')


class SwarmingJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for Skylab swarming jobs."""

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        # Legacy swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4311
        # In particular, the ending digit is never 0.
        jobdirs = [
                d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]')
                if os.path.isdir(d)
        ]
        # New style swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4310/1
        # - Results are one directory deeper.
        # - Ending digit of the first directory is always 0.
        new_style_topdir = [
                d for d in glob.glob('swarming-[0-9a-f]*0') if os.path.isdir(d)
        ]
        # When multiple tests run in one test_runner build, the results are
        # one level deeper, with the test_id as a further subdirectory.
        # Example: .../results/swarming-3e4391423c3a4310/1/test_id
        for topdir in new_style_topdir:
            for d in glob.glob('%s/[1-9a-f]*' % topdir):
                subdirs = _find_results_dir(d)
                jobdirs += subdirs

        return jobdirs

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns The timestamp recorded in the offload marker file, or None
                 if the marker is missing or cannot be parsed.
        """
        marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER)
        try:
            with open(marker_path) as f:
                ts_string = f.read().strip()
        except (IOError, OSError):
            return None
        try:
            ts = int(ts_string)
            return time_utils.epoch_time_to_date_string(ts)
        except ValueError as e:
            logging.debug('Error parsing %s for %s: %s', _OFFLOAD_MARKER,
                          self.dirname, e)
            _marker_parse_error_metric.increment()
            return None


_AFE = None


def _cached_afe():
    """Return a lazily created, shared RetryingAFE instance."""
    global _AFE
    if _AFE is None:
        _AFE = frontend_wrappers.RetryingAFE()
    return _AFE
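

# Illustrative sketch only (not used by the offloader): expected return values
# of get_job_id_or_task_id() for some hypothetical result paths, assuming the
# standard results layout described in its docstring and that the process is
# not running inside an SSP container.
#
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#   => '2032'  (regular job; the trailing hostname is optional)
#
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#   => '1343'  (special task)
#
#   get_job_id_or_task_id(
#       '/usr/local/autotest/results/swarming-3e4391423c3a4310/1')
#   => '3e4391423c3a4311'  (new-style Swarming result: the run_id is the
#                           directory's hex id with its trailing 0 replaced
#                           by the subtask digit)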