xref: /aosp_15_r20/external/autotest/utils/parallel.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Lifrom __future__ import absolute_import
3*9c5db199SXin Lifrom __future__ import division
4*9c5db199SXin Lifrom __future__ import print_function
5*9c5db199SXin Liimport os, sys
6*9c5db199SXin Liimport six
7*9c5db199SXin Lifrom six.moves import range
8*9c5db199SXin Li
9*9c5db199SXin Li
class ParallelError(Exception):
    """\
    Raised when a parallel run finishes with one or more errors.

    str:    The full, human readable message.  It is also passed to
            Exception.__init__, so it is what str(exception) returns.
    errors: The list of individual error strings behind the message.
    """

    def __init__(self, str, errors):
        # NOTE: the parameter name shadows the str builtin; it is kept
        # because it is part of the constructor's calling interface.
        self.str = str
        self.errors = errors
        Exception.__init__(self, str)


    def __reduce__(self):
        # Exception's default reduction rebuilds the object from self.args,
        # which only holds the message.  Re-creating the instance would
        # then fail because __init__ also requires 'errors'.  Supplying
        # both constructor arguments makes pickle and copy work.
        return (self.__class__, (self.str, self.errors))
15*9c5db199SXin Li
16*9c5db199SXin Li
class ParallelExecute(object):
    """\
    Runs a collection of callables in forked child processes, starting
    each one only after the callables it depends on have exited with
    status 0, and never running more than max_procs children at once.
    """

    def __init__(self, functions, max_simultaneous_procs=20):
        """\
        This takes in a dictionary of functions which map to a set of
        functions that they depend on.

        functions: This is either a list of or dictionary of functions to
                   be run.  If it's a dictionary, the value should be a set
                   (any iterable is accepted) of other functions this
                   function is dependent on.  If it's a list (or tuple or
                   anything iterable that returns a single element each
                   iteration), then it's assumed that there are no
                   dependencies.  The object passed in is never modified.

        max_simultaneous_procs: Throttle the number of processes we have
                                running at once.  Should be at least 1;
                                with a smaller value no child is ever
                                started.

        Raises KeyError if a function depends on something that is not
        itself one of the functions to run.
        """
        if isinstance(functions, dict):
            # Work on a private copy: running consumes both the mapping
            # (entries are popped as functions start) and the dependency
            # sets (entries are removed as dependencies finish), and the
            # caller's objects must not be emptied behind its back.
            functions = {fn: set(deps) for fn, deps in functions.items()}
        else:
            functions = {fn: set() for fn in functions}

        # Reverse mapping: function -> functions waiting on it.
        dependents = {fn: [] for fn in functions}
        for fn, deps in functions.items():
            for dep in deps:
                dependents[dep].append(fn)

        self.max_procs = max_simultaneous_procs
        self.functions = functions      # not yet started: fn -> unmet deps
        self.dependents = dependents
        self.pid_map = {}               # running children: pid -> fn
        self.ready_to_run = []          # no unmet deps, waiting for a slot


    def _run(self, function):
        """\
        Fork a child process that calls function and then exits.

        In the parent this records the child's pid in pid_map and returns.
        The child never returns from this method: it exits with status 0
        if function returns, with the requested status if function calls
        sys.exit(), and with status 1 (after reporting the traceback
        through sys.excepthook) if function raises anything else.
        """
        self.functions.pop(function)
        pid = os.fork()
        if pid:
            self.pid_map[pid] = function
            return

        # Child process from here on.  It must never unwind into the call
        # stack inherited from the parent (the caller's except/finally
        # blocks and atexit handlers belong to the parent), so every path
        # below ends in os._exit().
        exit_code = 1
        try:
            function()
            exit_code = 0
        except SystemExit as exit_request:
            code = exit_request.code
            if code is None:
                exit_code = 0
            elif isinstance(code, int):
                exit_code = code
            else:
                # Same convention as the interpreter: print the object
                # and report failure.
                sys.stderr.write("%s\n" % (code,))
        except BaseException:
            # Process boundary: report the failure the way an uncaught
            # exception would be reported, then exit non-zero.
            sys.excepthook(*sys.exc_info())
        finally:
            # os._exit() skips the normal interpreter shutdown, so flush
            # buffered output by hand.  Best effort only: there is nothing
            # useful a dying child can do if a flush fails.
            for stream in (sys.stdout, sys.stderr):
                try:
                    stream.flush()
                except Exception:
                    pass
            # Mask to a valid exit status so os._exit() itself cannot
            # raise on an out-of-range value.
            os._exit(exit_code & 0xFF)


    def run_until_completion(self):
        """\
        Run every function, respecting dependencies and the process limit,
        and block until all started children have exited.

        A function whose child exits with a non-zero wait status is
        recorded as "<name> failed" and its dependents are never started.

        Raises ParallelError if any function failed, or with
        "Deadlock detected" if nothing failed but some functions could
        never be started (their dependencies never completed).
        """
        for fn, deps in self.functions.items():
            if not deps:
                self.ready_to_run.append(fn)

        errors = []
        while self.pid_map or self.ready_to_run:
            # Start as many ready functions as the process limit allows.
            free_slots = self.max_procs - len(self.pid_map)
            for _ in range(min(free_slots, len(self.ready_to_run))):
                self._run(self.ready_to_run.pop())

            # Handle one proc that's finished.
            pid, status = os.wait()
            fn = self.pid_map.pop(pid, None)
            if fn is None:
                # os.wait() reaps any child of this process; this one was
                # not started by us, so there is nothing to account for.
                continue
            if status != 0:
                errors.append("%s failed" % getattr(fn, '__name__', repr(fn)))
                continue

            # Release everything that was waiting only on this function.
            for dependent in self.dependents[fn]:
                remaining = self.functions[dependent]
                remaining.remove(fn)
                if not remaining:
                    self.ready_to_run.append(dependent)

        # Anything still in self.functions was never started.
        if self.functions and not errors:
            errors.append("Deadlock detected")

        if errors:
            msg = "Errors occurred during execution:"
            msg = '\n'.join([msg] + errors)
            raise ParallelError(msg, errors)
94*9c5db199SXin Li
95*9c5db199SXin Li
def redirect_io(log_file='/dev/null'):
    """\
    Point this process's standard streams away from their current targets.

    stdin (fd 0) is always redirected to /dev/null.  stdout (fd 1) and
    stderr (fd 2) are both redirected to log_file, which is created if it
    does not exist.  sys.stdin, sys.stdout and sys.stderr are then replaced
    with fresh file objects on those descriptors.

    NOTE: log_file is opened without O_TRUNC or O_APPEND, so an existing
    file is overwritten from its beginning and keeps any longer old tail.

    log_file: Path of the file that receives stdout and stderr.
    """
    # Push out anything still buffered in the current stream objects so it
    # reaches its original destination instead of ending up in log_file.
    # Best effort: a missing or already closed stream is not an error here.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (AttributeError, IOError, OSError, ValueError):
            pass

    # Always redirect stdin.
    in_fd = os.open('/dev/null', os.O_RDONLY)
    try:
        _point_fd_at(in_fd, 0)
    finally:
        # If fd 0 was closed beforehand, os.open() handed back fd 0 itself;
        # closing it would undo the redirection.
        if in_fd != 0:
            os.close(in_fd)

    out_fd = os.open(log_file, os.O_WRONLY | os.O_CREAT)
    try:
        _point_fd_at(out_fd, 2)
        _point_fd_at(out_fd, 1)
    finally:
        # Same reasoning as above for a previously closed fd 1 or fd 2.
        if out_fd not in (1, 2):
            os.close(out_fd)

    sys.stdin = os.fdopen(0, 'r')
    sys.stdout = os.fdopen(1, 'w')
    sys.stderr = os.fdopen(2, 'w')


def _point_fd_at(src_fd, dst_fd):
    """Make descriptor dst_fd refer to the file that src_fd has open."""
    if src_fd != dst_fd:
        os.dup2(src_fd, dst_fd)
    elif hasattr(os, 'set_inheritable'):
        # Already in place.  Where os.open() creates non-inheritable
        # descriptors, mark it inheritable to match what dup2() would
        # have produced for a standard descriptor.
        os.set_inheritable(dst_fd, True)
113*9c5db199SXin Li    sys.stderr = os.fdopen(2, 'w')
114