#!/usr/bin/python3
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
try:
    import cachetools
except ImportError:
    cachetools = None
import datetime
import errno
import glob
import logging
import logging.handlers
import os
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import time

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
# For unittest, the cloud_console.proto is not compiled yet.
try:
    from autotest_lib.site_utils import cloud_console_client
except ImportError:
    cloud_console_client = None
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from autotest_lib.utils.side_effects import config_loader
from autotest_lib.utils.frozen_chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be
# imported after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from autotest_lib.utils.frozen_chromite.lib import parallel
import six
try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
    from autotest_lib.utils.frozen_chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for process, the higher the number the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure Count Directory name
=================== ====== ==============================
'''
# --+----1----+---- ----+ ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s %5d %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
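
# A sketch of what a FAILED_OFFLOADS file rendered from the header and
# line format above looks like (values illustrative):
#
#   This is the list of gs_offloader failed jobs.
#   Last offloader attempt at 2023-01-02 03:04:05 failed to offload 1 files.
#   Check http://go/cros-triage-gsoffloader to triage the issue
#
#   First failure Count Directory name
#   =================== ====== ==============================
#   2023-01-02 02:15:00     3 118-chromeos-test/host1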

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = True

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)


# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect list of CTS tests
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'


def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including
    board and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory; use the first
        # one available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing errors so they won't crash
                    # gs_offloader.
                    pass

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on the -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd


def _get_finish_cmd_list(gs_path):
    """Return a command to remotely mark a given gs path as finished.

    @param gs_path: Location in google storage where the offload directory
                    should be marked as finished.

    @return A command list to be executed by Popen.
    """
    target = os.path.join(gs_path, '.finished_offload')
    return [
            'gsutil',
            'cp',
            '/dev/null',
            target,
    ]
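
# A sketch of the commands the two helpers above produce with the default
# (non-rsync) configuration; the bucket name is illustrative:
#
#   _get_cmd_list(True, '118-debug', 'gs://results-bucket/118-debug')
#     -> ['gsutil', '-m', 'cp', '-eR', '118-debug',
#         'gs://results-bucket/118-debug']
#   _get_finish_cmd_list('gs://results-bucket/118-debug')
#     -> ['gsutil', 'cp', '/dev/null',
#         'gs://results-bucket/118-debug/.finished_offload']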


def sanitize_dir(dirpath):
    """Sanitize directory for gs upload.

    Symlinks and FIFOs are converted to regular files to fix bugs.

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory contents to escape filenames for gs
    upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename a file to escape its filename for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert fifos to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a fifo with a normal file.

    @param path: Fifo path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating fifo marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating symlink marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)
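
# A sketch of the effect of sanitize_dir on a results tree (paths
# illustrative): a symlink such as sysinfo/messages -> /var/log/messages
# becomes a regular file containing '<symlink to /var/log/messages>', a
# fifo becomes a regular file containing '<FIFO>', and any filename gsutil
# cannot accept is renamed via gslib.escape().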


# Maximum number of files in the folder.
_MAX_FILE_COUNT = 3000
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']


def _get_zippable_folders(dir_entry):
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                not folder in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in the given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.',
                        dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For a test job, zip folders at the second level, e.g. 123-debug/host1.
    # This keeps the autoserv debug folder accessible.
    # For a special task, there is no need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)


def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Make a directory into a tarball.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)
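
# A sketch of limit_file_count on an over-limit regular job (paths
# illustrative): second-level folders are tarred in place,
#
#   118-debug/host1/some_test/  ->  118-debug/host1/some_test.tgz
#
# while folders named in _FOLDERS_NEVER_ZIP ('debug', 'ssp_logs',
# 'autoupdate_logs') are skipped at each level.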


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root.  It must be changed to the user running the autoserv
    process, so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        owner = '%s:%s' % (os.getuid(), os.getgid())
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', owner, dir_entry])
        subprocess.check_call(['chmod', '-R', 'u+rw', dir_entry])
        subprocess.check_call(
                ['find', dir_entry, '-type', 'd',
                 '-exec', 'chmod', 'u+x', '{}', ';'])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _get_swarming_req_dir(path):
    """
    Returns the parent directory of |path|, if |path| is a swarming task
    result.

    @param path: Full path to the result of a task.
                 e.g. /tmp/results/swarming-44466815c4bc951/1

    @return string of the parent dir or None if not a swarming task.
    """
    m_parent = re.match(
            '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path)
    if m_parent:
        return m_parent.group('parent_dir')
    return None


def _parse_cts_job_results_file_path(path):
    """Parse CTS file paths and extract the required information from them."""

    # Autotest paths look like:
    # /317739475-chromeos-test/chromeos4-row9-rack11-host22/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    # Swarming paths look like:
    # /swarming-458e3a3a7fc6f210/1/autoserv_test/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    folders = path.split(os.sep)
    if 'swarming' in folders[1]:
        # Swarming job and attempt combined
        job_id = "%s-%s" % (folders[-7], folders[-6])
    else:
        job_id = folders[-6]

    cts_package = folders[-4]
    timestamp = folders[-1]

    return job_id, cts_package, timestamp


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})
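
# A worked example of _parse_cts_job_results_file_path above, using the
# swarming path from its comments:
#
#   _parse_cts_job_results_file_path(
#       '/swarming-458e3a3a7fc6f210/1/autoserv_test/'
#       'cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44')
#   -> ('swarming-458e3a3a7fc6f210-1', 'cheets_CTS.android.dpi',
#       '2016.04.28_01.41.44')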


def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue, if needed.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same
                           one running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                          'wrong_permissions_count')
    metrics_fields = _get_metrics_fields(dir_entry)
    metrics.Counter(m_permission_error).increment(fields=metrics_fields)


class BaseGSOffloader(six.with_metaclass(abc.ABCMeta, object)):

    """Google Storage offloader interface."""

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        This method is responsible for copying the contents of
        `dir_entry` to Google storage at `dest_path`.

        When successful, the method must delete all of `dir_entry`.
        On failure, `dir_entry` should be left undisturbed, in order
        to allow for retry.

        Errors are conveyed simply and solely by two means:
          * At the time of failure, write enough information to the log
            to allow later debugging, if necessary.
          * Don't delete the content.

        In order to guarantee robustness, this method must not raise any
        exceptions.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as e:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(e))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        This method implements the actual offload behavior of its
        subclass.  To guarantee effective debugging, this method should
        catch all exceptions, and perform any reasonable diagnosis
        or other handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """


class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
                 console_client=None):
        """Returns the offload directory function for the given gs_uri.

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on the -m option for gsutil.
        @param delete_age: Minimum age in days at which a job may be
                           deleted after upload.
        @param console_client: The cloud console client.  If None,
                               cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client
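
    # Construction sketch (bucket URI and ages illustrative).  Note that
    # _try_offload below joins the URI and dest_path with plain string
    # concatenation ('%s%s'), so the two must compose into a valid
    # gs:// path:
    #
    #   offloader = GSOffloader('gs://results-bucket', False, 7)
    #   offloader.offload('118-debug', '/118-debug', job_complete_time)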

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Correct the directory's file permission error, then
                    # raise the exception so gs_offloader can retry later.
                    _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
                    # Try again after the permission issue is fixed.
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError as e:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:',
                                dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s',
                                stdout_file.read(), stderr_content)

                # Some result files may have the wrong file permissions.
                # Try to correct such errors so a later try can succeed.
                # TODO(dshi): This code was added to correct result files
                # with wrong file permissions caused by bug 511778.  After
                # this code is pushed to the lab and has run for a while to
                # clean up these files, the following code and the function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                self._prune(dir_entry, job_complete_time)
                swarming_req_dir = _get_swarming_req_dir(dir_entry)
                if swarming_req_dir:
                    self._prune_swarming_req_dir(swarming_req_dir)

    def _try_offload(self, dir_entry, dest_path,
                     stdout_file, stderr_file):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param stdout_file: Log file.
        @param stderr_file: Log file.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        error_obj = _OffloadError(start_time)
        config = config_loader.load(dir_entry)
        if config:
            # TODO(linxinan): use the credential file assigned by the
            # side_effect config.
            if config.google_storage.bucket:
                gs_prefix = ('' if
                             config.google_storage.bucket.startswith('gs://')
                             else 'gs://')
                self._gs_uri = gs_prefix + config.google_storage.bucket
        else:
            # For now, the absence of the config does not block gs_offloader
            # from uploading files via the default credential.
            logging.debug('Failed to load the side effects config in %s.',
                          dir_entry)
        try:
            sanitize_dir(dir_entry)
            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
                logging.debug('Attempting an offload command %s', cmd)
                process = subprocess.Popen(
                        cmd, stdout=stdout_file, stderr=stderr_file)
                process.wait()
                logging.debug('Offload command %s completed; '
                              'marking offload complete.', cmd)
                _mark_upload_finished(gs_path, stdout_file, stderr_file)

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                                       os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune the directory if it has been uploaded and is expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            logging.debug('Pruning uploaded directory %s', dir_entry)
            shutil.rmtree(dir_entry)
            job_timestamp_cache.delete(dir_entry)
        except OSError as e:
            # Wrong file permissions can cause `shutil.rmtree(dir_entry)`
            # to raise an OSError with the message 'Permission denied'.
            # Details can be found in crbug.com/536151.
            _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
            # Try again after the permission issue is fixed.
            shutil.rmtree(dir_entry)

    def _prune_swarming_req_dir(self, swarming_req_dir):
        """Prune the swarming request directory, if it is empty.

        @param swarming_req_dir: Directory entry of a swarming request.
        """
        try:
            logging.debug('Pruning swarming request directory %s',
                          swarming_req_dir)
            os.rmdir(swarming_req_dir)
        except OSError:
            # Do nothing and leave this directory to the next removal
            # attempt.
            logging.debug('Failed to prune swarming request directory %s',
                          swarming_req_dir)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time):
        super(_OffloadError, self).__init__(start_time)
        self.start_time = start_time


class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


class OptionalMemoryCache(object):
    """Implements a memory cache if the cachetools module can be loaded.

    If the platform has cachetools available then the cache will
    be created, otherwise the get calls will always act as if there
    was a cache miss and the set/delete calls will be no-ops.
    """
    cache = None

    def setup(self, age_to_delete):
        """Set up a TTL cache sized by how long jobs will be handled.

        Autotest jobs are handled by gs_offloader until they are deleted
        from local storage, so base the cache size on how long that is.

        @param age_to_delete: Number of days after which items in the cache
                              should expire.
        """
        if cachetools:
            # Min cache is 1000 items for 10 mins.  If the age to delete is
            # 0 days, you still want a short-lived, small cache.
            # 2000 items is a good approximation of the max number of jobs a
            # moblab can produce in a day; the lab offloads immediately, so
            # the number of carried jobs should be very small in the normal
            # case.
            ttl = max(age_to_delete * 24 * 60 * 60, 600)
            maxsize = max(age_to_delete * 2000, 1000)
            job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize,
                                                            ttl=ttl)

    def get(self, key):
        """If we have a cache, try to retrieve from it."""
        if self.cache is not None:
            result = self.cache.get(key)
            return result
        return None

    def add(self, key, value):
        """If we have a cache, try to store a key/value pair."""
        if self.cache is not None:
            self.cache[key] = value

    def delete(self, key):
        """If we have a cache, try to remove a key."""
        if self.cache is not None:
            # TTLCache is a MutableMapping with no delete() method; pop the
            # key instead, ignoring keys that are already gone.
            self.cache.pop(key, None)


job_timestamp_cache = OptionalMemoryCache()
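
# How setup() sizes the cache above (values follow its formulas):
#   age_to_delete = 0 -> ttl = 600 s (the 10 min floor), maxsize = 1000
#   age_to_delete = 7 -> ttl = 604800 s (7 days),        maxsize = 14000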


def _cached_get_timestamp_if_finished(job):
    """Retrieve a job's finished timestamp from the cache or the AFE.

    @param job: _JobDirectory instance to retrieve the finished timestamp
                of.

    @returns: None if the job is not finished, or the
              last job finished time recorded by Autotest.
    """
    job_timestamp = job_timestamp_cache.get(job.dirname)
    if not job_timestamp:
        job_timestamp = job.get_timestamp_if_finished()
        if job_timestamp:
            job_timestamp_cache.add(job.dirname, job_timestamp)
    return job_timestamp


def _is_expired(job, age_limit):
    """Return whether a job directory is old enough for uploading.

    @param job: _JobDirectory instance.
    @param age_limit: Minimum age in days at which a job may be offloaded.
    """
    job_timestamp = _cached_get_timestamp_if_finished(job)
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
            'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
            fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
            dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether the directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark a directory as uploaded.

    @param dirpath: Directory path string.
    """
    logging.debug('Creating uploaded marker for directory %s', dirpath)
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _mark_upload_finished(gs_path, stdout_file, stderr_file):
    """Mark a given gs_path upload as finished (remotely).

    @param gs_path: gs:// url of the remote directory whose upload is
                    finished.
    """
    cmd = _get_finish_cmd_list(gs_path)
    process = subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file)
    process.wait()
    logging.debug('Finished marking as complete: %s', cmd)


def _get_uploaded_marker_file(dirpath):
    """Return the path to the upload marker file for a directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)
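
# Two markers record completion: _mark_upload_finished writes an empty
# '.finished_offload' object next to the results in Google Storage, while
# _mark_uploaded drops a local '.GS_UPLOADED' file in the results
# directory.  Only the local marker is consulted (via _is_uploaded) when
# deciding whether a directory can be skipped or pruned.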


def _format_job_for_failure_reporting(job):
    """Format a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        logging.debug('Checking for write access with dummy file %s',
                      dummy_file.name)
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            t = 120
            logging.debug('Unable to offload dummy file to %s, sleeping '
                          'for %s seconds.', gs_uri, t)
            time.sleep(t)
    logging.debug('Dummy file write check to gs succeeded.')


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader: A BaseGSOffloader used to offload a job directory.
      * _jobdir_classes: List of classes of job directory to be
        offloaded.
      * _processes: Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _age_limit: Minimum age in days at which a job may be
        offloaded.
      * _open_jobs: A dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to: %r',
                    multiprocessing)
            console_client = None
            if (cloud_console_client and
                    cloud_console_client.is_cloud_notification_enabled()):
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = [
                job_directories.SwarmingJobDirectory,
        ]
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pusub_topic = None
        self._offload_count_limit = 3

    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.
        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)

    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        # Must be a list, to make a copy of the dictionary so that deletion
        # is allowed during iteration.
        for jobkey, job in list(six.iteritems(self._open_jobs)):
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))

    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send a new e-mail describing the failures.
        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)

    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.
        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload,
                processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()

    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded, as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)

    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Update a local file listing all the failed jobs.

        The dropped file can be used by developers to list jobs that we
        have failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                          for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)

    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge(
                'chromeos/autotest/gs_offloader/current_jobs_count').set(
                        len(self._open_jobs))

    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))


def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence with
    the job's `dirname` attribute, its parent directory name, and the
    job's finished timestamp.

    @param job: _JobDirectory instance to offload.
    @param queue: If the job should be offloaded, put the offload
                  parameters into this queue for processing.
    @param age_limit: Minimum age for a job to be offloaded.  A value
                      of 0 means that the job will be offloaded as
                      soon as it is finished.
    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = _cached_get_timestamp_if_finished(job)
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                           'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                           'directories and will not offload them to google '
                           'storage.  NOTE: If the global_config variable '
                           'CROS.gs_offloading_enabled is False, '
                           '--delete_only is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                           'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                           'number of Mega Bytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on the -m option for gsutil.  If not set, '
                           'the global config setting '
                           'gs_offloader_multiprocessing under the CROS '
                           'section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                           'offloaded, but not removed from local storage.',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                           'removed from local storage.',
                      type='int', default=None)
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon.',
            type=str,
            default=None,
    )
    parser.add_option('-t', '--enable_timestamp_cache',
                      dest='enable_timestamp_cache',
                      action='store_true',
                      help='Cache the finished timestamps from AFE.')

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print('Cannot process all files and only the hosts '
              'subdirectory.  Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options, but not '
              'both.')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options
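
# Typical invocations given the options above (all ages illustrative):
#
#   gs_offloader.py            # offload finished job results, looping forever
#   gs_offloader.py -a -i      # offload all result types once, then exit
#   gs_offloader.py -o -d 7    # no upload; delete results over seven days old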


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    if options.enable_timestamp_cache:
        # Extend the cache expiry time by another 1% so the timestamps
        # are still available as the results are purged.
        job_timestamp_cache.setup(options.age_to_delete * 1.01)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False,
                                             debug_file=options.metrics_file):
        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
            offloader = Offloader(options)
            if not options.delete_only:
                wait_for_gs_write_access(offloader.gs_uri)
            while True:
                offloader.offload_once()
                if options.offload_once:
                    break
                time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler.  If
    # options.log_size is 0, the file size will not be limited.  Keep
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
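
# Example paths produced by _get_log_filename below (timestamp
# illustrative):
#   /usr/local/autotest/logs/gs_offloader_jobs_log_20230102_030405.txt
# and with --log_size set (rotation enabled, so no timestamp):
#   /usr/local/autotest/logs/gs_offloader_jobs_log_.txt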


def _get_log_filename(options, offloader_type):
    """Get the log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)


if __name__ == '__main__':
    main()