xref: /aosp_15_r20/external/autotest/site_utils/gs_offloader.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2#
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Script to archive old Autotest results to Google Storage.
8
9Uses gsutil to archive files to the configured Google Storage bucket.
10Upon successful copy, the local results directory is deleted.
11"""
12
13from __future__ import absolute_import
14from __future__ import division
15from __future__ import print_function
16
17import abc
18try:
19    import cachetools
20except ImportError:
21    cachetools = None
22import datetime
23import errno
24import glob
25import logging
26import logging.handlers
27import os
28import re
29import shutil
30import stat
31import subprocess
32import sys
33import tarfile
34import tempfile
35import time
36
37from optparse import OptionParser
38
39import common
40from autotest_lib.client.common_lib import file_utils
41from autotest_lib.client.common_lib import global_config
42from autotest_lib.client.common_lib import utils
43from autotest_lib.site_utils import job_directories
44# For unittest, the cloud_console.proto is not compiled yet.
45try:
46    from autotest_lib.site_utils import cloud_console_client
47except ImportError:
48    cloud_console_client = None
49from autotest_lib.tko import models
50from autotest_lib.utils import labellib
51from autotest_lib.utils import gslib
52from autotest_lib.utils.side_effects import config_loader
53from autotest_lib.utils.frozen_chromite.lib import timeout_util
54
55# Autotest requires the psutil module from site-packages, so it must be imported
56# after "import common".
57try:
58    # Does not exist, nor is needed, on moblab.
59    import psutil
60except ImportError:
61    psutil = None
62
63from autotest_lib.utils.frozen_chromite.lib import parallel
64import six
65try:
66    from autotest_lib.utils.frozen_chromite.lib import metrics
67    from autotest_lib.utils.frozen_chromite.lib import ts_mon_config
68except ImportError:
69    metrics = utils.metrics_mock
70    ts_mon_config = utils.metrics_mock
71
72
# Master switch: when False, main() substitutes FakeGSOffloader and results
# are deleted without being uploaded.
GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for process, the higher the number the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

# Header written at the top of FAILED_OFFLOADS_FILE; its column ruler must
# stay in sync with FAILED_OFFLOADS_LINE_FORMAT below.
FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

# When True, oversized result directories have sub-folders tarballed before
# upload (see limit_file_count).
LIMIT_FILE_COUNT = True

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)


# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect list of CTS tests
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'
123
124def _get_metrics_fields(dir_entry):
125    """Get metrics fields for the given test result directory, including board
126    and milestone.
127
128    @param dir_entry: Directory entry to offload.
129    @return A dictionary for the metrics data to be uploaded.
130    """
131    fields = {'board': 'unknown',
132              'milestone': 'unknown'}
133    if dir_entry:
134        # There could be multiple hosts in the job directory, use the first one
135        # available.
136        for host in glob.glob(os.path.join(dir_entry, '*')):
137            try:
138                keyval = models.test.parse_job_keyval(host)
139            except ValueError:
140                continue
141            build = keyval.get('build')
142            if build:
143                try:
144                    cros_version = labellib.parse_cros_version(build)
145                    fields['board'] = cros_version.board
146                    fields['milestone'] = cros_version.milestone
147                    break
148                except ValueError:
149                    # Ignore version parsing error so it won't crash
150                    # gs_offloader.
151                    pass
152
153    return fields
154
155
def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Build the gsutil command that offloads a directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path that which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        verb = 'rsync'
        # rsync needs the destination to name the directory itself.
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        verb = 'cp'
        target = gs_path
    cmd.extend([verb, '-eR', dir_entry, target])
    return cmd
178
179
180def _get_finish_cmd_list(gs_path):
181    """Returns a command to remotely mark a given gs path as finished.
182
183    @param gs_path: Location in google storage where the offload directory
184                    should be marked as finished.
185
186    @return A command list to be executed by Popen.
187    """
188    target = os.path.join(gs_path, '.finished_offload')
189    return [
190        'gsutil',
191        'cp',
192        '/dev/null',
193        target,
194        ]
195
196
def sanitize_dir(dirpath):
    """Sanitize directory for gs upload.

    Filenames are escaped and symlinks/FIFOs are converted to regular
    files to fix upload bugs.  A nonexistent path is a no-op.

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    # Order matters: escape the top-level name first, then the contents,
    # then neutralize special file types.
    for sanitizer in (_escape_rename,
                      _escape_rename_dir_contents,
                      _sanitize_fifos,
                      _sanitize_symlinks):
        sanitizer(dirpath)
210
211
def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    # Rename every direct child first; the second listing then sees the
    # escaped names before we recurse into sub-directories.
    for name in os.listdir(dirpath):
        _escape_rename(os.path.join(dirpath, name))
    for name in os.listdir(dirpath):
        child = os.path.join(dirpath, name)
        if os.path.isdir(child):
            _escape_rename_dir_contents(child)
224
225
def _escape_rename(path):
    """Rename file to escape filenames for gs upload.

    @param path: File path string.
    """
    parent, name = os.path.split(path)
    # gslib.escape maps characters gsutil cannot handle to safe sequences.
    os.rename(path, os.path.join(parent, gslib.escape(name)))
235
236
def _sanitize_fifos(dirpath):
    """Convert fifos to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _dirs, filenames in os.walk(dirpath):
        for name in filenames:
            full_path = os.path.join(root, name)
            # lstat so we never follow (or block on) the entry itself.
            if stat.S_ISFIFO(os.lstat(full_path).st_mode):
                _replace_fifo_with_file(full_path)
248
249
250def _replace_fifo_with_file(path):
251    """Replace a fifo with a normal file.
252
253    @param path: Fifo path string.
254    """
255    logging.debug('Removing fifo %s', path)
256    os.remove(path)
257    logging.debug('Creating fifo marker %s', path)
258    with open(path, 'w') as f:
259        f.write('<FIFO>')
260
261
def _sanitize_symlinks(dirpath):
    """Convert Symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _dirs, filenames in os.walk(dirpath):
        for name in filenames:
            full_path = os.path.join(root, name)
            # lstat inspects the link itself rather than its target.
            if stat.S_ISLNK(os.lstat(full_path).st_mode):
                _replace_symlink_with_file(full_path)
273
274
275def _replace_symlink_with_file(path):
276    """Replace a symlink with a normal file.
277
278    @param path: Symlink path string.
279    """
280    target = os.readlink(path)
281    logging.debug('Removing symlink %s', path)
282    os.remove(path)
283    logging.debug('Creating symlink marker %s', path)
284    with open(path, 'w') as f:
285        f.write('<symlink to %s>' % target)
286
287
288# Maximum number of files in the folder.
289_MAX_FILE_COUNT = 3000
290_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']
291
292
293def _get_zippable_folders(dir_entry):
294    folders_list = []
295    for folder in os.listdir(dir_entry):
296        folder_path = os.path.join(dir_entry, folder)
297        if (not os.path.isfile(folder_path) and
298                not folder in _FOLDERS_NEVER_ZIP):
299            folders_list.append(folder_path)
300    return folders_list
301
302
def limit_file_count(dir_entry):
    """Limit the number of files in given directory.

    If the directory holds _MAX_FILE_COUNT files or more, each
    compressible sub-folder (everything outside _FOLDERS_NEVER_ZIP)
    is replaced with a tarball.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        total = _count_files(dir_entry)
    except ValueError:
        logging.warning('Fail to get the file count in folder %s.', dir_entry)
        return
    if total < _MAX_FILE_COUNT:
        return

    # For test job, zip folders in a second level, e.g. 123-debug/host1.
    # This is to allow autoserv debug folder still be accessible.
    # For special task, it does not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    targets = _get_zippable_folders(dir_entry)
    if not is_special_task:
        targets = [sub
                   for folder in targets
                   for sub in _get_zippable_folders(folder)]

    for folder in targets:
        _make_into_tarball(folder)
336
337
338def _count_files(dirpath):
339    """Count the number of files in a directory recursively.
340
341    @param dirpath: Directory path string.
342    """
343    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))
344
345
346def _make_into_tarball(dirpath):
347    """Make directory into tarball.
348
349    @param dirpath: Directory path string.
350    """
351    tarpath = '%s.tgz' % dirpath
352    with tarfile.open(tarpath, 'w:gz') as tar:
353        tar.add(dirpath, arcname=os.path.basename(dirpath))
354    shutil.rmtree(dirpath)
355
356
def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder has
    the owner of root. This must be changed to the user running the
    autoserv process, so parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    owner = '%s:%s' % (os.getuid(), os.getgid())
    commands = (
            ['sudo', '-n', 'chown', '-R', owner, dir_entry],
            ['chmod', '-R', 'u+rw', dir_entry],
            ['find', dir_entry, '-type', 'd',
             '-exec', 'chmod', 'u+x', '{}', ';'],
    )
    try:
        for command in commands:
            subprocess.check_call(command)
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)
381
382
383def _get_swarming_req_dir(path):
384    """
385    Returns the parent directory of |path|, if |path| is a swarming task result.
386
387    @param path: Full path to the result of a task.
388                      e.g. /tmp/results/swarming-44466815c4bc951/1
389
390    @return string of the parent dir or None if not a swarming task.
391    """
392    m_parent = re.match(
393            '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path)
394    if m_parent:
395        return m_parent.group('parent_dir')
396    return None
397
398
399def _parse_cts_job_results_file_path(path):
400    """Parse CTS file paths an extract required information from them."""
401
402    # Autotest paths look like:
403    # /317739475-chromeos-test/chromeos4-row9-rack11-host22/
404    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44
405
406    # Swarming paths look like:
407    # /swarming-458e3a3a7fc6f210/1/autoserv_test/
408    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44
409
410    folders = path.split(os.sep)
411    if 'swarming' in folders[1]:
412        # Swarming job and attempt combined
413        job_id = "%s-%s" % (folders[-7], folders[-6])
414    else:
415        job_id = folders[-6]
416
417    cts_package = folders[-4]
418    timestamp = folders[-1]
419
420    return job_id, cts_package, timestamp
421
422
def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    rcode = int(returncode)
    if not 0 <= rcode <= 255:
        # Anything outside the normal exit-status range is bucketed as -1.
        rcode = -1
    metrics.Counter(
            'chromeos/autotest/gs_offloader/gs_returncode').increment(
                    fields={'return_code': rcode})
430
431
def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue if needed.

    Always increments the wrong-permissions counter so the failure is
    visible in monitoring.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same one
            running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    counter_name = ('chromeos/autotest/errors/gs_offloader/'
                    'wrong_permissions_count')
    metrics.Counter(counter_name).increment(
            fields=_get_metrics_fields(dir_entry))
445
446
class BaseGSOffloader(six.with_metaclass(abc.ABCMeta, object)):

    """Google Storage offloader interface."""

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        Copies the contents of `dir_entry` to Google storage at
        `dest_path`.  On success, all of `dir_entry` is deleted; on
        failure the directory is left undisturbed so a later pass can
        retry.  Errors are conveyed only by logging enough information
        for later debugging and by leaving the content in place -- this
        method never raises, so one bad job cannot take down the
        offloader loop.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as exc:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(exc))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        Subclasses implement the actual offload behavior here.  To keep
        debugging effective, implementations should catch their own
        exceptions and perform any reasonable diagnosis or other
        handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
497
class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
            console_client=None):
        """Returns the offload directory function for the given gs_uri

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on -m option for gsutil.
        @param delete_age: Minimum age (in days) since job completion
                before an uploaded results directory may be pruned.
        @param console_client: The cloud console client. If None,
          cloud console APIs are  not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        Drives one upload attempt (with a single retry after fixing a
        permission error), records failure metrics/logs, and on success
        prunes the local copy.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Correct file permission error of the directory, then raise
                    # the exception so gs_offloader can retry later.
                    _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
                    # Try again after the permission issue is fixed.
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError as e:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)

                # Some result files may have wrong file permission. Try
                # to correct such error so later try can success.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                # Upload succeeded (or was already done): delete the local
                # copy if old enough, then try to clean up the swarming
                # request parent directory if this was a swarming result.
                self._prune(dir_entry, job_complete_time)
                swarming_req_dir = _get_swarming_req_dir(dir_entry)
                if swarming_req_dir:
                    self._prune_swarming_req_dir(swarming_req_dir)


    def _try_offload(self, dir_entry, dest_path,
                 stdout_file, stderr_file):
        """Offload the specified directory entry to Google storage.

        Raises _OffloadError on gsutil failure, timeout, or when the
        cloud console notification cannot be sent.  No-op if the entry
        already carries the local uploaded marker.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param stdout_file: Log file.
        @param stderr_file: Log file.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        error_obj = _OffloadError(start_time)
        # A side-effects config inside the results dir may redirect the
        # upload to a different bucket.
        config = config_loader.load(dir_entry)
        if config:
            # TODO(linxinan): use credential file assigned by the side_effect
            # config.
            if config.google_storage.bucket:
                gs_prefix = ('' if
                             config.google_storage.bucket.startswith('gs://')
                             else 'gs://')
                self._gs_uri = gs_prefix + config.google_storage.bucket
        else:
            # For now, the absence of config does not block gs_offloader
            # from uploading files via default credential.
            logging.debug('Failed to load the side effects config in %s.',
                          dir_entry)
        try:
            sanitize_dir(dir_entry)
            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
                logging.debug('Attempting an offload command %s', cmd)
                process = subprocess.Popen(
                    cmd, stdout=stdout_file, stderr=stderr_file)
                process.wait()
                logging.debug('Offload command %s completed; '
                              'marking offload complete.', cmd)
                _mark_upload_finished(gs_path, stdout_file, stderr_file)

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                        os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            # Local marker so later passes skip re-uploading this entry.
            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune directory if it is uploaded and expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            logging.debug('Pruning uploaded directory %s', dir_entry)
            shutil.rmtree(dir_entry)
            job_timestamp_cache.delete(dir_entry)
        except OSError as e:
            # The wrong file permission can lead call `shutil.rmtree(dir_entry)`
            # to raise OSError with message 'Permission denied'. Details can be
            # found in crbug.com/536151
            _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
            # Try again after the permission issue is fixed.
            shutil.rmtree(dir_entry)

    def _prune_swarming_req_dir(self, swarming_req_dir):
        """Prune swarming request directory, if it is empty.

        os.rmdir only removes an empty directory, which is exactly the
        condition required here.

        @param swarming_req_dir: Directory entry of a swarming request.
        """
        try:
            logging.debug('Pruning swarming request directory %s',
                          swarming_req_dir)
            os.rmdir(swarming_req_dir)
        except OSError as e:
            # Do nothing and leave this directory to next attempt to remove.
            logging.debug('Failed to prune swarming request directory %s',
                          swarming_req_dir)
684
685
686class _OffloadError(Exception):
687    """Google Storage offload failed."""
688
689    def __init__(self, start_time):
690        super(_OffloadError, self).__init__(start_time)
691        self.start_time = start_time
692
693
694
class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Delete the directory without uploading anything.

        Used when real offloading is disabled; the 'offload' consists
        solely of removing the local results.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory (unused).
        @param job_complete_time: The complete time of the job from the AFE
                                  database (unused).
        """
        shutil.rmtree(dir_entry)
709
710
class OptionalMemoryCache(object):
    """Memory cache used only if the cachetools module can be loaded.

    If the platform has cachetools available then the cache will be
    created by setup(); otherwise get() always acts as if there was a
    cache miss and add()/delete() are no-ops.
    """
    # cachetools.TTLCache instance, or None when cachetools is missing or
    # setup() has not been called.
    cache = None

    def setup(self, age_to_delete):
        """Set up a TTL cache size based on how long the job will be handled.

        Autotest jobs are handled by gs_offloader until they are deleted from
        local storage; base the cache size on how long that is.

        @param age_to_delete: Number of days after which items in the cache
                              should expire.
        """
        if cachetools:
            # Min cache is 1000 items for 10 mins. If the age to delete is 0
            # days you still want a short / small cache.
            # 2000 items is a good approximation for the max number of jobs a
            # moblab can produce in a day; the lab offloads immediately so
            # the number of carried jobs should be very small in the normal
            # case.
            ttl = max(age_to_delete * 24 * 60 * 60, 600)
            maxsize = max(age_to_delete * 2000, 1000)
            # Bug fix: assign to this instance instead of reaching through
            # the module-level job_timestamp_cache singleton (the old code
            # only worked because that was the sole instance).
            self.cache = cachetools.TTLCache(maxsize=maxsize, ttl=ttl)

    def get(self, key):
        """If we have a cache try to retrieve from it."""
        if self.cache is not None:
            return self.cache.get(key)
        return None

    def add(self, key, value):
        """If we have a cache try to store key/value."""
        if self.cache is not None:
            self.cache[key] = value

    def delete(self, key):
        """If we have a cache try to remove a key.

        Bug fix: TTLCache is a MutableMapping with no .delete() method,
        so the previous `self.cache.delete(key)` raised AttributeError
        whenever cachetools was installed.  Use pop() so a missing key
        is also tolerated.
        """
        if self.cache is not None:
            self.cache.pop(key, None)


# Module-level singleton used to memoize job finished timestamps.
job_timestamp_cache = OptionalMemoryCache()
760
761
def _cached_get_timestamp_if_finished(job):
    """Retrieve a job finished timestamp from cache or AFE.

    @param job: _JobDirectory instance to retrieve the finished
                timestamp of.

    @returns: None if the job is not finished, or the
              last job finished time recorded by Autotest.
    """
    cached = job_timestamp_cache.get(job.dirname)
    if cached:
        return cached
    timestamp = job.get_timestamp_if_finished()
    if timestamp:
        # Memoize so repeated offload passes skip the AFE lookup.
        job_timestamp_cache.add(job.dirname, timestamp)
    return timestamp
776
777
def _is_expired(job, age_limit):
    """Return whether job directory is expired for uploading

    @param job: _JobDirectory instance.
    @param age_limit:  Minimum age in days at which a job may be offloaded.
    """
    finished_at = _cached_get_timestamp_if_finished(job)
    # An unfinished job (no timestamp) is never expired.
    return bool(finished_at) and job_directories.is_job_expired(
            age_limit, finished_at)
788
789
def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    size_kib = file_utils.get_directory_size_kibibytes(dirpath)
    fields = _get_metrics_fields(dirpath)

    # Count the offloaded job, then record how much data it moved.
    metrics.Counter(
            'chromeos/autotest/gs_offloader/jobs_offloaded').increment(
                    fields=fields)
    metrics.Counter(
            'chromeos/autotest/gs_offloader/kilobytes_transferred'
            ).increment_by(size_kib, fields=fields)
806
807
def _is_uploaded(dirpath):
    """Return whether directory has been uploaded.

    @param dirpath: Directory path string.
    """
    marker = _get_uploaded_marker_file(dirpath)
    return os.path.isfile(marker)
814
815
def _mark_uploaded(dirpath):
    """Mark directory as uploaded.

    @param dirpath: Directory path string.
    """
    logging.debug('Creating uploaded marker for directory %s', dirpath)
    # Append mode touches the marker without truncating an existing one.
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass
824
825
def _mark_upload_finished(gs_path, stdout_file, stderr_file):
    """Mark a given gs_path upload as finished (remotely).

    @param gs_path: gs:// url of the remote directory that is finished
                    upload.
    @param stdout_file: File to receive gsutil stdout.
    @param stderr_file: File to receive gsutil stderr.
    """
    cmd = _get_finish_cmd_list(gs_path)
    subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file).wait()
    logging.debug('Finished marking as complete %s', cmd)
836
837
838def _get_uploaded_marker_file(dirpath):
839    """Return path to upload marker file for directory.
840
841    @param dirpath: Directory path string.
842    """
843    return '%s/.GS_UPLOADED' % (dirpath,)
844
845
def _format_job_for_failure_reporting(job):
    """Format a _JobDirectory as one line for failure reporting / logging.

    @param job: The _JobDirectory to format.
    @return: A line formatted per FAILED_OFFLOADS_LINE_FORMAT.
    """
    started = datetime.datetime.fromtimestamp(job.first_offload_start)
    return FAILED_OFFLOADS_LINE_FORMAT % (
            started.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
856
857
def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    Loops forever, retrying every two minutes, until a probe file can be
    copied to (and then removed from) `gs_uri`.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    probe_file = tempfile.NamedTemporaryFile()
    copy_cmd = _get_cmd_list(False, probe_file.name, gs_uri)
    remote_path = os.path.join(gs_uri, os.path.basename(probe_file.name))
    while True:
        logging.debug('Checking for write access with dummy file %s',
                      probe_file.name)
        try:
            subprocess.check_call(copy_cmd)
            # Clean up the probe object so it doesn't linger remotely.
            subprocess.check_call(['gsutil', 'rm', remote_path])
            break
        except subprocess.CalledProcessError:
            t = 120
            logging.debug('Unable to offload dummy file to %s, sleeping for %s '
                          'seconds.', gs_uri, t)
            time.sleep(t)
    logging.debug('Dummy file write check to gs succeeded.')
883
884
class Offloader(object):
    """State of the offload process.

    Coordinates discovery of job result directories, parallel upload of
    finished jobs to Google Storage, and failure reporting.

    Contains the following member fields:
      * _upload_age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _delete_age_limit:  Minimum age in days at which a job may be
        deleted from local storage after upload.
      * _gs_offloader:  _BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
      * _offload_count_limit:  Number of offload attempts after which a
        job is given up on and marked as uploaded.
    """

    def __init__(self, options):
        """Initialize offload state from parsed command-line options.

        @param options: Parsed options object from parse_options().
        """
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            # Delete-only mode: no actual uploading is performed.
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            # Resolve gsutil multiprocessing (-m): an explicit command
            # line flag wins, otherwise fall back to the global config.
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to:%r', multiprocessing)
            console_client = None
            if (cloud_console_client and
                    cloud_console_client.is_cloud_notification_enabled()):
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = [
                job_directories.SwarmingJobDirectory,
        ]
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        # NOTE(review): not read anywhere in this class; presumably a
        # vestigial (misspelled "pubsub"?) attribute -- confirm before
        # removing.
        self._pusub_topic = None
        # Give up on a job after this many failed offload attempts.
        self._offload_count_limit = 3


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded (or vanished) jobs from `self._open_jobs`."""
        removed_job_count = 0
        # must be list to make a copy of the dictionary to allow deletion
        # during iterations.
        for jobkey, job in list(six.iteritems(self._open_jobs)):
            # A job is done with either when its directory is gone or
            # when it carries the uploaded marker file.
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send new e-mail describing the failures.

        """
        # A set first_offload_start means at least one attempt was made
        # and the job is still open, i.e. it has failed so far.
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Updates a local file listing all the failed jobs.

        The dropped file can be used by the developers to list jobs that we have
        failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                            for job in failed_jobs]
        formatted_jobs.sort()

        # Rewrite the whole file each cycle so it always reflects the
        # current set of failures.
        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed _JobDirectory objects.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))
1057
1058
def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence with
    the job's `dirname` attribute and its directory name.

    @param job       _JobDirectory instance to offload.
    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    first_attempt = not job.offload_count
    if first_attempt:
        if not _is_expired(job, age_limit):
            return
        # Remember when offloading first started, for failure reporting.
        job.first_offload_start = time.time()
    job.offload_count += 1
    if not job.process_gs_instructions():
        return
    finished_at = _cached_get_timestamp_if_finished(job)
    queue.put([job.dirname, os.path.dirname(job.dirname), finished_at])
1086
1087
def parse_options():
    """Parse the args passed into gs_offloader.

    @return: Parsed options object.  Exits with status 1 when mutually
             exclusive options are combined.
    """
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only the delete the '
                      'directories and will not offload them to google '
                      'storage. NOTE: If global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of Mega Bytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                      'offloaded, but not removed from local storage',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                      'removed from local storage',
                      type='int', default=None)
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    parser.add_option('-t', '--enable_timestamp_cache',
                      dest='enable_timestamp_cache',
                      action='store_true',
                      help='Cache the finished timestamps from AFE.')

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print('Cannot process all files and only the hosts '
              'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options but not both')
        sys.exit(1)

    # --days_old is a legacy shorthand: it sets both upload and delete
    # ages when the dedicated options are not given.  Use `is None` so
    # an explicit 0 is honored.
    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options
1166
1167
def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    else:
        offloader_type = 'hosts' if options.process_hosts_only else 'jobs'

    _setup_logging(options, offloader_type)

    if options.enable_timestamp_cache:
        # Extend the cache expiry time by another 1% so the timestamps
        # are still available as the results are purged.
        job_timestamp_cache.setup(options.age_to_delete * 1.01)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        logging.debug('Set process to ionice IDLE')
        psutil.Process().ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False,
                                             debug_file=options.metrics_file):
        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
            offloader = Offloader(options)
            if not options.delete_only:
                wait_for_gs_write_access(offloader.gs_uri)
            # Run forever unless -i/--offload_once was requested.
            while True:
                offloader.offload_once()
                if options.offload_once:
                    break
                time.sleep(SLEEP_TIME_SECS)
1214
1215
# Directory in which offloader log files are written.
_LOG_LOCATION = '/usr/local/autotest/logs/'
# Log file basename pattern; filled with (offloader type, timestamp).
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
# Timestamp format embedded in log file names.
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
# Format applied to each record in the log files.
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
1220
1221
def _setup_logging(options, offloader_type):
    """Set up logging.

    Attaches a RotatingFileHandler to the root logger at DEBUG level.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    filename = _get_log_filename(options, offloader_type)
    # A maxBytes of 0 (i.e. options.log_size == 0) means the file size
    # is unlimited.  One backup is kept just in case.
    file_handler = logging.handlers.RotatingFileHandler(
            filename, maxBytes=1024 * options.log_size, backupCount=1)
    file_handler.setFormatter(logging.Formatter(_LOGGING_FORMAT))
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(file_handler)
1239
1240
def _get_log_filename(options, offloader_type):
    """Build the path of the log file to write.

    When size-based rotation is enabled (positive log_size) a fixed
    name is used so the rotating handler can manage the files;
    otherwise each run gets a timestamped name.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        stamp = ''
    else:
        stamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    basename = _LOG_FILENAME_FORMAT % (offloader_type, stamp)
    return os.path.join(_LOG_LOCATION, basename)
1253
1254
# Script entry point: run the offloader when invoked directly.
if __name__ == '__main__':
    main()
1257