xref: /aosp_15_r20/external/autotest/utils/parallel.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2from __future__ import absolute_import
3from __future__ import division
4from __future__ import print_function
5import os, sys
6import six
7from six.moves import range
8
9
10class ParallelError(Exception):
11    def __init__(self, str, errors):
12        self.str = str
13        self.errors = errors
14        Exception.__init__(self, str)
15
16
17class ParallelExecute(object):
18    def __init__(self, functions, max_simultaneous_procs=20):
19        """\
20        This takes in a dictionary of functions which map to a set of
21        functions that they depend on.
22
23        functions: This is either a list of or dictionary of functions to
24                   be run.  If it's a dictionary, the value should be a set
25                   of other functions this function is dependent on.  If its
26                   a list (or tuple or anything iterable that returns a
27                   single element each iteration), then it's assumed that
28                   there are no dependencies.
29
30        max_simultaneous_procs: Throttle the number of processes we have
31                                running at once.
32        """
33        if not isinstance(functions, dict):
34            function_list = functions
35            functions = {}
36            for fn in function_list:
37                functions[fn] = set()
38
39        dependents = {}
40        for fn, deps in six.iteritems(functions):
41            dependents[fn] = []
42        for fn, deps in six.iteritems(functions):
43            for dep in deps:
44                dependents[dep].append(fn)
45
46        self.max_procs = max_simultaneous_procs
47        self.functions = functions
48        self.dependents = dependents
49        self.pid_map = {}
50        self.ready_to_run = []
51
52
53    def _run(self, function):
54        self.functions.pop(function)
55        pid = os.fork()
56        if pid:
57            self.pid_map[pid] = function
58        else:
59            function()
60            sys.exit(0)
61
62
63    def run_until_completion(self):
64        for fn, deps in six.iteritems(self.functions):
65            if len(deps) == 0:
66                self.ready_to_run.append(fn)
67
68        errors = []
69        while len(self.pid_map) > 0 or len(self.ready_to_run) > 0:
70            max_allowed = self.max_procs - len(self.pid_map)
71            max_able = len(self.ready_to_run)
72            for i in range(min(max_allowed, max_able)):
73                self._run(self.ready_to_run.pop())
74
75            # Handle one proc that's finished.
76            pid, status = os.wait()
77            fn = self.pid_map.pop(pid)
78            if status != 0:
79                errors.append("%s failed" % fn.__name__)
80                continue
81
82            for dependent in self.dependents[fn]:
83                self.functions[dependent].remove(fn)
84                if len(self.functions[dependent]) == 0:
85                    self.ready_to_run.append(dependent)
86
87        if len(self.functions) > 0 and len(errors) == 0:
88            errors.append("Deadlock detected")
89
90        if len(errors) > 0:
91            msg = "Errors occurred during execution:"
92            msg = '\n'.join([msg] + errors)
93            raise ParallelError(msg, errors)
94
95
96def redirect_io(log_file='/dev/null'):
97    # Always redirect stdin.
98    in_fd = os.open('/dev/null', os.O_RDONLY)
99    try:
100        os.dup2(in_fd, 0)
101    finally:
102        os.close(in_fd)
103
104    out_fd = os.open(log_file, os.O_WRONLY | os.O_CREAT)
105    try:
106        os.dup2(out_fd, 2)
107        os.dup2(out_fd, 1)
108    finally:
109        os.close(out_fd)
110
111    sys.stdin = os.fdopen(0, 'r')
112    sys.stdout = os.fdopen(1, 'w')
113    sys.stderr = os.fdopen(2, 'w')
114