xref: /aosp_15_r20/external/autotest/server/subcommand.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2from __future__ import absolute_import
3from __future__ import division
4from __future__ import print_function
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
7
8import sys, os, signal, time, six.moves.cPickle, logging
9
10from autotest_lib.client.common_lib import error, utils
11from autotest_lib.client.common_lib.cros import retry
12from six.moves import zip
13
14
# Entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands.  While it is None (or otherwise
# falsy), subcommand.redirect_output() leaves the child's output alone; when
# set, the object must provide tee_redirect_debug_dir(debug_dir, tag=...).
logging_manager_object = None
18
19
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    All tasks are forked first; each one is then waited for in turn and its
    pickled result (or the pickled exception it raised) is read back from
    its result pipe.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The budget is shared by all tasks, but every task is still given
            at least one second once its turn to be waited on comes.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns A list with one result or exception per task, in tasklist order,
            when return_results is True; otherwise None.
    @raises error.AutoservError: when return_results is False and any task
            failed, timed out or produced no readable result.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            remaining_timeout = max(endtime - time.time(), 1)
        # NOTE(review): each child is reaped before its pipe is read, so a
        # result larger than the OS pipe buffer leaves the child blocked in
        # write() until it is killed (or forever without a timeout).
        # Consider draining the pipes before/while waiting.
        wait_error = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as e:
            run_error = True
            wait_error = e
        else:
            if status != 0:
                run_error = True

        try:
            results.append(
                    six.moves.cPickle.load(task.result_pickle, encoding='utf-8'))
        except Exception as e:
            # Nothing decodable arrived, e.g. the child was killed on timeout
            # before writing its result.  Record the failure (preferring the
            # wait error, which carries the exit code) instead of aborting,
            # so that the remaining children are still reaped.
            run_error = True
            results.append(wait_error if wait_error is not None else e)
        finally:
            if hasattr(task.result_pickle, 'close'):
                task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        message = 'One or more subcommands failed:\n'
        for task, result in zip(tasklist, results):
            message += 'task: %s returned/raised: %r\n' % (task, result)
        raise error.AutoservError(message)
62
63
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Call function(arg) once per element of arglist, concurrently.

    Every element becomes one subcommand whose single positional argument is
    that element; all subcommands are then run together with parallel().

    NOTE: with exactly one element no subcommand is forked.  function is
    called directly in the current process, timeout is ignored, and (unless
    return_results is set) any exception it raises propagates unchanged.

    @param function: Callable run once for each element of arglist.
    @param arglist: List of arguments, one per subcommand.
    @param subdir_name_constructor: Callable mapping an element of arglist
            to the name of that subcommand's result sub-directory:
                subdir_name_constructor(arg)
    @param log: If True, each subcommand logs into the sub-directory named by
            subdir_name_constructor(arg); if False, no sub-directory is used.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True, return a list holding each call's result
            or the exception it raised, instead of raising an AutoservError
            on failure.  [default: False]

    @returns None, or the list of results/exceptions when return_results is
            True.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    if len(arglist) != 1:
        tasks = [subcommand(function, [arg],
                            subdir_name_constructor(arg) if log else None)
                 for arg in arglist]
        return parallel(tasks, timeout, return_results=return_results)

    # Fast path: a single argument runs in-process, without forking.
    lone_arg = arglist[0]
    if not return_results:
        function(lone_arg)
        return None
    try:
        outcome = function(lone_arg)
    except Exception as exc:
        return [exc]
    return [outcome]
115
116
class subcommand(object):
    """
    A function call that can be executed in a forked child process.

    fork_start() forks; the child runs func(*args), writes the pickled return
    value (or the pickled exception it raised) to a pipe and exits.  The
    parent reads that pickle from self.result_pickle and reaps the child with
    wait(), poll() or fork_waitfor().
    """

    # Hook registries shared by all instances (and by subclasses that do not
    # override them); filled by the register_*_hook() classmethods.
    fork_hooks, join_hooks = [], []


    def __init__(self, func, args, subdir=None):
        """
        @param func: The callable run in the child as func(*args).
        @param args: Sequence of positional arguments for func.
        @param subdir: Optional directory to log results in.  It and a
                'debug' directory inside it are created if missing, and the
                child chdirs into it before running func.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None
        # Read end of the result pipe; set by fork_start() in the parent.
        self.result_pickle = None


    def __str__(self):
        return str('subcommand(func=%s,  args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking, as hook(subcommand), before func runs. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called in the child process, as
        hook(subcommand), after the result has been sent and just before the
        child process terminates (joins to the parent). """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Redirect the child's debug logging into self.debug, tagged with the
        basename of self.subdir, via logging_manager_object; does nothing if
        there is no subdir or no logging manager has been configured.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    @staticmethod
    def _write_all(fd, data):
        """Write all of data to fd, retrying after short writes."""
        while data:
            written = os.write(fd, data)
            data = data[written:]


    @staticmethod
    def _pickle_exception(exc):
        """
        Return a pickle of exc, or of a plain Exception describing it when
        exc itself cannot be pickled (e.g. it references a socket or lock).
        """
        try:
            return six.moves.cPickle.dumps(
                    exc, six.moves.cPickle.HIGHEST_PROTOCOL)
        except Exception:
            fallback = Exception('%s (original %s could not be pickled)' %
                                 (exc, type(exc).__name__))
            return six.moves.cPickle.dumps(
                    fallback, six.moves.cPickle.HIGHEST_PROTOCOL)


    def fork_start(self):
        """
        Fork a child process that runs self.func(*self.args).

        In the parent this returns immediately, with self.pid set to the
        child's pid and self.result_pickle open on the read end of a pipe.
        The child never returns from this method: it writes the pickled
        result (or the pickled exception raised by a fork hook or func) to
        the pipe, runs the join hooks and terminates with os._exit(), with
        exit code 0 on success and 1 otherwise.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        try:
            self.pid = os.fork()
        except OSError:
            # Don't leak the pipe when fork itself fails.
            os.close(r)
            os.close(w)
            raise

        if self.pid:                            # I am the parent
            os.close(w)
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return: an exception
        # escaping this method would unwind into the caller's code (e.g. the
        # fork loop in parallel()) inside the child process, so everything
        # below is guarded by a finally that always ends in os._exit().
        exit_code = 1
        try:
            os.close(r)
            signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
            if self.subdir:
                os.chdir(self.subdir)
            self.redirect_output()

            try:
                for hook in self.fork_hooks:
                    hook(self)
                result = self.func(*self.args)
                payload = six.moves.cPickle.dumps(
                        result, six.moves.cPickle.HIGHEST_PROTOCOL)
                status = 0
            except Exception as e:
                logging.exception('function failed')
                payload = self._pickle_exception(e)
                status = 1

            try:
                self._write_all(w, payload)
            finally:
                os.close(w)
            # Only report the func status once the result was delivered.
            exit_code = status

            for hook in self.join_hooks:
                hook(self)
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        Record the child's return code from a raw waitpid() status.

        This is partially borrowed from subprocess.Popen: a child killed by
        signal N gets returncode -N, a normal exit gets its exit status.  On
        a non-zero return code a failure report, including the child's
        autoserv.stderr from the debug directory if present, is printed.

        @param sts: Status word returned by os.waitpid().
        @raises RuntimeError: if sts is neither a signal nor an exit status.
        @raises error.AutoservSubcommandError: if the return code is non-zero.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print()
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    # Lines keep their own newline; end='' avoids the stray
                    # leading space end=' ' added to every following line.
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            print(line, end='')
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        Non-blocking check for child termination (borrowed from
        subprocess.Popen).

        @returns The return code, or None while the child is still running.
                OSError from waitpid is ignored (returncode stays None).
        @raises error.AutoservSubcommandError: via _handle_exitstatus if the
                child finished with a non-zero return code.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                pass
        return self.returncode


    def wait(self):
        """
        Block until the child terminates (borrowed from subprocess.Popen).

        @returns The child's return code.
        @raises error.AutoservSubcommandError: via _handle_exitstatus if the
                child finished with a non-zero return code.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child to finish, optionally bounded by a timeout.

        @param timeout: Seconds to wait before killing the child with
                utils.nuke_pid() and then reaping it; a falsy value waits
                indefinitely.
        @returns The child's return code (see wait()).
        @raises error.AutoservSubcommandError: via wait() when the return
                code is non-zero (presumably always the case after a
                timeout kill).
        """
        if not timeout:
            return self.wait()
        else:
            _, result = retry.timeout(self.wait, timeout_sec=timeout)

            if result is None:
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print()
                result = self.wait()

            return result
272