1#!/usr/bin/python3 2# 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Script to archive old Autotest results to Google Storage. 8 9Uses gsutil to archive files to the configured Google Storage bucket. 10Upon successful copy, the local results directory is deleted. 11""" 12 13from __future__ import absolute_import 14from __future__ import division 15from __future__ import print_function 16 17import abc 18try: 19 import cachetools 20except ImportError: 21 cachetools = None 22import datetime 23import errno 24import glob 25import logging 26import logging.handlers 27import os 28import re 29import shutil 30import stat 31import subprocess 32import sys 33import tarfile 34import tempfile 35import time 36 37from optparse import OptionParser 38 39import common 40from autotest_lib.client.common_lib import file_utils 41from autotest_lib.client.common_lib import global_config 42from autotest_lib.client.common_lib import utils 43from autotest_lib.site_utils import job_directories 44# For unittest, the cloud_console.proto is not compiled yet. 45try: 46 from autotest_lib.site_utils import cloud_console_client 47except ImportError: 48 cloud_console_client = None 49from autotest_lib.tko import models 50from autotest_lib.utils import labellib 51from autotest_lib.utils import gslib 52from autotest_lib.utils.side_effects import config_loader 53from autotest_lib.utils.frozen_chromite.lib import timeout_util 54 55# Autotest requires the psutil module from site-packages, so it must be imported 56# after "import common". 57try: 58 # Does not exist, nor is needed, on moblab. 59 import psutil 60except ImportError: 61 psutil = None 62 63from autotest_lib.utils.frozen_chromite.lib import parallel 64import six 65try: 66 from autotest_lib.utils.frozen_chromite.lib import metrics 67 from autotest_lib.utils.frozen_chromite.lib import ts_mon_config 68except ImportError: 69 metrics = utils.metrics_mock 70 ts_mon_config = utils.metrics_mock 71 72 73GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value( 74 'CROS', 'gs_offloading_enabled', type=bool, default=True) 75 76# Nice setting for process, the higher the number the lower the priority. 77NICENESS = 10 78 79# Maximum number of seconds to allow for offloading a single 80# directory. 81OFFLOAD_TIMEOUT_SECS = 60 * 60 82 83# Sleep time per loop. 84SLEEP_TIME_SECS = 5 85 86# Minimum number of seconds between e-mail reports. 87REPORT_INTERVAL_SECS = 60 * 60 88 89# Location of Autotest results on disk. 90RESULTS_DIR = '/usr/local/autotest/results' 91FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS') 92 93FAILED_OFFLOADS_FILE_HEADER = ''' 94This is the list of gs_offloader failed jobs. 95Last offloader attempt at %s failed to offload %d files. 96Check http://go/cros-triage-gsoffloader to triage the issue 97 98 99First failure Count Directory name 100=================== ====== ============================== 101''' 102# --+----1----+---- ----+ ----+----1----+----2----+----3 103 104FAILED_OFFLOADS_LINE_FORMAT = '%19s %5d %-1s\n' 105FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' 106 107USE_RSYNC_ENABLED = global_config.global_config.get_config_value( 108 'CROS', 'gs_offloader_use_rsync', type=bool, default=False) 109 110LIMIT_FILE_COUNT = True 111 112# Use multiprocessing for gsutil uploading. 113GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value( 114 'CROS', 'gs_offloader_multiprocessing', type=bool, default=False) 115 116 117# metadata type 118GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success' 119GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure' 120 121# Autotest test to collect list of CTS tests 122TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only' 123 124def _get_metrics_fields(dir_entry): 125 """Get metrics fields for the given test result directory, including board 126 and milestone. 127 128 @param dir_entry: Directory entry to offload. 129 @return A dictionary for the metrics data to be uploaded. 130 """ 131 fields = {'board': 'unknown', 132 'milestone': 'unknown'} 133 if dir_entry: 134 # There could be multiple hosts in the job directory, use the first one 135 # available. 136 for host in glob.glob(os.path.join(dir_entry, '*')): 137 try: 138 keyval = models.test.parse_job_keyval(host) 139 except ValueError: 140 continue 141 build = keyval.get('build') 142 if build: 143 try: 144 cros_version = labellib.parse_cros_version(build) 145 fields['board'] = cros_version.board 146 fields['milestone'] = cros_version.milestone 147 break 148 except ValueError: 149 # Ignore version parsing error so it won't crash 150 # gs_offloader. 151 pass 152 153 return fields 154 155 156def _get_cmd_list(multiprocessing, dir_entry, gs_path): 157 """Return the command to offload a specified directory. 158 159 @param multiprocessing: True to turn on -m option for gsutil. 160 @param dir_entry: Directory entry/path that which we need a cmd_list 161 to offload. 162 @param gs_path: Location in google storage where we will 163 offload the directory. 164 165 @return A command list to be executed by Popen. 166 """ 167 cmd = ['gsutil'] 168 if multiprocessing: 169 cmd.append('-m') 170 if USE_RSYNC_ENABLED: 171 cmd.append('rsync') 172 target = os.path.join(gs_path, os.path.basename(dir_entry)) 173 else: 174 cmd.append('cp') 175 target = gs_path 176 cmd += ['-eR', dir_entry, target] 177 return cmd 178 179 180def _get_finish_cmd_list(gs_path): 181 """Returns a command to remotely mark a given gs path as finished. 182 183 @param gs_path: Location in google storage where the offload directory 184 should be marked as finished. 185 186 @return A command list to be executed by Popen. 187 """ 188 target = os.path.join(gs_path, '.finished_offload') 189 return [ 190 'gsutil', 191 'cp', 192 '/dev/null', 193 target, 194 ] 195 196 197def sanitize_dir(dirpath): 198 """Sanitize directory for gs upload. 199 200 Symlinks and FIFOS are converted to regular files to fix bugs. 201 202 @param dirpath: Directory entry to be sanitized. 203 """ 204 if not os.path.exists(dirpath): 205 return 206 _escape_rename(dirpath) 207 _escape_rename_dir_contents(dirpath) 208 _sanitize_fifos(dirpath) 209 _sanitize_symlinks(dirpath) 210 211 212def _escape_rename_dir_contents(dirpath): 213 """Recursively rename directory to escape filenames for gs upload. 214 215 @param dirpath: Directory path string. 216 """ 217 for filename in os.listdir(dirpath): 218 path = os.path.join(dirpath, filename) 219 _escape_rename(path) 220 for filename in os.listdir(dirpath): 221 path = os.path.join(dirpath, filename) 222 if os.path.isdir(path): 223 _escape_rename_dir_contents(path) 224 225 226def _escape_rename(path): 227 """Rename file to escape filenames for gs upload. 228 229 @param path: File path string. 230 """ 231 dirpath, filename = os.path.split(path) 232 sanitized_filename = gslib.escape(filename) 233 sanitized_path = os.path.join(dirpath, sanitized_filename) 234 os.rename(path, sanitized_path) 235 236 237def _sanitize_fifos(dirpath): 238 """Convert fifos to regular files (fixes crbug.com/684122). 239 240 @param dirpath: Directory path string. 241 """ 242 for root, _, files in os.walk(dirpath): 243 for filename in files: 244 path = os.path.join(root, filename) 245 file_stat = os.lstat(path) 246 if stat.S_ISFIFO(file_stat.st_mode): 247 _replace_fifo_with_file(path) 248 249 250def _replace_fifo_with_file(path): 251 """Replace a fifo with a normal file. 252 253 @param path: Fifo path string. 254 """ 255 logging.debug('Removing fifo %s', path) 256 os.remove(path) 257 logging.debug('Creating fifo marker %s', path) 258 with open(path, 'w') as f: 259 f.write('<FIFO>') 260 261 262def _sanitize_symlinks(dirpath): 263 """Convert Symlinks to regular files (fixes crbug.com/692788). 264 265 @param dirpath: Directory path string. 266 """ 267 for root, _, files in os.walk(dirpath): 268 for filename in files: 269 path = os.path.join(root, filename) 270 file_stat = os.lstat(path) 271 if stat.S_ISLNK(file_stat.st_mode): 272 _replace_symlink_with_file(path) 273 274 275def _replace_symlink_with_file(path): 276 """Replace a symlink with a normal file. 277 278 @param path: Symlink path string. 279 """ 280 target = os.readlink(path) 281 logging.debug('Removing symlink %s', path) 282 os.remove(path) 283 logging.debug('Creating symlink marker %s', path) 284 with open(path, 'w') as f: 285 f.write('<symlink to %s>' % target) 286 287 288# Maximum number of files in the folder. 289_MAX_FILE_COUNT = 3000 290_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs'] 291 292 293def _get_zippable_folders(dir_entry): 294 folders_list = [] 295 for folder in os.listdir(dir_entry): 296 folder_path = os.path.join(dir_entry, folder) 297 if (not os.path.isfile(folder_path) and 298 not folder in _FOLDERS_NEVER_ZIP): 299 folders_list.append(folder_path) 300 return folders_list 301 302 303def limit_file_count(dir_entry): 304 """Limit the number of files in given directory. 305 306 The method checks the total number of files in the given directory. 307 If the number is greater than _MAX_FILE_COUNT, the method will 308 compress each folder in the given directory, except folders in 309 _FOLDERS_NEVER_ZIP. 310 311 @param dir_entry: Directory entry to be checked. 312 """ 313 try: 314 count = _count_files(dir_entry) 315 except ValueError: 316 logging.warning('Fail to get the file count in folder %s.', dir_entry) 317 return 318 if count < _MAX_FILE_COUNT: 319 return 320 321 # For test job, zip folders in a second level, e.g. 123-debug/host1. 322 # This is to allow autoserv debug folder still be accessible. 323 # For special task, it does not need to dig one level deeper. 324 is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN, 325 dir_entry) 326 327 folders = _get_zippable_folders(dir_entry) 328 if not is_special_task: 329 subfolders = [] 330 for folder in folders: 331 subfolders.extend(_get_zippable_folders(folder)) 332 folders = subfolders 333 334 for folder in folders: 335 _make_into_tarball(folder) 336 337 338def _count_files(dirpath): 339 """Count the number of files in a directory recursively. 340 341 @param dirpath: Directory path string. 342 """ 343 return sum(len(files) for _path, _dirs, files in os.walk(dirpath)) 344 345 346def _make_into_tarball(dirpath): 347 """Make directory into tarball. 348 349 @param dirpath: Directory path string. 350 """ 351 tarpath = '%s.tgz' % dirpath 352 with tarfile.open(tarpath, 'w:gz') as tar: 353 tar.add(dirpath, arcname=os.path.basename(dirpath)) 354 shutil.rmtree(dirpath) 355 356 357def correct_results_folder_permission(dir_entry): 358 """Make sure the results folder has the right permission settings. 359 360 For tests running with server-side packaging, the results folder has 361 the owner of root. This must be changed to the user running the 362 autoserv process, so parsing job can access the results folder. 363 364 @param dir_entry: Path to the results folder. 365 """ 366 if not dir_entry: 367 return 368 369 logging.info('Trying to correct file permission of %s.', dir_entry) 370 try: 371 owner = '%s:%s' % (os.getuid(), os.getgid()) 372 subprocess.check_call( 373 ['sudo', '-n', 'chown', '-R', owner, dir_entry]) 374 subprocess.check_call(['chmod', '-R', 'u+rw', dir_entry]) 375 subprocess.check_call( 376 ['find', dir_entry, '-type', 'd', 377 '-exec', 'chmod', 'u+x', '{}', ';']) 378 except subprocess.CalledProcessError as e: 379 logging.error('Failed to modify permission for %s: %s', 380 dir_entry, e) 381 382 383def _get_swarming_req_dir(path): 384 """ 385 Returns the parent directory of |path|, if |path| is a swarming task result. 386 387 @param path: Full path to the result of a task. 388 e.g. /tmp/results/swarming-44466815c4bc951/1 389 390 @return string of the parent dir or None if not a swarming task. 391 """ 392 m_parent = re.match( 393 '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path) 394 if m_parent: 395 return m_parent.group('parent_dir') 396 return None 397 398 399def _parse_cts_job_results_file_path(path): 400 """Parse CTS file paths an extract required information from them.""" 401 402 # Autotest paths look like: 403 # /317739475-chromeos-test/chromeos4-row9-rack11-host22/ 404 # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44 405 406 # Swarming paths look like: 407 # /swarming-458e3a3a7fc6f210/1/autoserv_test/ 408 # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44 409 410 folders = path.split(os.sep) 411 if 'swarming' in folders[1]: 412 # Swarming job and attempt combined 413 job_id = "%s-%s" % (folders[-7], folders[-6]) 414 else: 415 job_id = folders[-6] 416 417 cts_package = folders[-4] 418 timestamp = folders[-1] 419 420 return job_id, cts_package, timestamp 421 422 423def _emit_gs_returncode_metric(returncode): 424 """Increment the gs_returncode counter based on |returncode|.""" 425 m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode' 426 rcode = int(returncode) 427 if rcode < 0 or rcode > 255: 428 rcode = -1 429 metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode}) 430 431 432def _handle_dir_os_error(dir_entry, fix_permission=False): 433 """Try to fix the result directory's permission issue if needed. 434 435 @param dir_entry: Directory entry to offload. 436 @param fix_permission: True to change the directory's owner to the same one 437 running gs_offloader. 438 """ 439 if fix_permission: 440 correct_results_folder_permission(dir_entry) 441 m_permission_error = ('chromeos/autotest/errors/gs_offloader/' 442 'wrong_permissions_count') 443 metrics_fields = _get_metrics_fields(dir_entry) 444 metrics.Counter(m_permission_error).increment(fields=metrics_fields) 445 446 447class BaseGSOffloader(six.with_metaclass(abc.ABCMeta, object)): 448 449 """Google Storage offloader interface.""" 450 451 def offload(self, dir_entry, dest_path, job_complete_time): 452 """Safely offload a directory entry to Google Storage. 453 454 This method is responsible for copying the contents of 455 `dir_entry` to Google storage at `dest_path`. 456 457 When successful, the method must delete all of `dir_entry`. 458 On failure, `dir_entry` should be left undisturbed, in order 459 to allow for retry. 460 461 Errors are conveyed simply and solely by two methods: 462 * At the time of failure, write enough information to the log 463 to allow later debug, if necessary. 464 * Don't delete the content. 465 466 In order to guarantee robustness, this method must not raise any 467 exceptions. 468 469 @param dir_entry: Directory entry to offload. 470 @param dest_path: Location in google storage where we will 471 offload the directory. 472 @param job_complete_time: The complete time of the job from the AFE 473 database. 474 """ 475 try: 476 self._full_offload(dir_entry, dest_path, job_complete_time) 477 except Exception as e: 478 logging.debug('Exception in offload for %s', dir_entry) 479 logging.debug('Ignoring this error: %s', str(e)) 480 481 @abc.abstractmethod 482 def _full_offload(self, dir_entry, dest_path, job_complete_time): 483 """Offload a directory entry to Google Storage. 484 485 This method implements the actual offload behavior of its 486 subclass. To guarantee effective debug, this method should 487 catch all exceptions, and perform any reasonable diagnosis 488 or other handling. 489 490 @param dir_entry: Directory entry to offload. 491 @param dest_path: Location in google storage where we will 492 offload the directory. 493 @param job_complete_time: The complete time of the job from the AFE 494 database. 495 """ 496 497 498class GSOffloader(BaseGSOffloader): 499 """Google Storage Offloader.""" 500 501 def __init__(self, gs_uri, multiprocessing, delete_age, 502 console_client=None): 503 """Returns the offload directory function for the given gs_uri 504 505 @param gs_uri: Google storage bucket uri to offload to. 506 @param multiprocessing: True to turn on -m option for gsutil. 507 @param console_client: The cloud console client. If None, 508 cloud console APIs are not called. 509 """ 510 self._gs_uri = gs_uri 511 self._multiprocessing = multiprocessing 512 self._delete_age = delete_age 513 self._console_client = console_client 514 515 @metrics.SecondsTimerDecorator( 516 'chromeos/autotest/gs_offloader/job_offload_duration') 517 def _full_offload(self, dir_entry, dest_path, job_complete_time): 518 """Offload the specified directory entry to Google storage. 519 520 @param dir_entry: Directory entry to offload. 521 @param dest_path: Location in google storage where we will 522 offload the directory. 523 @param job_complete_time: The complete time of the job from the AFE 524 database. 525 """ 526 with tempfile.TemporaryFile('w+') as stdout_file, \ 527 tempfile.TemporaryFile('w+') as stderr_file: 528 try: 529 try: 530 self._try_offload(dir_entry, dest_path, stdout_file, 531 stderr_file) 532 except OSError as e: 533 # Correct file permission error of the directory, then raise 534 # the exception so gs_offloader can retry later. 535 _handle_dir_os_error(dir_entry, e.errno==errno.EACCES) 536 # Try again after the permission issue is fixed. 537 self._try_offload(dir_entry, dest_path, stdout_file, 538 stderr_file) 539 except _OffloadError as e: 540 metrics_fields = _get_metrics_fields(dir_entry) 541 m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error' 542 metrics.Counter(m_any_error).increment(fields=metrics_fields) 543 544 # Rewind the log files for stdout and stderr and log 545 # their contents. 546 stdout_file.seek(0) 547 stderr_file.seek(0) 548 stderr_content = stderr_file.read() 549 logging.warning('Error occurred when offloading %s:', dir_entry) 550 logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(), 551 stderr_content) 552 553 # Some result files may have wrong file permission. Try 554 # to correct such error so later try can success. 555 # TODO(dshi): The code is added to correct result files 556 # with wrong file permission caused by bug 511778. After 557 # this code is pushed to lab and run for a while to 558 # clean up these files, following code and function 559 # correct_results_folder_permission can be deleted. 560 if 'CommandException: Error opening file' in stderr_content: 561 correct_results_folder_permission(dir_entry) 562 else: 563 self._prune(dir_entry, job_complete_time) 564 swarming_req_dir = _get_swarming_req_dir(dir_entry) 565 if swarming_req_dir: 566 self._prune_swarming_req_dir(swarming_req_dir) 567 568 569 def _try_offload(self, dir_entry, dest_path, 570 stdout_file, stderr_file): 571 """Offload the specified directory entry to Google storage. 572 573 @param dir_entry: Directory entry to offload. 574 @param dest_path: Location in google storage where we will 575 offload the directory. 576 @param job_complete_time: The complete time of the job from the AFE 577 database. 578 @param stdout_file: Log file. 579 @param stderr_file: Log file. 580 """ 581 if _is_uploaded(dir_entry): 582 return 583 start_time = time.time() 584 metrics_fields = _get_metrics_fields(dir_entry) 585 error_obj = _OffloadError(start_time) 586 config = config_loader.load(dir_entry) 587 if config: 588 # TODO(linxinan): use credential file assigned by the side_effect 589 # config. 590 if config.google_storage.bucket: 591 gs_prefix = ('' if 592 config.google_storage.bucket.startswith('gs://') 593 else 'gs://') 594 self._gs_uri = gs_prefix + config.google_storage.bucket 595 else: 596 # For now, the absence of config does not block gs_offloader 597 # from uploading files via default credential. 598 logging.debug('Failed to load the side effects config in %s.', 599 dir_entry) 600 try: 601 sanitize_dir(dir_entry) 602 if LIMIT_FILE_COUNT: 603 limit_file_count(dir_entry) 604 605 process = None 606 with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS): 607 gs_path = '%s%s' % (self._gs_uri, dest_path) 608 cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path) 609 logging.debug('Attempting an offload command %s', cmd) 610 process = subprocess.Popen( 611 cmd, stdout=stdout_file, stderr=stderr_file) 612 process.wait() 613 logging.debug('Offload command %s completed; ' 614 'marking offload complete.', cmd) 615 _mark_upload_finished(gs_path, stdout_file, stderr_file) 616 617 _emit_gs_returncode_metric(process.returncode) 618 if process.returncode != 0: 619 raise error_obj 620 _emit_offload_metrics(dir_entry) 621 622 if self._console_client: 623 gcs_uri = os.path.join(gs_path, 624 os.path.basename(dir_entry)) 625 if not self._console_client.send_test_job_offloaded_message( 626 gcs_uri): 627 raise error_obj 628 629 _mark_uploaded(dir_entry) 630 except timeout_util.TimeoutError: 631 m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count' 632 metrics.Counter(m_timeout).increment(fields=metrics_fields) 633 # If we finished the call to Popen(), we may need to 634 # terminate the child process. We don't bother calling 635 # process.poll(); that inherently races because the child 636 # can die any time it wants. 637 if process: 638 try: 639 process.terminate() 640 except OSError: 641 # We don't expect any error other than "No such 642 # process". 643 pass 644 logging.error('Offloading %s timed out after waiting %d ' 645 'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS) 646 raise error_obj 647 648 def _prune(self, dir_entry, job_complete_time): 649 """Prune directory if it is uploaded and expired. 650 651 @param dir_entry: Directory entry to offload. 652 @param job_complete_time: The complete time of the job from the AFE 653 database. 654 """ 655 if not (_is_uploaded(dir_entry) 656 and job_directories.is_job_expired(self._delete_age, 657 job_complete_time)): 658 return 659 try: 660 logging.debug('Pruning uploaded directory %s', dir_entry) 661 shutil.rmtree(dir_entry) 662 job_timestamp_cache.delete(dir_entry) 663 except OSError as e: 664 # The wrong file permission can lead call `shutil.rmtree(dir_entry)` 665 # to raise OSError with message 'Permission denied'. Details can be 666 # found in crbug.com/536151 667 _handle_dir_os_error(dir_entry, e.errno==errno.EACCES) 668 # Try again after the permission issue is fixed. 669 shutil.rmtree(dir_entry) 670 671 def _prune_swarming_req_dir(self, swarming_req_dir): 672 """Prune swarming request directory, if it is empty. 673 674 @param swarming_req_dir: Directory entry of a swarming request. 675 """ 676 try: 677 logging.debug('Pruning swarming request directory %s', 678 swarming_req_dir) 679 os.rmdir(swarming_req_dir) 680 except OSError as e: 681 # Do nothing and leave this directory to next attempt to remove. 682 logging.debug('Failed to prune swarming request directory %s', 683 swarming_req_dir) 684 685 686class _OffloadError(Exception): 687 """Google Storage offload failed.""" 688 689 def __init__(self, start_time): 690 super(_OffloadError, self).__init__(start_time) 691 self.start_time = start_time 692 693 694 695class FakeGSOffloader(BaseGSOffloader): 696 697 """Fake Google Storage Offloader that only deletes directories.""" 698 699 def _full_offload(self, dir_entry, dest_path, job_complete_time): 700 """Pretend to offload a directory and delete it. 701 702 @param dir_entry: Directory entry to offload. 703 @param dest_path: Location in google storage where we will 704 offload the directory. 705 @param job_complete_time: The complete time of the job from the AFE 706 database. 707 """ 708 shutil.rmtree(dir_entry) 709 710 711class OptionalMemoryCache(object): 712 """Implements memory cache if cachetools module can be loaded. 713 714 If the platform has cachetools available then the cache will 715 be created, otherwise the get calls will always act as if there 716 was a cache miss and the set/delete will be no-ops. 717 """ 718 cache = None 719 720 def setup(self, age_to_delete): 721 """Set up a TTL cache size based on how long the job will be handled. 722 723 Autotest jobs are handled by gs_offloader until they are deleted from 724 local storage, base the cache size on how long that is. 725 726 @param age_to_delete: Number of days after which items in the cache 727 should expire. 728 """ 729 if cachetools: 730 # Min cache is 1000 items for 10 mins. If the age to delete is 0 731 # days you still want a short / small cache. 732 # 2000 items is a good approximation for the max number of jobs a 733 # moblab # can produce in a day, lab offloads immediatly so 734 # the number of carried jobs should be very small in the normal 735 # case. 736 ttl = max(age_to_delete * 24 * 60 * 60, 600) 737 maxsize = max(age_to_delete * 2000, 1000) 738 job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize, 739 ttl=ttl) 740 741 def get(self, key): 742 """If we have a cache try to retrieve from it.""" 743 if self.cache is not None: 744 result = self.cache.get(key) 745 return result 746 return None 747 748 def add(self, key, value): 749 """If we have a cache try to store key/value.""" 750 if self.cache is not None: 751 self.cache[key] = value 752 753 def delete(self, key): 754 """If we have a cache try to remove a key.""" 755 if self.cache is not None: 756 return self.cache.delete(key) 757 758 759job_timestamp_cache = OptionalMemoryCache() 760 761 762def _cached_get_timestamp_if_finished(job): 763 """Retrieve a job finished timestamp from cache or AFE. 764 @param job _JobDirectory instance to retrieve 765 finished timestamp of.. 766 767 @returns: None if the job is not finished, or the 768 last job finished time recorded by Autotest. 769 """ 770 job_timestamp = job_timestamp_cache.get(job.dirname) 771 if not job_timestamp: 772 job_timestamp = job.get_timestamp_if_finished() 773 if job_timestamp: 774 job_timestamp_cache.add(job.dirname, job_timestamp) 775 return job_timestamp 776 777 778def _is_expired(job, age_limit): 779 """Return whether job directory is expired for uploading 780 781 @param job: _JobDirectory instance. 782 @param age_limit: Minimum age in days at which a job may be offloaded. 783 """ 784 job_timestamp = _cached_get_timestamp_if_finished(job) 785 if not job_timestamp: 786 return False 787 return job_directories.is_job_expired(age_limit, job_timestamp) 788 789 790def _emit_offload_metrics(dirpath): 791 """Emit gs offload metrics. 792 793 @param dirpath: Offloaded directory path. 794 """ 795 dir_size = file_utils.get_directory_size_kibibytes(dirpath) 796 metrics_fields = _get_metrics_fields(dirpath) 797 798 m_offload_count = ( 799 'chromeos/autotest/gs_offloader/jobs_offloaded') 800 metrics.Counter(m_offload_count).increment( 801 fields=metrics_fields) 802 m_offload_size = ('chromeos/autotest/gs_offloader/' 803 'kilobytes_transferred') 804 metrics.Counter(m_offload_size).increment_by( 805 dir_size, fields=metrics_fields) 806 807 808def _is_uploaded(dirpath): 809 """Return whether directory has been uploaded. 810 811 @param dirpath: Directory path string. 812 """ 813 return os.path.isfile(_get_uploaded_marker_file(dirpath)) 814 815 816def _mark_uploaded(dirpath): 817 """Mark directory as uploaded. 818 819 @param dirpath: Directory path string. 820 """ 821 logging.debug('Creating uploaded marker for directory %s', dirpath) 822 with open(_get_uploaded_marker_file(dirpath), 'a'): 823 pass 824 825 826def _mark_upload_finished(gs_path, stdout_file, stderr_file): 827 """Mark a given gs_path upload as finished (remotely). 828 829 @param gs_path: gs:// url of the remote directory that is finished 830 upload. 831 """ 832 cmd = _get_finish_cmd_list(gs_path) 833 process = subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file) 834 process.wait() 835 logging.debug('Finished marking as complete %s', cmd) 836 837 838def _get_uploaded_marker_file(dirpath): 839 """Return path to upload marker file for directory. 840 841 @param dirpath: Directory path string. 842 """ 843 return '%s/.GS_UPLOADED' % (dirpath,) 844 845 846def _format_job_for_failure_reporting(job): 847 """Formats a _JobDirectory for reporting / logging. 848 849 @param job: The _JobDirectory to format. 850 """ 851 d = datetime.datetime.fromtimestamp(job.first_offload_start) 852 data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT), 853 job.offload_count, 854 job.dirname) 855 return FAILED_OFFLOADS_LINE_FORMAT % data 856 857 858def wait_for_gs_write_access(gs_uri): 859 """Verify and wait until we have write access to Google Storage. 860 861 @param gs_uri: The Google Storage URI we are trying to offload to. 862 """ 863 # TODO (sbasi) Try to use the gsutil command to check write access. 864 # Ensure we have write access to gs_uri. 865 dummy_file = tempfile.NamedTemporaryFile() 866 test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri) 867 while True: 868 logging.debug('Checking for write access with dummy file %s', 869 dummy_file.name) 870 try: 871 subprocess.check_call(test_cmd) 872 subprocess.check_call( 873 ['gsutil', 'rm', 874 os.path.join(gs_uri, 875 os.path.basename(dummy_file.name))]) 876 break 877 except subprocess.CalledProcessError: 878 t = 120 879 logging.debug('Unable to offload dummy file to %s, sleeping for %s ' 880 'seconds.', gs_uri, t) 881 time.sleep(t) 882 logging.debug('Dummy file write check to gs succeeded.') 883 884 885class Offloader(object): 886 """State of the offload process. 887 888 Contains the following member fields: 889 * _gs_offloader: _BaseGSOffloader to use to offload a job directory. 890 * _jobdir_classes: List of classes of job directory to be 891 offloaded. 892 * _processes: Maximum number of outstanding offload processes 893 to allow during an offload cycle. 894 * _age_limit: Minimum age in days at which a job may be 895 offloaded. 896 * _open_jobs: a dictionary mapping directory paths to Job 897 objects. 898 """ 899 900 def __init__(self, options): 901 self._upload_age_limit = options.age_to_upload 902 self._delete_age_limit = options.age_to_delete 903 if options.delete_only: 904 self._gs_offloader = FakeGSOffloader() 905 else: 906 self.gs_uri = utils.get_offload_gsuri() 907 logging.debug('Offloading to: %s', self.gs_uri) 908 multiprocessing = False 909 if options.multiprocessing: 910 multiprocessing = True 911 elif options.multiprocessing is None: 912 multiprocessing = GS_OFFLOADER_MULTIPROCESSING 913 logging.info( 914 'Offloader multiprocessing is set to:%r', multiprocessing) 915 console_client = None 916 if (cloud_console_client and 917 cloud_console_client.is_cloud_notification_enabled()): 918 console_client = cloud_console_client.PubSubBasedClient() 919 self._gs_offloader = GSOffloader( 920 self.gs_uri, multiprocessing, self._delete_age_limit, 921 console_client) 922 classlist = [ 923 job_directories.SwarmingJobDirectory, 924 ] 925 if options.process_hosts_only or options.process_all: 926 classlist.append(job_directories.SpecialJobDirectory) 927 if not options.process_hosts_only: 928 classlist.append(job_directories.RegularJobDirectory) 929 self._jobdir_classes = classlist 930 assert self._jobdir_classes 931 self._processes = options.parallelism 932 self._open_jobs = {} 933 self._pusub_topic = None 934 self._offload_count_limit = 3 935 936 937 def _add_new_jobs(self): 938 """Find new job directories that need offloading. 939 940 Go through the file system looking for valid job directories 941 that are currently not in `self._open_jobs`, and add them in. 942 943 """ 944 new_job_count = 0 945 for cls in self._jobdir_classes: 946 for resultsdir in cls.get_job_directories(): 947 if resultsdir in self._open_jobs: 948 continue 949 self._open_jobs[resultsdir] = cls(resultsdir) 950 new_job_count += 1 951 logging.debug('Start of offload cycle - found %d new jobs', 952 new_job_count) 953 954 955 def _remove_offloaded_jobs(self): 956 """Removed offloaded jobs from `self._open_jobs`.""" 957 removed_job_count = 0 958 # must be list to make a copy of the dictionary to allow deletion 959 # during iterations. 960 for jobkey, job in list(six.iteritems(self._open_jobs)): 961 if ( 962 not os.path.exists(job.dirname) 963 or _is_uploaded(job.dirname)): 964 del self._open_jobs[jobkey] 965 removed_job_count += 1 966 logging.debug('End of offload cycle - cleared %d jobs, ' 967 'carrying %d open jobs', 968 removed_job_count, len(self._open_jobs)) 969 970 971 def _report_failed_jobs(self): 972 """Report status after attempting offload. 973 974 This function processes all jobs in `self._open_jobs`, assuming 975 an attempt has just been made to offload all of them. 976 977 If any jobs have reportable errors, and we haven't generated 978 an e-mail report in the last `REPORT_INTERVAL_SECS` seconds, 979 send new e-mail describing the failures. 980 981 """ 982 failed_jobs = [j for j in self._open_jobs.values() if 983 j.first_offload_start] 984 self._report_failed_jobs_count(failed_jobs) 985 self._log_failed_jobs_locally(failed_jobs) 986 987 988 def offload_once(self): 989 """Perform one offload cycle. 990 991 Find all job directories for new jobs that we haven't seen 992 before. Then, attempt to offload the directories for any 993 jobs that have finished running. Offload of multiple jobs 994 is done in parallel, up to `self._processes` at a time. 995 996 After we've tried uploading all directories, go through the list 997 checking the status of all uploaded directories. If necessary, 998 report failures via e-mail. 999 1000 """ 1001 self._add_new_jobs() 1002 self._report_current_jobs_count() 1003 with parallel.BackgroundTaskRunner( 1004 self._gs_offloader.offload, processes=self._processes) as queue: 1005 for job in self._open_jobs.values(): 1006 _enqueue_offload(job, queue, self._upload_age_limit) 1007 self._give_up_on_jobs_over_limit() 1008 self._remove_offloaded_jobs() 1009 self._report_failed_jobs() 1010 1011 1012 def _give_up_on_jobs_over_limit(self): 1013 """Give up on jobs that have gone over the offload limit. 1014 1015 We mark them as uploaded as we won't try to offload them any more. 1016 """ 1017 for job in self._open_jobs.values(): 1018 if job.offload_count >= self._offload_count_limit: 1019 _mark_uploaded(job.dirname) 1020 1021 1022 def _log_failed_jobs_locally(self, failed_jobs, 1023 log_file=FAILED_OFFLOADS_FILE): 1024 """Updates a local file listing all the failed jobs. 1025 1026 The dropped file can be used by the developers to list jobs that we have 1027 failed to upload. 1028 1029 @param failed_jobs: A list of failed _JobDirectory objects. 1030 @param log_file: The file to log the failed jobs to. 1031 """ 1032 now = datetime.datetime.now() 1033 now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT) 1034 formatted_jobs = [_format_job_for_failure_reporting(job) 1035 for job in failed_jobs] 1036 formatted_jobs.sort() 1037 1038 with open(log_file, 'w') as logfile: 1039 logfile.write(FAILED_OFFLOADS_FILE_HEADER % 1040 (now_str, len(failed_jobs))) 1041 logfile.writelines(formatted_jobs) 1042 1043 1044 def _report_current_jobs_count(self): 1045 """Report the number of outstanding jobs to monarch.""" 1046 metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set( 1047 len(self._open_jobs)) 1048 1049 1050 def _report_failed_jobs_count(self, failed_jobs): 1051 """Report the number of outstanding failed offload jobs to monarch. 1052 1053 @param: List of failed jobs. 1054 """ 1055 metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set( 1056 len(failed_jobs)) 1057 1058 1059def _enqueue_offload(job, queue, age_limit): 1060 """Enqueue the job for offload, if it's eligible. 1061 1062 The job is eligible for offloading if the database has marked 1063 it finished, and the job is older than the `age_limit` 1064 parameter. 1065 1066 If the job is eligible, offload processing is requested by 1067 passing the `queue` parameter's `put()` method a sequence with 1068 the job's `dirname` attribute and its directory name. 1069 1070 @param job _JobDirectory instance to offload. 1071 @param queue If the job should be offloaded, put the offload 1072 parameters into this queue for processing. 1073 @param age_limit Minimum age for a job to be offloaded. A value 1074 of 0 means that the job will be offloaded as 1075 soon as it is finished. 1076 1077 """ 1078 if not job.offload_count: 1079 if not _is_expired(job, age_limit): 1080 return 1081 job.first_offload_start = time.time() 1082 job.offload_count += 1 1083 if job.process_gs_instructions(): 1084 timestamp = _cached_get_timestamp_if_finished(job) 1085 queue.put([job.dirname, os.path.dirname(job.dirname), timestamp]) 1086 1087 1088def parse_options(): 1089 """Parse the args passed into gs_offloader.""" 1090 defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % ( 1091 utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR) 1092 usage = 'usage: %prog [options]\n' + defaults 1093 parser = OptionParser(usage) 1094 parser.add_option('-a', '--all', dest='process_all', 1095 action='store_true', 1096 help='Offload all files in the results directory.') 1097 parser.add_option('-s', '--hosts', dest='process_hosts_only', 1098 action='store_true', 1099 help='Offload only the special tasks result files ' 1100 'located in the results/hosts subdirectory') 1101 parser.add_option('-p', '--parallelism', dest='parallelism', 1102 type='int', default=1, 1103 help='Number of parallel workers to use.') 1104 parser.add_option('-o', '--delete_only', dest='delete_only', 1105 action='store_true', 1106 help='GS Offloader will only the delete the ' 1107 'directories and will not offload them to google ' 1108 'storage. NOTE: If global_config variable ' 1109 'CROS.gs_offloading_enabled is False, --delete_only ' 1110 'is automatically True.', 1111 default=not GS_OFFLOADING_ENABLED) 1112 parser.add_option('-d', '--days_old', dest='days_old', 1113 help='Minimum job age in days before a result can be ' 1114 'offloaded.', type='int', default=0) 1115 parser.add_option('-l', '--log_size', dest='log_size', 1116 help='Limit the offloader logs to a specified ' 1117 'number of Mega Bytes.', type='int', default=0) 1118 parser.add_option('-m', dest='multiprocessing', action='store_true', 1119 help='Turn on -m option for gsutil. If not set, the ' 1120 'global config setting gs_offloader_multiprocessing ' 1121 'under CROS section is applied.') 1122 parser.add_option('-i', '--offload_once', dest='offload_once', 1123 action='store_true', 1124 help='Upload all available results and then exit.') 1125 parser.add_option('-y', '--normal_priority', dest='normal_priority', 1126 action='store_true', 1127 help='Upload using normal process priority.') 1128 parser.add_option('-u', '--age_to_upload', dest='age_to_upload', 1129 help='Minimum job age in days before a result can be ' 1130 'offloaded, but not removed from local storage', 1131 type='int', default=None) 1132 parser.add_option('-n', '--age_to_delete', dest='age_to_delete', 1133 help='Minimum job age in days before a result can be ' 1134 'removed from local storage', 1135 type='int', default=None) 1136 parser.add_option( 1137 '--metrics-file', 1138 help='If provided, drop metrics to this local file instead of ' 1139 'reporting to ts_mon', 1140 type=str, 1141 default=None, 1142 ) 1143 parser.add_option('-t', '--enable_timestamp_cache', 1144 dest='enable_timestamp_cache', 1145 action='store_true', 1146 help='Cache the finished timestamps from AFE.') 1147 1148 options = parser.parse_args()[0] 1149 if options.process_all and options.process_hosts_only: 1150 parser.print_help() 1151 print ('Cannot process all files and only the hosts ' 1152 'subdirectory. Please remove an argument.') 1153 sys.exit(1) 1154 1155 if options.days_old and (options.age_to_upload or options.age_to_delete): 1156 parser.print_help() 1157 print('Use the days_old option or the age_to_* options but not both') 1158 sys.exit(1) 1159 1160 if options.age_to_upload == None: 1161 options.age_to_upload = options.days_old 1162 if options.age_to_delete == None: 1163 options.age_to_delete = options.days_old 1164 1165 return options 1166 1167 1168def main(): 1169 """Main method of gs_offloader.""" 1170 options = parse_options() 1171 1172 if options.process_all: 1173 offloader_type = 'all' 1174 elif options.process_hosts_only: 1175 offloader_type = 'hosts' 1176 else: 1177 offloader_type = 'jobs' 1178 1179 _setup_logging(options, offloader_type) 1180 1181 if options.enable_timestamp_cache: 1182 # Extend the cache expiry time by another 1% so the timstamps 1183 # are available as the results are purged. 1184 job_timestamp_cache.setup(options.age_to_delete * 1.01) 1185 1186 # Nice our process (carried to subprocesses) so we don't overload 1187 # the system. 1188 if not options.normal_priority: 1189 logging.debug('Set process to nice value: %d', NICENESS) 1190 os.nice(NICENESS) 1191 if psutil: 1192 proc = psutil.Process() 1193 logging.debug('Set process to ionice IDLE') 1194 proc.ionice(psutil.IOPRIO_CLASS_IDLE) 1195 1196 # os.listdir returns relative paths, so change to where we need to 1197 # be to avoid an os.path.join on each loop. 1198 logging.debug('Offloading Autotest results in %s', RESULTS_DIR) 1199 os.chdir(RESULTS_DIR) 1200 1201 service_name = 'gs_offloader(%s)' % offloader_type 1202 with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True, 1203 short_lived=False, 1204 debug_file=options.metrics_file): 1205 with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'): 1206 offloader = Offloader(options) 1207 if not options.delete_only: 1208 wait_for_gs_write_access(offloader.gs_uri) 1209 while True: 1210 offloader.offload_once() 1211 if options.offload_once: 1212 break 1213 time.sleep(SLEEP_TIME_SECS) 1214 1215 1216_LOG_LOCATION = '/usr/local/autotest/logs/' 1217_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt' 1218_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S' 1219_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' 1220 1221 1222def _setup_logging(options, offloader_type): 1223 """Set up logging. 1224 1225 @param options: Parsed options. 1226 @param offloader_type: Type of offloader action as string. 1227 """ 1228 log_filename = _get_log_filename(options, offloader_type) 1229 log_formatter = logging.Formatter(_LOGGING_FORMAT) 1230 # Replace the default logging handler with a RotatingFileHandler. If 1231 # options.log_size is 0, the file size will not be limited. Keeps 1232 # one backup just in case. 1233 handler = logging.handlers.RotatingFileHandler( 1234 log_filename, maxBytes=1024 * options.log_size, backupCount=1) 1235 handler.setFormatter(log_formatter) 1236 logger = logging.getLogger() 1237 logger.setLevel(logging.DEBUG) 1238 logger.addHandler(handler) 1239 1240 1241def _get_log_filename(options, offloader_type): 1242 """Get log filename. 1243 1244 @param options: Parsed options. 1245 @param offloader_type: Type of offloader action as string. 1246 """ 1247 if options.log_size > 0: 1248 log_timestamp = '' 1249 else: 1250 log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT) 1251 log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp) 1252 return os.path.join(_LOG_LOCATION, log_basename) 1253 1254 1255if __name__ == '__main__': 1256 main() 1257