xref: /aosp_15_r20/external/toolchain-utils/bestflags/pipeline_process.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2013 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Pipeline process that encapsulates the actual content.
5
6Part of the Chrome build flags optimization.
7
8The actual stages include the builder and the executor.
9"""
10
11__author__ = "[email protected] (Yuheng Long)"
12
13import multiprocessing
14
15
# Sentinel pushed through the task/result queues to signal shutdown; an
# arbitrary integer picked at random, unlikely to collide with a real task.
POISONPILL = 975
18
19
class PipelineProcess(multiprocessing.Process):
    """A process that encapsulates the actual content pipeline stage.

    The actual pipeline stage can be the builder or the tester.  This process
    continuously pulls tasks from the input queue until a poison pill is
    received.  Once a job is received, it hands it to the actual stage for
    processing.

    Each pipeline stage contains three modules.
    The first module continuously pulls tasks from the input queue.  It
    searches the cache to check whether the task has been encountered before.
    If so, duplicate computation can be avoided.
    The second module consists of a pool of workers that do the actual work,
    e.g., the worker will compile the source code and get the image in the
    builder pipeline stage.
    The third module is a helper that puts the result cost into the cost field
    of the duplicate tasks.  For example, if two tasks are equivalent, only one
    task, say t1, will be executed and the other task, say t2, will not be
    executed.  The third module gets the result from t1, when it is available,
    and sets the cost of t2 to be the same as that of t1.
    """

    def __init__(
        self,
        num_processes,
        name,
        cache,
        stage,
        task_queue,
        helper,
        worker,
        result_queue,
    ):
        """Set up input/output queues and the actual methods to be called.

        Args:
          num_processes: Number of helper subprocessors this stage has.
          name: The name of this stage.
          cache: The computed tasks encountered before.
          stage: An int value that specifies the stage for this pipeline stage,
            for example, build stage or test stage. This value will be used to
            retrieve the keys in different stages. I.e., the flags set is the
            key in the build stage and the checksum is the key in the test
            stage. The key is used to detect duplicates.
          task_queue: The input task queue for this pipeline stage.
          helper: The method hosted by the helper module to fill up the cost of
            the duplicate tasks.
          worker: The method hosted by the worker pools to do the actual work,
            e.g., compile the image.
          result_queue: The output task queue for this pipeline stage.
        """

        multiprocessing.Process.__init__(self)

        self._name = name
        self._task_queue = task_queue
        self._result_queue = result_queue

        self._helper = helper
        self._worker = worker

        self._cache = cache
        self._stage = stage
        self._num_processes = num_processes

        # The queues used by the modules for communication.  Manager queues
        # are used (rather than multiprocessing.Queue) so they can be shared
        # with the helper process and the worker pool.
        manager = multiprocessing.Manager()
        self._helper_queue = manager.Queue()
        self._work_queue = manager.Queue()

    def run(self):
        """Busy pulling the next task from the queue for execution.

        Once a job is pulled, this stage invokes the actual stage method and
        submits the result to the next pipeline stage.

        The process will terminate on receiving the poison pill from the
        previous stage.
        """

        # The pool of workers that do the actual work.
        work_pool = multiprocessing.Pool(self._num_processes)

        # The helper process that fills in the cost of duplicate tasks.
        helper_process = multiprocessing.Process(
            target=self._helper,
            args=(
                self._stage,
                self._cache,
                self._helper_queue,
                self._work_queue,
                self._result_queue,
            ),
        )
        helper_process.start()

        # Local snapshot of the keys seen so far.  Materialize as a list:
        # dict.keys() returns an unmodifiable view in Python 3, so appending
        # to it would raise AttributeError on the first uncached task.  A list
        # (not a set) is kept because the keys may be unhashable (e.g. flag
        # sets) — membership only requires equality.
        mycache = list(self._cache.keys())

        while True:
            task = self._task_queue.get()
            if task == POISONPILL:
                # Poison pill means shutdown: forward it downstream and stop.
                self._result_queue.put(POISONPILL)
                break

            task_key = task.GetIdentifier(self._stage)
            if task_key in mycache:
                # The task has been encountered before. It will be sent to the
                # helper module for further processing.
                self._helper_queue.put(task)
            else:
                # Let the workers do the actual work.
                work_pool.apply_async(
                    self._worker,
                    args=(
                        self._stage,
                        task,
                        self._work_queue,
                        self._result_queue,
                    ),
                )
                mycache.append(task_key)

        # Shutdown the workers pool and the helper process.
        work_pool.close()
        work_pool.join()

        self._helper_queue.put(POISONPILL)
        helper_process.join()
146