xref: /aosp_15_r20/external/toolchain-utils/bestflags/steering_test.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2013 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Steering stage unittest.
5
6Part of the Chrome build flags optimization.
7"""
8
9__author__ = "[email protected] (Yuheng Long)"
10
11import multiprocessing
12import unittest
13
14from generation import Generation
15from mock_task import IdentifierMockTask
16import pipeline_process
17import steering
18
19
# Stage identifier handed to every IdentifierMockTask built by these tests. The
# value is arbitrary (an integer picked at random).
STEERING_TEST_STAGE = -8

# The number of generations chained together in testSteering.
NUMBER_OF_GENERATIONS = 20

# The number of tasks put in each generation under test.
NUMBER_OF_TASKS = 20

# The stride of permutation used to shuffle the input list of tasks. Should be
# relatively prime with NUMBER_OF_TASKS.
# NOTE(review): not referenced anywhere in the visible part of this file --
# presumably a leftover; confirm before removing.
STRIDE = 7
32
33
class MockGeneration(Generation):
    """A stand-in generation whose successors are fixed at construction time.

    The value supplied as next_generations to the constructor is handed back
    unchanged whenever Next is called.
    """

    def __init__(self, tasks, next_generations):
        """Record the tasks and the canned successors of this generation.

        Args:
          tasks: A set of tasks to be run.
          next_generations: A list of generations to report as the next
            generation of this one, or None if there is none.
        """
        # The base class receives None as its second constructor argument.
        super().__init__(tasks, None)
        self._next_generations = next_generations

    def Next(self, _):
        """Return the successors given at construction; the argument is unused."""
        return self._next_generations

    def IsImproved(self):
        """Return True exactly when a non-empty successor list was supplied."""
        return bool(self._next_generations)
59
60
class SteeringTest(unittest.TestCase):
    """Tests for the steering method.

    The steering algorithm should return if there is no new task in the initial
    generation. The steering algorithm should send all the tasks to the next
    stage and should terminate once there is no pending generation. A generation
    is pending if it contains a pending task. A task is pending if its (test)
    result is not ready.
    """

    @staticmethod
    def _StopProcess(process):
        """Make sure a started child process is gone.

        Used as a test cleanup so that a failed assertion does not leave the
        steering process blocked forever and keep the interpreter from exiting.

        Args:
          process: A multiprocessing.Process that has been started.
        """
        if process.is_alive():
            process.terminate()
        process.join()

    def _StartSteering(self, cache, generations):
        """Run steering.Steering in a child process wired to fresh queues.

        Args:
          cache: The set passed as the first argument of steering.Steering; the
            tests use it as the set of tasks that are already cached.
          generations: The list of initial generations passed as the second
            argument of steering.Steering.

        Returns:
          A tuple (input_queue, result_queue, steering_process). The process
          has already been started.
        """
        manager = multiprocessing.Manager()
        # Cleanups run in LIFO order: the steering process (registered below)
        # is stopped first, and only then is the manager that owns the queues
        # shut down.
        self.addCleanup(manager.shutdown)

        input_queue = manager.Queue()
        result_queue = manager.Queue()

        steering_process = multiprocessing.Process(
            target=steering.Steering,
            args=(cache, generations, input_queue, result_queue),
        )
        steering_process.start()
        self.addCleanup(self._StopProcess, steering_process)

        return input_queue, result_queue, steering_process

    def testSteering(self):
        """Test that the steering algorithm processes all the tasks properly.

        Test that the steering algorithm sends all the tasks to the next stage.
        Test that the steering algorithm terminates once all the tasks have been
        processed, i.e., the results for the tasks are all ready.
        """

        # Generations in creation order. Each newly created generation reports
        # the one created just before it as its next generation, so the
        # expected processing order is the reverse of this list.
        generations = []
        previous_generations = None

        for generation_index in range(NUMBER_OF_GENERATIONS):
            # Use a consecutive sequence of numbers as identifiers for the set
            # of tasks put into a generation.
            first_id = generation_index * NUMBER_OF_TASKS
            steering_tasks = {
                IdentifierMockTask(STEERING_TEST_STAGE, t)
                for t in range(first_id, first_id + NUMBER_OF_TASKS)
            }

            current_generation = MockGeneration(
                steering_tasks, previous_generations
            )
            generations.append(current_generation)
            previous_generations = [current_generation]

        # If there is no generation at all, the unittest returns right away.
        if not generations:
            return

        # Steering starts from the generation created last.
        input_queue, result_queue, steering_process = self._StartSteering(
            set(), [generations[-1]]
        )

        # Test that each generation is processed properly. I.e., the
        # generations are processed in order.
        for generation in reversed(generations):
            tasks = list(generation.Pool())

            # Test that all the tasks are processed once and only once.
            while tasks:
                task = result_queue.get()

                self.assertIn(task, tasks)
                tasks.remove(task)

                # Feed the task back as the result the steering stage waits for.
                input_queue.put(task)

        # Test that the steering algorithm returns properly after processing
        # all the generations.
        self.assertEqual(result_queue.get(), pipeline_process.POISONPILL)

        steering_process.join()

    def testCache(self):
        """The steering algorithm returns immediately if there is no new tasks.

        If all the new tasks have been cached before, the steering algorithm
        does not have to execute these tasks again and thus can terminate right
        away.
        """

        # Put a set of tasks in the cache and use the same set as the initial
        # generation.
        steering_tasks = {
            IdentifierMockTask(STEERING_TEST_STAGE, t)
            for t in range(NUMBER_OF_TASKS)
        }
        current_generation = MockGeneration(steering_tasks, None)

        _, result_queue, steering_process = self._StartSteering(
            steering_tasks, [current_generation]
        )

        # Test that the steering method returns right away.
        self.assertEqual(result_queue.get(), pipeline_process.POISONPILL)
        steering_process.join()
181
182
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
185