xref: /aosp_15_r20/external/autotest/utils/tko_publish.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2"""
3This script will scan an autotest server results directory for job result
4directories that have completed and that have not yet been published on
5a remote dashboard server matching given filtering options and for those it
6finds it will rsync them to the tko server and mark them as published (it uses
7a <jobdir>/.tko_published flag file to determine if a jobdir results directory
8has been published yet).
9"""
10
11from __future__ import absolute_import
12from __future__ import division
13from __future__ import print_function
14
15import sys, os, re, optparse
16
17import common
18from autotest_lib.client.common_lib import utils
19from autotest_lib.server import frontend
20
21options = optparse.Values()
22
23USAGE="""tko-publish [options] <resultsdir> <rsync-destination-path>
24
25Where:
26<resultsdir>              A path to the directory having the job results
27                          directories to publish.
28
29<rsync-destination-path>  A valid rsync destination path where to upload the
30                          job result directories.
31                          Example: [email protected]:/home/autotest/results"""
32PUBLISH_FLAGFILE = '.tko_published'
33RSYNC_COMMAND = 'rsync -aqz "%s" "%s"'
34
35
36def get_job_dirs(path):
37    regex = re.compile('[1-9][0-9]*-')
38    jobdirs = []
39
40    for dir in os.listdir(path):
41        # skip directories not matching the job result dir pattern
42        if not regex.match(dir):
43            continue
44
45        dir = os.path.join(options.resultsdir, dir)
46        if (os.path.isdir(dir)
47                and not os.path.exists(os.path.join(dir, PUBLISH_FLAGFILE))):
48            jobdirs.append(dir)
49
50    return jobdirs
51
52
53def publish_job(jobdir):
54    cmd = RSYNC_COMMAND % (jobdir, options.dest)
55    utils.system(cmd)
56
57    # mark the jobdir as published
58    fd = open(os.path.join(jobdir, PUBLISH_FLAGFILE), 'w')
59    fd.close()
60    print('Published', jobdir)
61
62
63def main():
64    jobdirs = get_job_dirs(options.resultsdir)
65
66    afe = frontend.AFE()
67    # the way AFE API is right now is to give a whole list of jobs and can't
68    # get specific jobs so minimize the queries caching the result
69    finished_jobs = afe.get_jobs(finished=True)
70
71    if options.jobname_pattern:
72        jobname_pattern = re.compile(options.jobname_pattern)
73    else:
74        jobname_pattern = None
75
76    # for each unpublished possible jobdir find it in the database and see
77    # if it is completed
78    for jobdir in jobdirs:
79        job_id = int(os.path.basename(jobdir).split('-')[0])
80        job = [job for job in finished_jobs if job.id == job_id]
81
82        if len(job) != 1:
83            continue
84
85        if jobname_pattern:
86            # does it match the jobname pattern?
87            if not jobname_pattern.match(job[0].name):
88                continue
89
90        # does it match the wanted job owner
91        if options.job_owner and options.job_owner != job[0].owner:
92            continue
93
94        publish_job(jobdir)
95
96
97if __name__ == '__main__':
98    parser = optparse.OptionParser(usage=USAGE)
99    parser.add_option('--jobname-pattern', dest='jobname_pattern',
100                      help='Regexp pattern to match against job names, by '
101                      "default there won't be any matching done",
102                      default=None)
103    parser.add_option('--job-owner', dest='job_owner', default=None,
104                      help='Job owner username to match against for the '
105                      'published jobs, by default no matching is done.')
106    options, args = parser.parse_args()
107
108    if len(args) < 2:
109        print(USAGE)
110        sys.exit(-1)
111
112    options.resultsdir = args[0]
113    options.dest = args[1]
114    main()
115