xref: /aosp_15_r20/external/autotest/site_utils/gs_offloader.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li#!/usr/bin/python3
2*9c5db199SXin Li#
3*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
5*9c5db199SXin Li# found in the LICENSE file.
6*9c5db199SXin Li
7*9c5db199SXin Li"""Script to archive old Autotest results to Google Storage.
8*9c5db199SXin Li
9*9c5db199SXin LiUses gsutil to archive files to the configured Google Storage bucket.
10*9c5db199SXin LiUpon successful copy, the local results directory is deleted.
11*9c5db199SXin Li"""
12*9c5db199SXin Li
13*9c5db199SXin Lifrom __future__ import absolute_import
14*9c5db199SXin Lifrom __future__ import division
15*9c5db199SXin Lifrom __future__ import print_function
16*9c5db199SXin Li
17*9c5db199SXin Liimport abc
18*9c5db199SXin Litry:
19*9c5db199SXin Li    import cachetools
20*9c5db199SXin Liexcept ImportError:
21*9c5db199SXin Li    cachetools = None
22*9c5db199SXin Liimport datetime
23*9c5db199SXin Liimport errno
24*9c5db199SXin Liimport glob
25*9c5db199SXin Liimport logging
26*9c5db199SXin Liimport logging.handlers
27*9c5db199SXin Liimport os
28*9c5db199SXin Liimport re
29*9c5db199SXin Liimport shutil
30*9c5db199SXin Liimport stat
31*9c5db199SXin Liimport subprocess
32*9c5db199SXin Liimport sys
33*9c5db199SXin Liimport tarfile
34*9c5db199SXin Liimport tempfile
35*9c5db199SXin Liimport time
36*9c5db199SXin Li
37*9c5db199SXin Lifrom optparse import OptionParser
38*9c5db199SXin Li
39*9c5db199SXin Liimport common
40*9c5db199SXin Lifrom autotest_lib.client.common_lib import file_utils
41*9c5db199SXin Lifrom autotest_lib.client.common_lib import global_config
42*9c5db199SXin Lifrom autotest_lib.client.common_lib import utils
43*9c5db199SXin Lifrom autotest_lib.site_utils import job_directories
44*9c5db199SXin Li# For unittest, the cloud_console.proto is not compiled yet.
45*9c5db199SXin Litry:
46*9c5db199SXin Li    from autotest_lib.site_utils import cloud_console_client
47*9c5db199SXin Liexcept ImportError:
48*9c5db199SXin Li    cloud_console_client = None
49*9c5db199SXin Lifrom autotest_lib.tko import models
50*9c5db199SXin Lifrom autotest_lib.utils import labellib
51*9c5db199SXin Lifrom autotest_lib.utils import gslib
52*9c5db199SXin Lifrom autotest_lib.utils.side_effects import config_loader
53*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import timeout_util
54*9c5db199SXin Li
55*9c5db199SXin Li# Autotest requires the psutil module from site-packages, so it must be imported
56*9c5db199SXin Li# after "import common".
57*9c5db199SXin Litry:
58*9c5db199SXin Li    # Does not exist, nor is needed, on moblab.
59*9c5db199SXin Li    import psutil
60*9c5db199SXin Liexcept ImportError:
61*9c5db199SXin Li    psutil = None
62*9c5db199SXin Li
63*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import parallel
64*9c5db199SXin Liimport six
65*9c5db199SXin Litry:
66*9c5db199SXin Li    from autotest_lib.utils.frozen_chromite.lib import metrics
67*9c5db199SXin Li    from autotest_lib.utils.frozen_chromite.lib import ts_mon_config
68*9c5db199SXin Liexcept ImportError:
69*9c5db199SXin Li    metrics = utils.metrics_mock
70*9c5db199SXin Li    ts_mon_config = utils.metrics_mock
71*9c5db199SXin Li
72*9c5db199SXin Li
# Master switch: when False the offloader only deletes expired local
# results and never uploads to Google Storage.
GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for process, the higher the number the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

# Header written at the top of FAILED_OFFLOADS_FILE; filled in with the
# report timestamp and the number of failed directories.
FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

# One line per failed directory: first-failure time, retry count, path.
FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# When True, upload with `gsutil rsync` instead of `gsutil cp`.
USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

# When True, directories exceeding _MAX_FILE_COUNT get their subfolders
# tarred up before upload (see limit_file_count).
LIMIT_FILE_COUNT = True

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)


# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect list of CTS tests
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'
123*9c5db199SXin Li
124*9c5db199SXin Lidef _get_metrics_fields(dir_entry):
125*9c5db199SXin Li    """Get metrics fields for the given test result directory, including board
126*9c5db199SXin Li    and milestone.
127*9c5db199SXin Li
128*9c5db199SXin Li    @param dir_entry: Directory entry to offload.
129*9c5db199SXin Li    @return A dictionary for the metrics data to be uploaded.
130*9c5db199SXin Li    """
131*9c5db199SXin Li    fields = {'board': 'unknown',
132*9c5db199SXin Li              'milestone': 'unknown'}
133*9c5db199SXin Li    if dir_entry:
134*9c5db199SXin Li        # There could be multiple hosts in the job directory, use the first one
135*9c5db199SXin Li        # available.
136*9c5db199SXin Li        for host in glob.glob(os.path.join(dir_entry, '*')):
137*9c5db199SXin Li            try:
138*9c5db199SXin Li                keyval = models.test.parse_job_keyval(host)
139*9c5db199SXin Li            except ValueError:
140*9c5db199SXin Li                continue
141*9c5db199SXin Li            build = keyval.get('build')
142*9c5db199SXin Li            if build:
143*9c5db199SXin Li                try:
144*9c5db199SXin Li                    cros_version = labellib.parse_cros_version(build)
145*9c5db199SXin Li                    fields['board'] = cros_version.board
146*9c5db199SXin Li                    fields['milestone'] = cros_version.milestone
147*9c5db199SXin Li                    break
148*9c5db199SXin Li                except ValueError:
149*9c5db199SXin Li                    # Ignore version parsing error so it won't crash
150*9c5db199SXin Li                    # gs_offloader.
151*9c5db199SXin Li                    pass
152*9c5db199SXin Li
153*9c5db199SXin Li    return fields
154*9c5db199SXin Li
155*9c5db199SXin Li
def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Build the gsutil command used to offload a directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path that which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil', '-m'] if multiprocessing else ['gsutil']
    if USE_RSYNC_ENABLED:
        verb = 'rsync'
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        verb = 'cp'
        target = gs_path
    return cmd + [verb, '-eR', dir_entry, target]
178*9c5db199SXin Li
179*9c5db199SXin Li
180*9c5db199SXin Lidef _get_finish_cmd_list(gs_path):
181*9c5db199SXin Li    """Returns a command to remotely mark a given gs path as finished.
182*9c5db199SXin Li
183*9c5db199SXin Li    @param gs_path: Location in google storage where the offload directory
184*9c5db199SXin Li                    should be marked as finished.
185*9c5db199SXin Li
186*9c5db199SXin Li    @return A command list to be executed by Popen.
187*9c5db199SXin Li    """
188*9c5db199SXin Li    target = os.path.join(gs_path, '.finished_offload')
189*9c5db199SXin Li    return [
190*9c5db199SXin Li        'gsutil',
191*9c5db199SXin Li        'cp',
192*9c5db199SXin Li        '/dev/null',
193*9c5db199SXin Li        target,
194*9c5db199SXin Li        ]
195*9c5db199SXin Li
196*9c5db199SXin Li
def sanitize_dir(dirpath):
    """Sanitize directory for gs upload.

    Symlinks and FIFOS are converted to regular files to fix bugs.

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    # Escape the directory name itself, then its contents, then replace
    # fifos and symlinks that gsutil cannot upload.
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)
210*9c5db199SXin Li
211*9c5db199SXin Li
def _escape_rename_dir_contents(dirpath):
    """Recursively escape-rename everything under a directory.

    @param dirpath: Directory path string.
    """
    # First pass: rename the direct children so gs-unsafe characters are
    # escaped before we descend.
    for name in os.listdir(dirpath):
        _escape_rename(os.path.join(dirpath, name))
    # Second pass: recurse into the (now renamed) subdirectories.
    for name in os.listdir(dirpath):
        child = os.path.join(dirpath, name)
        if os.path.isdir(child):
            _escape_rename_dir_contents(child)
224*9c5db199SXin Li
225*9c5db199SXin Li
def _escape_rename(path):
    """Rename a single entry so its basename is safe for gs upload.

    @param path: File path string.
    """
    parent, name = os.path.split(path)
    os.rename(path, os.path.join(parent, gslib.escape(name)))
235*9c5db199SXin Li
236*9c5db199SXin Li
def _sanitize_fifos(dirpath):
    """Convert fifos to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, filenames in os.walk(dirpath):
        for name in filenames:
            full_path = os.path.join(root, name)
            # lstat so we inspect the entry itself, not a link target.
            if stat.S_ISFIFO(os.lstat(full_path).st_mode):
                _replace_fifo_with_file(full_path)
248*9c5db199SXin Li
249*9c5db199SXin Li
250*9c5db199SXin Lidef _replace_fifo_with_file(path):
251*9c5db199SXin Li    """Replace a fifo with a normal file.
252*9c5db199SXin Li
253*9c5db199SXin Li    @param path: Fifo path string.
254*9c5db199SXin Li    """
255*9c5db199SXin Li    logging.debug('Removing fifo %s', path)
256*9c5db199SXin Li    os.remove(path)
257*9c5db199SXin Li    logging.debug('Creating fifo marker %s', path)
258*9c5db199SXin Li    with open(path, 'w') as f:
259*9c5db199SXin Li        f.write('<FIFO>')
260*9c5db199SXin Li
261*9c5db199SXin Li
def _sanitize_symlinks(dirpath):
    """Convert Symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, filenames in os.walk(dirpath):
        for name in filenames:
            full_path = os.path.join(root, name)
            # lstat so symlinks themselves are detected, not followed.
            if stat.S_ISLNK(os.lstat(full_path).st_mode):
                _replace_symlink_with_file(full_path)
273*9c5db199SXin Li
274*9c5db199SXin Li
275*9c5db199SXin Lidef _replace_symlink_with_file(path):
276*9c5db199SXin Li    """Replace a symlink with a normal file.
277*9c5db199SXin Li
278*9c5db199SXin Li    @param path: Symlink path string.
279*9c5db199SXin Li    """
280*9c5db199SXin Li    target = os.readlink(path)
281*9c5db199SXin Li    logging.debug('Removing symlink %s', path)
282*9c5db199SXin Li    os.remove(path)
283*9c5db199SXin Li    logging.debug('Creating symlink marker %s', path)
284*9c5db199SXin Li    with open(path, 'w') as f:
285*9c5db199SXin Li        f.write('<symlink to %s>' % target)
286*9c5db199SXin Li
287*9c5db199SXin Li
# Maximum number of files in the folder.
_MAX_FILE_COUNT = 3000
# Subfolders that must never be tarred up, so they stay directly browsable.
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']
291*9c5db199SXin Li
292*9c5db199SXin Li
def _get_zippable_folders(dir_entry):
    """Return subfolders of |dir_entry| eligible for tarball compression.

    Regular files and folders named in _FOLDERS_NEVER_ZIP are excluded.

    @param dir_entry: Directory path string.
    @return List of zippable subfolder paths.
    """
    return [os.path.join(dir_entry, name)
            for name in os.listdir(dir_entry)
            if not os.path.isfile(os.path.join(dir_entry, name))
            and name not in _FOLDERS_NEVER_ZIP]
301*9c5db199SXin Li
302*9c5db199SXin Li
def limit_file_count(dir_entry):
    """Limit the number of files in given directory.

    If the total file count in |dir_entry| reaches _MAX_FILE_COUNT, each
    zippable folder (see _FOLDERS_NEVER_ZIP) is compressed into a tarball.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        total = _count_files(dir_entry)
    except ValueError:
        logging.warning('Fail to get the file count in folder %s.', dir_entry)
        return
    if total < _MAX_FILE_COUNT:
        return

    # Special tasks zip their top-level folders.  Test jobs zip one level
    # deeper (e.g. 123-debug/host1) so the autoserv debug folder itself
    # remains directly accessible.
    if re.match(job_directories.SPECIAL_TASK_PATTERN, dir_entry):
        to_zip = _get_zippable_folders(dir_entry)
    else:
        to_zip = []
        for top_folder in _get_zippable_folders(dir_entry):
            to_zip.extend(_get_zippable_folders(top_folder))

    for folder in to_zip:
        _make_into_tarball(folder)
336*9c5db199SXin Li
337*9c5db199SXin Li
338*9c5db199SXin Lidef _count_files(dirpath):
339*9c5db199SXin Li    """Count the number of files in a directory recursively.
340*9c5db199SXin Li
341*9c5db199SXin Li    @param dirpath: Directory path string.
342*9c5db199SXin Li    """
343*9c5db199SXin Li    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))
344*9c5db199SXin Li
345*9c5db199SXin Li
346*9c5db199SXin Lidef _make_into_tarball(dirpath):
347*9c5db199SXin Li    """Make directory into tarball.
348*9c5db199SXin Li
349*9c5db199SXin Li    @param dirpath: Directory path string.
350*9c5db199SXin Li    """
351*9c5db199SXin Li    tarpath = '%s.tgz' % dirpath
352*9c5db199SXin Li    with tarfile.open(tarpath, 'w:gz') as tar:
353*9c5db199SXin Li        tar.add(dirpath, arcname=os.path.basename(dirpath))
354*9c5db199SXin Li    shutil.rmtree(dirpath)
355*9c5db199SXin Li
356*9c5db199SXin Li
def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder has
    the owner of root. This must be changed to the user running the
    autoserv process, so parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        # Take ownership, make everything user-readable/writable, and make
        # directories traversable.
        owner = '%s:%s' % (os.getuid(), os.getgid())
        subprocess.check_call(['sudo', '-n', 'chown', '-R', owner, dir_entry])
        subprocess.check_call(['chmod', '-R', 'u+rw', dir_entry])
        subprocess.check_call(['find', dir_entry, '-type', 'd',
                               '-exec', 'chmod', 'u+x', '{}', ';'])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)
381*9c5db199SXin Li
382*9c5db199SXin Li
383*9c5db199SXin Lidef _get_swarming_req_dir(path):
384*9c5db199SXin Li    """
385*9c5db199SXin Li    Returns the parent directory of |path|, if |path| is a swarming task result.
386*9c5db199SXin Li
387*9c5db199SXin Li    @param path: Full path to the result of a task.
388*9c5db199SXin Li                      e.g. /tmp/results/swarming-44466815c4bc951/1
389*9c5db199SXin Li
390*9c5db199SXin Li    @return string of the parent dir or None if not a swarming task.
391*9c5db199SXin Li    """
392*9c5db199SXin Li    m_parent = re.match(
393*9c5db199SXin Li            '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path)
394*9c5db199SXin Li    if m_parent:
395*9c5db199SXin Li        return m_parent.group('parent_dir')
396*9c5db199SXin Li    return None
397*9c5db199SXin Li
398*9c5db199SXin Li
399*9c5db199SXin Lidef _parse_cts_job_results_file_path(path):
400*9c5db199SXin Li    """Parse CTS file paths an extract required information from them."""
401*9c5db199SXin Li
402*9c5db199SXin Li    # Autotest paths look like:
403*9c5db199SXin Li    # /317739475-chromeos-test/chromeos4-row9-rack11-host22/
404*9c5db199SXin Li    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44
405*9c5db199SXin Li
406*9c5db199SXin Li    # Swarming paths look like:
407*9c5db199SXin Li    # /swarming-458e3a3a7fc6f210/1/autoserv_test/
408*9c5db199SXin Li    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44
409*9c5db199SXin Li
410*9c5db199SXin Li    folders = path.split(os.sep)
411*9c5db199SXin Li    if 'swarming' in folders[1]:
412*9c5db199SXin Li        # Swarming job and attempt combined
413*9c5db199SXin Li        job_id = "%s-%s" % (folders[-7], folders[-6])
414*9c5db199SXin Li    else:
415*9c5db199SXin Li        job_id = folders[-6]
416*9c5db199SXin Li
417*9c5db199SXin Li    cts_package = folders[-4]
418*9c5db199SXin Li    timestamp = folders[-1]
419*9c5db199SXin Li
420*9c5db199SXin Li    return job_id, cts_package, timestamp
421*9c5db199SXin Li
422*9c5db199SXin Li
def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    rcode = int(returncode)
    # Anything outside the valid exit-status range is bucketed as -1.
    if not 0 <= rcode <= 255:
        rcode = -1
    metrics.Counter('chromeos/autotest/gs_offloader/gs_returncode').increment(
            fields={'return_code': rcode})
430*9c5db199SXin Li
431*9c5db199SXin Li
def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue if needed.

    Always bumps the wrong-permissions counter for monitoring.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same one
            running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    counter_name = ('chromeos/autotest/errors/gs_offloader/'
                    'wrong_permissions_count')
    metrics.Counter(counter_name).increment(
            fields=_get_metrics_fields(dir_entry))
445*9c5db199SXin Li
446*9c5db199SXin Li
class BaseGSOffloader(six.with_metaclass(abc.ABCMeta, object)):

    """Google Storage offloader interface."""

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        Copies the contents of `dir_entry` to Google storage at
        `dest_path`.  On success all of `dir_entry` is deleted; on
        failure `dir_entry` is left undisturbed so the offload can be
        retried later.

        Errors are conveyed simply and solely by two methods:
          * At the time of failure, write enough information to the log
            to allow later debug, if necessary.
          * Don't delete the content.

        This wrapper never raises: any exception from the concrete
        `_full_offload` is logged and swallowed to keep the offloader
        loop alive.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as e:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(e))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        Concrete subclasses implement the actual offload behavior here.
        To guarantee effective debug, implementations should catch all
        exceptions and perform any reasonable diagnosis or other
        handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
497*9c5db199SXin Li
498*9c5db199SXin Liclass GSOffloader(BaseGSOffloader):
499*9c5db199SXin Li    """Google Storage Offloader."""
500*9c5db199SXin Li
    def __init__(self, gs_uri, multiprocessing, delete_age,
            console_client=None):
        """Initialize the offloader for the given gs_uri.

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on -m option for gsutil.
        @param delete_age: Age threshold used when pruning offloaded
                directories (presumably days since job completion —
                confirm against the _prune implementation).
        @param console_client: The cloud console client. If None,
          cloud console APIs are  not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client
514*9c5db199SXin Li
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        Attempts the upload (retrying once after fixing a permission
        error), logs and counts failures, and prunes the local copy on
        success.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        # gsutil output is captured into temp files so it can be logged
        # if the offload fails.
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Correct file permission error of the directory, then raise
                    # the exception so gs_offloader can retry later.
                    _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
                    # Try again after the permission issue is fixed.
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError as e:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)

                # Some result files may have wrong file permission. Try
                # to correct such error so later try can success.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                # Success path: prune the local results, and the parent
                # swarming request dir if this was a swarming task.
                self._prune(dir_entry, job_complete_time)
                swarming_req_dir = _get_swarming_req_dir(dir_entry)
                if swarming_req_dir:
                    self._prune_swarming_req_dir(swarming_req_dir)
567*9c5db199SXin Li
568*9c5db199SXin Li
569*9c5db199SXin Li    def _try_offload(self, dir_entry, dest_path,
570*9c5db199SXin Li                 stdout_file, stderr_file):
571*9c5db199SXin Li        """Offload the specified directory entry to Google storage.
572*9c5db199SXin Li
573*9c5db199SXin Li        @param dir_entry: Directory entry to offload.
574*9c5db199SXin Li        @param dest_path: Location in google storage where we will
575*9c5db199SXin Li                          offload the directory.
576*9c5db199SXin Li        @param job_complete_time: The complete time of the job from the AFE
577*9c5db199SXin Li                                  database.
578*9c5db199SXin Li        @param stdout_file: Log file.
579*9c5db199SXin Li        @param stderr_file: Log file.
580*9c5db199SXin Li        """
581*9c5db199SXin Li        if _is_uploaded(dir_entry):
582*9c5db199SXin Li            return
583*9c5db199SXin Li        start_time = time.time()
584*9c5db199SXin Li        metrics_fields = _get_metrics_fields(dir_entry)
585*9c5db199SXin Li        error_obj = _OffloadError(start_time)
586*9c5db199SXin Li        config = config_loader.load(dir_entry)
587*9c5db199SXin Li        if config:
588*9c5db199SXin Li            # TODO(linxinan): use credential file assigned by the side_effect
589*9c5db199SXin Li            # config.
590*9c5db199SXin Li            if config.google_storage.bucket:
591*9c5db199SXin Li                gs_prefix = ('' if
592*9c5db199SXin Li                             config.google_storage.bucket.startswith('gs://')
593*9c5db199SXin Li                             else 'gs://')
594*9c5db199SXin Li                self._gs_uri = gs_prefix + config.google_storage.bucket
595*9c5db199SXin Li        else:
596*9c5db199SXin Li            # For now, the absence of config does not block gs_offloader
597*9c5db199SXin Li            # from uploading files via default credential.
598*9c5db199SXin Li            logging.debug('Failed to load the side effects config in %s.',
599*9c5db199SXin Li                          dir_entry)
600*9c5db199SXin Li        try:
601*9c5db199SXin Li            sanitize_dir(dir_entry)
602*9c5db199SXin Li            if LIMIT_FILE_COUNT:
603*9c5db199SXin Li                limit_file_count(dir_entry)
604*9c5db199SXin Li
605*9c5db199SXin Li            process = None
606*9c5db199SXin Li            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
607*9c5db199SXin Li                gs_path = '%s%s' % (self._gs_uri, dest_path)
608*9c5db199SXin Li                cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
609*9c5db199SXin Li                logging.debug('Attempting an offload command %s', cmd)
610*9c5db199SXin Li                process = subprocess.Popen(
611*9c5db199SXin Li                    cmd, stdout=stdout_file, stderr=stderr_file)
612*9c5db199SXin Li                process.wait()
613*9c5db199SXin Li                logging.debug('Offload command %s completed; '
614*9c5db199SXin Li                              'marking offload complete.', cmd)
615*9c5db199SXin Li                _mark_upload_finished(gs_path, stdout_file, stderr_file)
616*9c5db199SXin Li
617*9c5db199SXin Li            _emit_gs_returncode_metric(process.returncode)
618*9c5db199SXin Li            if process.returncode != 0:
619*9c5db199SXin Li                raise error_obj
620*9c5db199SXin Li            _emit_offload_metrics(dir_entry)
621*9c5db199SXin Li
622*9c5db199SXin Li            if self._console_client:
623*9c5db199SXin Li                gcs_uri = os.path.join(gs_path,
624*9c5db199SXin Li                        os.path.basename(dir_entry))
625*9c5db199SXin Li                if not self._console_client.send_test_job_offloaded_message(
626*9c5db199SXin Li                        gcs_uri):
627*9c5db199SXin Li                    raise error_obj
628*9c5db199SXin Li
629*9c5db199SXin Li            _mark_uploaded(dir_entry)
630*9c5db199SXin Li        except timeout_util.TimeoutError:
631*9c5db199SXin Li            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
632*9c5db199SXin Li            metrics.Counter(m_timeout).increment(fields=metrics_fields)
633*9c5db199SXin Li            # If we finished the call to Popen(), we may need to
634*9c5db199SXin Li            # terminate the child process.  We don't bother calling
635*9c5db199SXin Li            # process.poll(); that inherently races because the child
636*9c5db199SXin Li            # can die any time it wants.
637*9c5db199SXin Li            if process:
638*9c5db199SXin Li                try:
639*9c5db199SXin Li                    process.terminate()
640*9c5db199SXin Li                except OSError:
641*9c5db199SXin Li                    # We don't expect any error other than "No such
642*9c5db199SXin Li                    # process".
643*9c5db199SXin Li                    pass
644*9c5db199SXin Li            logging.error('Offloading %s timed out after waiting %d '
645*9c5db199SXin Li                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
646*9c5db199SXin Li            raise error_obj
647*9c5db199SXin Li
648*9c5db199SXin Li    def _prune(self, dir_entry, job_complete_time):
649*9c5db199SXin Li        """Prune directory if it is uploaded and expired.
650*9c5db199SXin Li
651*9c5db199SXin Li        @param dir_entry: Directory entry to offload.
652*9c5db199SXin Li        @param job_complete_time: The complete time of the job from the AFE
653*9c5db199SXin Li                                  database.
654*9c5db199SXin Li        """
655*9c5db199SXin Li        if not (_is_uploaded(dir_entry)
656*9c5db199SXin Li                and job_directories.is_job_expired(self._delete_age,
657*9c5db199SXin Li                                                   job_complete_time)):
658*9c5db199SXin Li            return
659*9c5db199SXin Li        try:
660*9c5db199SXin Li            logging.debug('Pruning uploaded directory %s', dir_entry)
661*9c5db199SXin Li            shutil.rmtree(dir_entry)
662*9c5db199SXin Li            job_timestamp_cache.delete(dir_entry)
663*9c5db199SXin Li        except OSError as e:
664*9c5db199SXin Li            # The wrong file permission can lead call `shutil.rmtree(dir_entry)`
665*9c5db199SXin Li            # to raise OSError with message 'Permission denied'. Details can be
666*9c5db199SXin Li            # found in crbug.com/536151
667*9c5db199SXin Li            _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
668*9c5db199SXin Li            # Try again after the permission issue is fixed.
669*9c5db199SXin Li            shutil.rmtree(dir_entry)
670*9c5db199SXin Li
671*9c5db199SXin Li    def _prune_swarming_req_dir(self, swarming_req_dir):
672*9c5db199SXin Li        """Prune swarming request directory, if it is empty.
673*9c5db199SXin Li
674*9c5db199SXin Li        @param swarming_req_dir: Directory entry of a swarming request.
675*9c5db199SXin Li        """
676*9c5db199SXin Li        try:
677*9c5db199SXin Li            logging.debug('Pruning swarming request directory %s',
678*9c5db199SXin Li                          swarming_req_dir)
679*9c5db199SXin Li            os.rmdir(swarming_req_dir)
680*9c5db199SXin Li        except OSError as e:
681*9c5db199SXin Li            # Do nothing and leave this directory to next attempt to remove.
682*9c5db199SXin Li            logging.debug('Failed to prune swarming request directory %s',
683*9c5db199SXin Li                          swarming_req_dir)
684*9c5db199SXin Li
685*9c5db199SXin Li
686*9c5db199SXin Liclass _OffloadError(Exception):
687*9c5db199SXin Li    """Google Storage offload failed."""
688*9c5db199SXin Li
689*9c5db199SXin Li    def __init__(self, start_time):
690*9c5db199SXin Li        super(_OffloadError, self).__init__(start_time)
691*9c5db199SXin Li        self.start_time = start_time
692*9c5db199SXin Li
693*9c5db199SXin Li
694*9c5db199SXin Li
695*9c5db199SXin Liclass FakeGSOffloader(BaseGSOffloader):
696*9c5db199SXin Li
697*9c5db199SXin Li    """Fake Google Storage Offloader that only deletes directories."""
698*9c5db199SXin Li
699*9c5db199SXin Li    def _full_offload(self, dir_entry, dest_path, job_complete_time):
700*9c5db199SXin Li        """Pretend to offload a directory and delete it.
701*9c5db199SXin Li
702*9c5db199SXin Li        @param dir_entry: Directory entry to offload.
703*9c5db199SXin Li        @param dest_path: Location in google storage where we will
704*9c5db199SXin Li                          offload the directory.
705*9c5db199SXin Li        @param job_complete_time: The complete time of the job from the AFE
706*9c5db199SXin Li                                  database.
707*9c5db199SXin Li        """
708*9c5db199SXin Li        shutil.rmtree(dir_entry)
709*9c5db199SXin Li
710*9c5db199SXin Li
711*9c5db199SXin Liclass OptionalMemoryCache(object):
712*9c5db199SXin Li    """Implements memory cache if cachetools module can be loaded.
713*9c5db199SXin Li
714*9c5db199SXin Li   If the platform has cachetools available then the cache will
715*9c5db199SXin Li   be created, otherwise the get calls will always act as if there
716*9c5db199SXin Li   was a cache miss and the set/delete will be no-ops.
717*9c5db199SXin Li   """
718*9c5db199SXin Li    cache = None
719*9c5db199SXin Li
720*9c5db199SXin Li    def setup(self, age_to_delete):
721*9c5db199SXin Li        """Set up a TTL cache size based on how long the job will be handled.
722*9c5db199SXin Li
723*9c5db199SXin Li       Autotest jobs are handled by gs_offloader until they are deleted from
724*9c5db199SXin Li       local storage, base the cache size on how long that is.
725*9c5db199SXin Li
726*9c5db199SXin Li       @param age_to_delete: Number of days after which items in the cache
727*9c5db199SXin Li                             should expire.
728*9c5db199SXin Li       """
729*9c5db199SXin Li        if cachetools:
730*9c5db199SXin Li            # Min cache is 1000 items for 10 mins. If the age to delete is 0
731*9c5db199SXin Li            # days you still want a short / small cache.
732*9c5db199SXin Li            # 2000 items is a good approximation for the max number of jobs a
733*9c5db199SXin Li            # moblab # can produce in a day, lab offloads immediatly so
734*9c5db199SXin Li            # the number of carried jobs should be very small in the normal
735*9c5db199SXin Li            # case.
736*9c5db199SXin Li            ttl = max(age_to_delete * 24 * 60 * 60, 600)
737*9c5db199SXin Li            maxsize = max(age_to_delete * 2000, 1000)
738*9c5db199SXin Li            job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize,
739*9c5db199SXin Li                                                            ttl=ttl)
740*9c5db199SXin Li
741*9c5db199SXin Li    def get(self, key):
742*9c5db199SXin Li        """If we have a cache try to retrieve from it."""
743*9c5db199SXin Li        if self.cache is not None:
744*9c5db199SXin Li            result = self.cache.get(key)
745*9c5db199SXin Li            return result
746*9c5db199SXin Li        return None
747*9c5db199SXin Li
748*9c5db199SXin Li    def add(self, key, value):
749*9c5db199SXin Li        """If we have a cache try to store key/value."""
750*9c5db199SXin Li        if self.cache is not None:
751*9c5db199SXin Li            self.cache[key] = value
752*9c5db199SXin Li
753*9c5db199SXin Li    def delete(self, key):
754*9c5db199SXin Li        """If we have a cache try to remove a key."""
755*9c5db199SXin Li        if self.cache is not None:
756*9c5db199SXin Li            return self.cache.delete(key)
757*9c5db199SXin Li
758*9c5db199SXin Li
759*9c5db199SXin Lijob_timestamp_cache = OptionalMemoryCache()
760*9c5db199SXin Li
761*9c5db199SXin Li
762*9c5db199SXin Lidef _cached_get_timestamp_if_finished(job):
763*9c5db199SXin Li    """Retrieve a job finished timestamp from cache or AFE.
764*9c5db199SXin Li    @param job       _JobDirectory instance to retrieve
765*9c5db199SXin Li                     finished timestamp of..
766*9c5db199SXin Li
767*9c5db199SXin Li    @returns: None if the job is not finished, or the
768*9c5db199SXin Li              last job finished time recorded by Autotest.
769*9c5db199SXin Li    """
770*9c5db199SXin Li    job_timestamp = job_timestamp_cache.get(job.dirname)
771*9c5db199SXin Li    if not job_timestamp:
772*9c5db199SXin Li        job_timestamp = job.get_timestamp_if_finished()
773*9c5db199SXin Li        if job_timestamp:
774*9c5db199SXin Li            job_timestamp_cache.add(job.dirname, job_timestamp)
775*9c5db199SXin Li    return job_timestamp
776*9c5db199SXin Li
777*9c5db199SXin Li
778*9c5db199SXin Lidef _is_expired(job, age_limit):
779*9c5db199SXin Li    """Return whether job directory is expired for uploading
780*9c5db199SXin Li
781*9c5db199SXin Li    @param job: _JobDirectory instance.
782*9c5db199SXin Li    @param age_limit:  Minimum age in days at which a job may be offloaded.
783*9c5db199SXin Li    """
784*9c5db199SXin Li    job_timestamp = _cached_get_timestamp_if_finished(job)
785*9c5db199SXin Li    if not job_timestamp:
786*9c5db199SXin Li        return False
787*9c5db199SXin Li    return job_directories.is_job_expired(age_limit, job_timestamp)
788*9c5db199SXin Li
789*9c5db199SXin Li
790*9c5db199SXin Lidef _emit_offload_metrics(dirpath):
791*9c5db199SXin Li    """Emit gs offload metrics.
792*9c5db199SXin Li
793*9c5db199SXin Li    @param dirpath: Offloaded directory path.
794*9c5db199SXin Li    """
795*9c5db199SXin Li    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
796*9c5db199SXin Li    metrics_fields = _get_metrics_fields(dirpath)
797*9c5db199SXin Li
798*9c5db199SXin Li    m_offload_count = (
799*9c5db199SXin Li            'chromeos/autotest/gs_offloader/jobs_offloaded')
800*9c5db199SXin Li    metrics.Counter(m_offload_count).increment(
801*9c5db199SXin Li            fields=metrics_fields)
802*9c5db199SXin Li    m_offload_size = ('chromeos/autotest/gs_offloader/'
803*9c5db199SXin Li                      'kilobytes_transferred')
804*9c5db199SXin Li    metrics.Counter(m_offload_size).increment_by(
805*9c5db199SXin Li            dir_size, fields=metrics_fields)
806*9c5db199SXin Li
807*9c5db199SXin Li
808*9c5db199SXin Lidef _is_uploaded(dirpath):
809*9c5db199SXin Li    """Return whether directory has been uploaded.
810*9c5db199SXin Li
811*9c5db199SXin Li    @param dirpath: Directory path string.
812*9c5db199SXin Li    """
813*9c5db199SXin Li    return os.path.isfile(_get_uploaded_marker_file(dirpath))
814*9c5db199SXin Li
815*9c5db199SXin Li
816*9c5db199SXin Lidef _mark_uploaded(dirpath):
817*9c5db199SXin Li    """Mark directory as uploaded.
818*9c5db199SXin Li
819*9c5db199SXin Li    @param dirpath: Directory path string.
820*9c5db199SXin Li    """
821*9c5db199SXin Li    logging.debug('Creating uploaded marker for directory %s', dirpath)
822*9c5db199SXin Li    with open(_get_uploaded_marker_file(dirpath), 'a'):
823*9c5db199SXin Li        pass
824*9c5db199SXin Li
825*9c5db199SXin Li
826*9c5db199SXin Lidef _mark_upload_finished(gs_path, stdout_file, stderr_file):
827*9c5db199SXin Li    """Mark a given gs_path upload as finished (remotely).
828*9c5db199SXin Li
829*9c5db199SXin Li    @param gs_path: gs:// url of the remote directory that is finished
830*9c5db199SXin Li                    upload.
831*9c5db199SXin Li    """
832*9c5db199SXin Li    cmd = _get_finish_cmd_list(gs_path)
833*9c5db199SXin Li    process = subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file)
834*9c5db199SXin Li    process.wait()
835*9c5db199SXin Li    logging.debug('Finished marking as complete %s', cmd)
836*9c5db199SXin Li
837*9c5db199SXin Li
838*9c5db199SXin Lidef _get_uploaded_marker_file(dirpath):
839*9c5db199SXin Li    """Return path to upload marker file for directory.
840*9c5db199SXin Li
841*9c5db199SXin Li    @param dirpath: Directory path string.
842*9c5db199SXin Li    """
843*9c5db199SXin Li    return '%s/.GS_UPLOADED' % (dirpath,)
844*9c5db199SXin Li
845*9c5db199SXin Li
846*9c5db199SXin Lidef _format_job_for_failure_reporting(job):
847*9c5db199SXin Li    """Formats a _JobDirectory for reporting / logging.
848*9c5db199SXin Li
849*9c5db199SXin Li    @param job: The _JobDirectory to format.
850*9c5db199SXin Li    """
851*9c5db199SXin Li    d = datetime.datetime.fromtimestamp(job.first_offload_start)
852*9c5db199SXin Li    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
853*9c5db199SXin Li            job.offload_count,
854*9c5db199SXin Li            job.dirname)
855*9c5db199SXin Li    return FAILED_OFFLOADS_LINE_FORMAT % data
856*9c5db199SXin Li
857*9c5db199SXin Li
858*9c5db199SXin Lidef wait_for_gs_write_access(gs_uri):
859*9c5db199SXin Li    """Verify and wait until we have write access to Google Storage.
860*9c5db199SXin Li
861*9c5db199SXin Li    @param gs_uri: The Google Storage URI we are trying to offload to.
862*9c5db199SXin Li    """
863*9c5db199SXin Li    # TODO (sbasi) Try to use the gsutil command to check write access.
864*9c5db199SXin Li    # Ensure we have write access to gs_uri.
865*9c5db199SXin Li    dummy_file = tempfile.NamedTemporaryFile()
866*9c5db199SXin Li    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
867*9c5db199SXin Li    while True:
868*9c5db199SXin Li        logging.debug('Checking for write access with dummy file %s',
869*9c5db199SXin Li                      dummy_file.name)
870*9c5db199SXin Li        try:
871*9c5db199SXin Li            subprocess.check_call(test_cmd)
872*9c5db199SXin Li            subprocess.check_call(
873*9c5db199SXin Li                    ['gsutil', 'rm',
874*9c5db199SXin Li                     os.path.join(gs_uri,
875*9c5db199SXin Li                                  os.path.basename(dummy_file.name))])
876*9c5db199SXin Li            break
877*9c5db199SXin Li        except subprocess.CalledProcessError:
878*9c5db199SXin Li            t = 120
879*9c5db199SXin Li            logging.debug('Unable to offload dummy file to %s, sleeping for %s '
880*9c5db199SXin Li                          'seconds.', gs_uri, t)
881*9c5db199SXin Li            time.sleep(t)
882*9c5db199SXin Li    logging.debug('Dummy file write check to gs succeeded.')
883*9c5db199SXin Li
884*9c5db199SXin Li
885*9c5db199SXin Liclass Offloader(object):
886*9c5db199SXin Li    """State of the offload process.
887*9c5db199SXin Li
888*9c5db199SXin Li    Contains the following member fields:
889*9c5db199SXin Li      * _gs_offloader:  _BaseGSOffloader to use to offload a job directory.
890*9c5db199SXin Li      * _jobdir_classes:  List of classes of job directory to be
891*9c5db199SXin Li        offloaded.
892*9c5db199SXin Li      * _processes:  Maximum number of outstanding offload processes
893*9c5db199SXin Li        to allow during an offload cycle.
894*9c5db199SXin Li      * _age_limit:  Minimum age in days at which a job may be
895*9c5db199SXin Li        offloaded.
896*9c5db199SXin Li      * _open_jobs: a dictionary mapping directory paths to Job
897*9c5db199SXin Li        objects.
898*9c5db199SXin Li    """
899*9c5db199SXin Li
900*9c5db199SXin Li    def __init__(self, options):
901*9c5db199SXin Li        self._upload_age_limit = options.age_to_upload
902*9c5db199SXin Li        self._delete_age_limit = options.age_to_delete
903*9c5db199SXin Li        if options.delete_only:
904*9c5db199SXin Li            self._gs_offloader = FakeGSOffloader()
905*9c5db199SXin Li        else:
906*9c5db199SXin Li            self.gs_uri = utils.get_offload_gsuri()
907*9c5db199SXin Li            logging.debug('Offloading to: %s', self.gs_uri)
908*9c5db199SXin Li            multiprocessing = False
909*9c5db199SXin Li            if options.multiprocessing:
910*9c5db199SXin Li                multiprocessing = True
911*9c5db199SXin Li            elif options.multiprocessing is None:
912*9c5db199SXin Li                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
913*9c5db199SXin Li            logging.info(
914*9c5db199SXin Li                    'Offloader multiprocessing is set to:%r', multiprocessing)
915*9c5db199SXin Li            console_client = None
916*9c5db199SXin Li            if (cloud_console_client and
917*9c5db199SXin Li                    cloud_console_client.is_cloud_notification_enabled()):
918*9c5db199SXin Li                console_client = cloud_console_client.PubSubBasedClient()
919*9c5db199SXin Li            self._gs_offloader = GSOffloader(
920*9c5db199SXin Li                    self.gs_uri, multiprocessing, self._delete_age_limit,
921*9c5db199SXin Li                    console_client)
922*9c5db199SXin Li        classlist = [
923*9c5db199SXin Li                job_directories.SwarmingJobDirectory,
924*9c5db199SXin Li        ]
925*9c5db199SXin Li        if options.process_hosts_only or options.process_all:
926*9c5db199SXin Li            classlist.append(job_directories.SpecialJobDirectory)
927*9c5db199SXin Li        if not options.process_hosts_only:
928*9c5db199SXin Li            classlist.append(job_directories.RegularJobDirectory)
929*9c5db199SXin Li        self._jobdir_classes = classlist
930*9c5db199SXin Li        assert self._jobdir_classes
931*9c5db199SXin Li        self._processes = options.parallelism
932*9c5db199SXin Li        self._open_jobs = {}
933*9c5db199SXin Li        self._pusub_topic = None
934*9c5db199SXin Li        self._offload_count_limit = 3
935*9c5db199SXin Li
936*9c5db199SXin Li
937*9c5db199SXin Li    def _add_new_jobs(self):
938*9c5db199SXin Li        """Find new job directories that need offloading.
939*9c5db199SXin Li
940*9c5db199SXin Li        Go through the file system looking for valid job directories
941*9c5db199SXin Li        that are currently not in `self._open_jobs`, and add them in.
942*9c5db199SXin Li
943*9c5db199SXin Li        """
944*9c5db199SXin Li        new_job_count = 0
945*9c5db199SXin Li        for cls in self._jobdir_classes:
946*9c5db199SXin Li            for resultsdir in cls.get_job_directories():
947*9c5db199SXin Li                if resultsdir in self._open_jobs:
948*9c5db199SXin Li                    continue
949*9c5db199SXin Li                self._open_jobs[resultsdir] = cls(resultsdir)
950*9c5db199SXin Li                new_job_count += 1
951*9c5db199SXin Li        logging.debug('Start of offload cycle - found %d new jobs',
952*9c5db199SXin Li                      new_job_count)
953*9c5db199SXin Li
954*9c5db199SXin Li
955*9c5db199SXin Li    def _remove_offloaded_jobs(self):
956*9c5db199SXin Li        """Removed offloaded jobs from `self._open_jobs`."""
957*9c5db199SXin Li        removed_job_count = 0
958*9c5db199SXin Li        # must be list to make a copy of the dictionary to allow deletion
959*9c5db199SXin Li        # during iterations.
960*9c5db199SXin Li        for jobkey, job in list(six.iteritems(self._open_jobs)):
961*9c5db199SXin Li            if (
962*9c5db199SXin Li                    not os.path.exists(job.dirname)
963*9c5db199SXin Li                    or _is_uploaded(job.dirname)):
964*9c5db199SXin Li                del self._open_jobs[jobkey]
965*9c5db199SXin Li                removed_job_count += 1
966*9c5db199SXin Li        logging.debug('End of offload cycle - cleared %d jobs, '
967*9c5db199SXin Li                      'carrying %d open jobs',
968*9c5db199SXin Li                      removed_job_count, len(self._open_jobs))
969*9c5db199SXin Li
970*9c5db199SXin Li
971*9c5db199SXin Li    def _report_failed_jobs(self):
972*9c5db199SXin Li        """Report status after attempting offload.
973*9c5db199SXin Li
974*9c5db199SXin Li        This function processes all jobs in `self._open_jobs`, assuming
975*9c5db199SXin Li        an attempt has just been made to offload all of them.
976*9c5db199SXin Li
977*9c5db199SXin Li        If any jobs have reportable errors, and we haven't generated
978*9c5db199SXin Li        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
979*9c5db199SXin Li        send new e-mail describing the failures.
980*9c5db199SXin Li
981*9c5db199SXin Li        """
982*9c5db199SXin Li        failed_jobs = [j for j in self._open_jobs.values() if
983*9c5db199SXin Li                       j.first_offload_start]
984*9c5db199SXin Li        self._report_failed_jobs_count(failed_jobs)
985*9c5db199SXin Li        self._log_failed_jobs_locally(failed_jobs)
986*9c5db199SXin Li
987*9c5db199SXin Li
988*9c5db199SXin Li    def offload_once(self):
989*9c5db199SXin Li        """Perform one offload cycle.
990*9c5db199SXin Li
991*9c5db199SXin Li        Find all job directories for new jobs that we haven't seen
992*9c5db199SXin Li        before.  Then, attempt to offload the directories for any
993*9c5db199SXin Li        jobs that have finished running.  Offload of multiple jobs
994*9c5db199SXin Li        is done in parallel, up to `self._processes` at a time.
995*9c5db199SXin Li
996*9c5db199SXin Li        After we've tried uploading all directories, go through the list
997*9c5db199SXin Li        checking the status of all uploaded directories.  If necessary,
998*9c5db199SXin Li        report failures via e-mail.
999*9c5db199SXin Li
1000*9c5db199SXin Li        """
1001*9c5db199SXin Li        self._add_new_jobs()
1002*9c5db199SXin Li        self._report_current_jobs_count()
1003*9c5db199SXin Li        with parallel.BackgroundTaskRunner(
1004*9c5db199SXin Li                self._gs_offloader.offload, processes=self._processes) as queue:
1005*9c5db199SXin Li            for job in self._open_jobs.values():
1006*9c5db199SXin Li                _enqueue_offload(job, queue, self._upload_age_limit)
1007*9c5db199SXin Li        self._give_up_on_jobs_over_limit()
1008*9c5db199SXin Li        self._remove_offloaded_jobs()
1009*9c5db199SXin Li        self._report_failed_jobs()
1010*9c5db199SXin Li
1011*9c5db199SXin Li
1012*9c5db199SXin Li    def _give_up_on_jobs_over_limit(self):
1013*9c5db199SXin Li        """Give up on jobs that have gone over the offload limit.
1014*9c5db199SXin Li
1015*9c5db199SXin Li        We mark them as uploaded as we won't try to offload them any more.
1016*9c5db199SXin Li        """
1017*9c5db199SXin Li        for job in self._open_jobs.values():
1018*9c5db199SXin Li            if job.offload_count >= self._offload_count_limit:
1019*9c5db199SXin Li                _mark_uploaded(job.dirname)
1020*9c5db199SXin Li
1021*9c5db199SXin Li
1022*9c5db199SXin Li    def _log_failed_jobs_locally(self, failed_jobs,
1023*9c5db199SXin Li                                 log_file=FAILED_OFFLOADS_FILE):
1024*9c5db199SXin Li        """Updates a local file listing all the failed jobs.
1025*9c5db199SXin Li
1026*9c5db199SXin Li        The dropped file can be used by the developers to list jobs that we have
1027*9c5db199SXin Li        failed to upload.
1028*9c5db199SXin Li
1029*9c5db199SXin Li        @param failed_jobs: A list of failed _JobDirectory objects.
1030*9c5db199SXin Li        @param log_file: The file to log the failed jobs to.
1031*9c5db199SXin Li        """
1032*9c5db199SXin Li        now = datetime.datetime.now()
1033*9c5db199SXin Li        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
1034*9c5db199SXin Li        formatted_jobs = [_format_job_for_failure_reporting(job)
1035*9c5db199SXin Li                            for job in failed_jobs]
1036*9c5db199SXin Li        formatted_jobs.sort()
1037*9c5db199SXin Li
1038*9c5db199SXin Li        with open(log_file, 'w') as logfile:
1039*9c5db199SXin Li            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
1040*9c5db199SXin Li                          (now_str, len(failed_jobs)))
1041*9c5db199SXin Li            logfile.writelines(formatted_jobs)
1042*9c5db199SXin Li
1043*9c5db199SXin Li
1044*9c5db199SXin Li    def _report_current_jobs_count(self):
1045*9c5db199SXin Li        """Report the number of outstanding jobs to monarch."""
1046*9c5db199SXin Li        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
1047*9c5db199SXin Li                len(self._open_jobs))
1048*9c5db199SXin Li
1049*9c5db199SXin Li
1050*9c5db199SXin Li    def _report_failed_jobs_count(self, failed_jobs):
1051*9c5db199SXin Li        """Report the number of outstanding failed offload jobs to monarch.
1052*9c5db199SXin Li
1053*9c5db199SXin Li        @param: List of failed jobs.
1054*9c5db199SXin Li        """
1055*9c5db199SXin Li        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
1056*9c5db199SXin Li                len(failed_jobs))
1057*9c5db199SXin Li
1058*9c5db199SXin Li
1059*9c5db199SXin Lidef _enqueue_offload(job, queue, age_limit):
1060*9c5db199SXin Li    """Enqueue the job for offload, if it's eligible.
1061*9c5db199SXin Li
1062*9c5db199SXin Li    The job is eligible for offloading if the database has marked
1063*9c5db199SXin Li    it finished, and the job is older than the `age_limit`
1064*9c5db199SXin Li    parameter.
1065*9c5db199SXin Li
1066*9c5db199SXin Li    If the job is eligible, offload processing is requested by
1067*9c5db199SXin Li    passing the `queue` parameter's `put()` method a sequence with
1068*9c5db199SXin Li    the job's `dirname` attribute and its directory name.
1069*9c5db199SXin Li
1070*9c5db199SXin Li    @param job       _JobDirectory instance to offload.
1071*9c5db199SXin Li    @param queue     If the job should be offloaded, put the offload
1072*9c5db199SXin Li                     parameters into this queue for processing.
1073*9c5db199SXin Li    @param age_limit Minimum age for a job to be offloaded.  A value
1074*9c5db199SXin Li                     of 0 means that the job will be offloaded as
1075*9c5db199SXin Li                     soon as it is finished.
1076*9c5db199SXin Li
1077*9c5db199SXin Li    """
1078*9c5db199SXin Li    if not job.offload_count:
1079*9c5db199SXin Li        if not _is_expired(job, age_limit):
1080*9c5db199SXin Li            return
1081*9c5db199SXin Li        job.first_offload_start = time.time()
1082*9c5db199SXin Li    job.offload_count += 1
1083*9c5db199SXin Li    if job.process_gs_instructions():
1084*9c5db199SXin Li        timestamp = _cached_get_timestamp_if_finished(job)
1085*9c5db199SXin Li        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])
1086*9c5db199SXin Li
1087*9c5db199SXin Li
1088*9c5db199SXin Lidef parse_options():
1089*9c5db199SXin Li    """Parse the args passed into gs_offloader."""
1090*9c5db199SXin Li    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
1091*9c5db199SXin Li            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
1092*9c5db199SXin Li    usage = 'usage: %prog [options]\n' + defaults
1093*9c5db199SXin Li    parser = OptionParser(usage)
1094*9c5db199SXin Li    parser.add_option('-a', '--all', dest='process_all',
1095*9c5db199SXin Li                      action='store_true',
1096*9c5db199SXin Li                      help='Offload all files in the results directory.')
1097*9c5db199SXin Li    parser.add_option('-s', '--hosts', dest='process_hosts_only',
1098*9c5db199SXin Li                      action='store_true',
1099*9c5db199SXin Li                      help='Offload only the special tasks result files '
1100*9c5db199SXin Li                      'located in the results/hosts subdirectory')
1101*9c5db199SXin Li    parser.add_option('-p', '--parallelism', dest='parallelism',
1102*9c5db199SXin Li                      type='int', default=1,
1103*9c5db199SXin Li                      help='Number of parallel workers to use.')
1104*9c5db199SXin Li    parser.add_option('-o', '--delete_only', dest='delete_only',
1105*9c5db199SXin Li                      action='store_true',
1106*9c5db199SXin Li                      help='GS Offloader will only the delete the '
1107*9c5db199SXin Li                      'directories and will not offload them to google '
1108*9c5db199SXin Li                      'storage. NOTE: If global_config variable '
1109*9c5db199SXin Li                      'CROS.gs_offloading_enabled is False, --delete_only '
1110*9c5db199SXin Li                      'is automatically True.',
1111*9c5db199SXin Li                      default=not GS_OFFLOADING_ENABLED)
1112*9c5db199SXin Li    parser.add_option('-d', '--days_old', dest='days_old',
1113*9c5db199SXin Li                      help='Minimum job age in days before a result can be '
1114*9c5db199SXin Li                      'offloaded.', type='int', default=0)
1115*9c5db199SXin Li    parser.add_option('-l', '--log_size', dest='log_size',
1116*9c5db199SXin Li                      help='Limit the offloader logs to a specified '
1117*9c5db199SXin Li                      'number of Mega Bytes.', type='int', default=0)
1118*9c5db199SXin Li    parser.add_option('-m', dest='multiprocessing', action='store_true',
1119*9c5db199SXin Li                      help='Turn on -m option for gsutil. If not set, the '
1120*9c5db199SXin Li                      'global config setting gs_offloader_multiprocessing '
1121*9c5db199SXin Li                      'under CROS section is applied.')
1122*9c5db199SXin Li    parser.add_option('-i', '--offload_once', dest='offload_once',
1123*9c5db199SXin Li                      action='store_true',
1124*9c5db199SXin Li                      help='Upload all available results and then exit.')
1125*9c5db199SXin Li    parser.add_option('-y', '--normal_priority', dest='normal_priority',
1126*9c5db199SXin Li                      action='store_true',
1127*9c5db199SXin Li                      help='Upload using normal process priority.')
1128*9c5db199SXin Li    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
1129*9c5db199SXin Li                      help='Minimum job age in days before a result can be '
1130*9c5db199SXin Li                      'offloaded, but not removed from local storage',
1131*9c5db199SXin Li                      type='int', default=None)
1132*9c5db199SXin Li    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
1133*9c5db199SXin Li                      help='Minimum job age in days before a result can be '
1134*9c5db199SXin Li                      'removed from local storage',
1135*9c5db199SXin Li                      type='int', default=None)
1136*9c5db199SXin Li    parser.add_option(
1137*9c5db199SXin Li            '--metrics-file',
1138*9c5db199SXin Li            help='If provided, drop metrics to this local file instead of '
1139*9c5db199SXin Li                 'reporting to ts_mon',
1140*9c5db199SXin Li            type=str,
1141*9c5db199SXin Li            default=None,
1142*9c5db199SXin Li    )
1143*9c5db199SXin Li    parser.add_option('-t', '--enable_timestamp_cache',
1144*9c5db199SXin Li                      dest='enable_timestamp_cache',
1145*9c5db199SXin Li                      action='store_true',
1146*9c5db199SXin Li                      help='Cache the finished timestamps from AFE.')
1147*9c5db199SXin Li
1148*9c5db199SXin Li    options = parser.parse_args()[0]
1149*9c5db199SXin Li    if options.process_all and options.process_hosts_only:
1150*9c5db199SXin Li        parser.print_help()
1151*9c5db199SXin Li        print ('Cannot process all files and only the hosts '
1152*9c5db199SXin Li               'subdirectory. Please remove an argument.')
1153*9c5db199SXin Li        sys.exit(1)
1154*9c5db199SXin Li
1155*9c5db199SXin Li    if options.days_old and (options.age_to_upload or options.age_to_delete):
1156*9c5db199SXin Li        parser.print_help()
1157*9c5db199SXin Li        print('Use the days_old option or the age_to_* options but not both')
1158*9c5db199SXin Li        sys.exit(1)
1159*9c5db199SXin Li
1160*9c5db199SXin Li    if options.age_to_upload == None:
1161*9c5db199SXin Li        options.age_to_upload = options.days_old
1162*9c5db199SXin Li    if options.age_to_delete == None:
1163*9c5db199SXin Li        options.age_to_delete = options.days_old
1164*9c5db199SXin Li
1165*9c5db199SXin Li    return options
1166*9c5db199SXin Li
1167*9c5db199SXin Li
1168*9c5db199SXin Lidef main():
1169*9c5db199SXin Li    """Main method of gs_offloader."""
1170*9c5db199SXin Li    options = parse_options()
1171*9c5db199SXin Li
1172*9c5db199SXin Li    if options.process_all:
1173*9c5db199SXin Li        offloader_type = 'all'
1174*9c5db199SXin Li    elif options.process_hosts_only:
1175*9c5db199SXin Li        offloader_type = 'hosts'
1176*9c5db199SXin Li    else:
1177*9c5db199SXin Li        offloader_type = 'jobs'
1178*9c5db199SXin Li
1179*9c5db199SXin Li    _setup_logging(options, offloader_type)
1180*9c5db199SXin Li
1181*9c5db199SXin Li    if options.enable_timestamp_cache:
1182*9c5db199SXin Li        # Extend the cache expiry time by another 1% so the timstamps
1183*9c5db199SXin Li        # are available as the results are purged.
1184*9c5db199SXin Li        job_timestamp_cache.setup(options.age_to_delete * 1.01)
1185*9c5db199SXin Li
1186*9c5db199SXin Li    # Nice our process (carried to subprocesses) so we don't overload
1187*9c5db199SXin Li    # the system.
1188*9c5db199SXin Li    if not options.normal_priority:
1189*9c5db199SXin Li        logging.debug('Set process to nice value: %d', NICENESS)
1190*9c5db199SXin Li        os.nice(NICENESS)
1191*9c5db199SXin Li    if psutil:
1192*9c5db199SXin Li        proc = psutil.Process()
1193*9c5db199SXin Li        logging.debug('Set process to ionice IDLE')
1194*9c5db199SXin Li        proc.ionice(psutil.IOPRIO_CLASS_IDLE)
1195*9c5db199SXin Li
1196*9c5db199SXin Li    # os.listdir returns relative paths, so change to where we need to
1197*9c5db199SXin Li    # be to avoid an os.path.join on each loop.
1198*9c5db199SXin Li    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
1199*9c5db199SXin Li    os.chdir(RESULTS_DIR)
1200*9c5db199SXin Li
1201*9c5db199SXin Li    service_name = 'gs_offloader(%s)' % offloader_type
1202*9c5db199SXin Li    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
1203*9c5db199SXin Li                                             short_lived=False,
1204*9c5db199SXin Li                                             debug_file=options.metrics_file):
1205*9c5db199SXin Li        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
1206*9c5db199SXin Li            offloader = Offloader(options)
1207*9c5db199SXin Li            if not options.delete_only:
1208*9c5db199SXin Li                wait_for_gs_write_access(offloader.gs_uri)
1209*9c5db199SXin Li            while True:
1210*9c5db199SXin Li                offloader.offload_once()
1211*9c5db199SXin Li                if options.offload_once:
1212*9c5db199SXin Li                    break
1213*9c5db199SXin Li                time.sleep(SLEEP_TIME_SECS)
1214*9c5db199SXin Li
1215*9c5db199SXin Li
1216*9c5db199SXin Li_LOG_LOCATION = '/usr/local/autotest/logs/'
1217*9c5db199SXin Li_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
1218*9c5db199SXin Li_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
1219*9c5db199SXin Li_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
1220*9c5db199SXin Li
1221*9c5db199SXin Li
1222*9c5db199SXin Lidef _setup_logging(options, offloader_type):
1223*9c5db199SXin Li    """Set up logging.
1224*9c5db199SXin Li
1225*9c5db199SXin Li    @param options: Parsed options.
1226*9c5db199SXin Li    @param offloader_type: Type of offloader action as string.
1227*9c5db199SXin Li    """
1228*9c5db199SXin Li    log_filename = _get_log_filename(options, offloader_type)
1229*9c5db199SXin Li    log_formatter = logging.Formatter(_LOGGING_FORMAT)
1230*9c5db199SXin Li    # Replace the default logging handler with a RotatingFileHandler. If
1231*9c5db199SXin Li    # options.log_size is 0, the file size will not be limited. Keeps
1232*9c5db199SXin Li    # one backup just in case.
1233*9c5db199SXin Li    handler = logging.handlers.RotatingFileHandler(
1234*9c5db199SXin Li            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
1235*9c5db199SXin Li    handler.setFormatter(log_formatter)
1236*9c5db199SXin Li    logger = logging.getLogger()
1237*9c5db199SXin Li    logger.setLevel(logging.DEBUG)
1238*9c5db199SXin Li    logger.addHandler(handler)
1239*9c5db199SXin Li
1240*9c5db199SXin Li
1241*9c5db199SXin Lidef _get_log_filename(options, offloader_type):
1242*9c5db199SXin Li    """Get log filename.
1243*9c5db199SXin Li
1244*9c5db199SXin Li    @param options: Parsed options.
1245*9c5db199SXin Li    @param offloader_type: Type of offloader action as string.
1246*9c5db199SXin Li    """
1247*9c5db199SXin Li    if options.log_size > 0:
1248*9c5db199SXin Li        log_timestamp = ''
1249*9c5db199SXin Li    else:
1250*9c5db199SXin Li        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
1251*9c5db199SXin Li    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
1252*9c5db199SXin Li    return os.path.join(_LOG_LOCATION, log_basename)
1253*9c5db199SXin Li
1254*9c5db199SXin Li
1255*9c5db199SXin Liif __name__ == '__main__':
1256*9c5db199SXin Li    main()
1257