xref: /aosp_15_r20/external/toolchain-utils/bestflags/pipeline_worker_test.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2013 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Unittest for the pipeline_worker functions in the build/test stage.
5
6Part of the Chrome build flags optimization.
7
8This module tests the helper method and the worker method.
9"""
10
11__author__ = "[email protected] (Yuheng Long)"
12
13import multiprocessing
14import random
15import sys
16import unittest
17
18from mock_task import MockTask
19import pipeline_process
20import pipeline_worker
21
22
# Arbitrary stage identifier passed to every mock task and pipeline_worker
# call in this test; the integer was picked at random.
TEST_STAGE = -3
25
26
def MockTaskCostGenerator():
    """Return a random negative integer to use as a placeholder task cost.

    Returns:
        An integer in the inclusive range [-sys.maxsize - 1, -1].
    """
    # sys.maxint no longer exists in Python 3; sys.maxsize is available on
    # both Python 2.6+ and Python 3.
    return random.randint(-sys.maxsize - 1, -1)
30
31
class PipelineWorkerTest(unittest.TestCase):
    """This class tests the pipeline_worker functions.

    Given the same identifier, the cost should result the same from the
    pipeline_worker functions.
    """

    def testHelper(self):
        """Test the helper.

        Submit several tasks, two of which share an identifier, to the helper
        and check that every task comes out of the result queue carrying the
        cost that was reported for its identifier on the completed queue.
        """

        # Set up the input, helper and output queue for the helper method.
        manager = multiprocessing.Manager()
        # Make sure the manager's server process is stopped even if an
        # assertion below fails.
        self.addCleanup(manager.shutdown)
        helper_queue = manager.Queue()
        result_queue = manager.Queue()
        completed_queue = manager.Queue()

        # Set up the helper process that holds the helper method.
        helper_process = multiprocessing.Process(
            target=pipeline_worker.Helper,
            args=(TEST_STAGE, {}, helper_queue, completed_queue, result_queue),
        )
        helper_process.start()

        # A dictionary defines the mock result to the helper.
        mock_result = {1: 1995, 2: 59, 9: 1027}

        # Test if there is a task that is done before, whether the duplicate
        # task will have the same result. Here, two different scenarios are
        # tested. That is the mock results are added to the completed_queue
        # before and after the corresponding mock tasks being added to the
        # input queue.
        completed_queue.put((9, mock_result[9]))

        # The output of the helper should contain all the following tasks.
        results = [1, 1, 2, 9]

        # Testing the correctness of having tasks having the same identifier,
        # here 1. Each task starts with a random negative cost, which can
        # never equal any of the (positive) mock results.
        for result in results:
            helper_queue.put(
                MockTask(TEST_STAGE, result, MockTaskCostGenerator())
            )

        completed_queue.put((2, mock_result[2]))
        completed_queue.put((1, mock_result[1]))

        # Signal there is no more duplicate task.
        helper_queue.put(pipeline_process.POISONPILL)
        helper_process.join()

        while results:
            task = result_queue.get()
            identifier = task.GetIdentifier(TEST_STAGE)
            self.assertIn(identifier, results)
            if identifier in mock_result:
                # assertEqual, not assertTrue: assertTrue would treat the
                # expected value as the failure message and never compare.
                self.assertEqual(
                    task.GetResult(TEST_STAGE), mock_result[identifier]
                )
            results.remove(identifier)

    def testWorker(self):
        """Test the worker method.

        The worker should process all the input tasks and output the tasks to
        the helper and result queue.
        """

        manager = multiprocessing.Manager()
        # Make sure the manager's server process is stopped even if an
        # assertion below fails.
        self.addCleanup(manager.shutdown)
        result_queue = manager.Queue()
        completed_queue = manager.Queue()

        # A dictionary defines the mock tasks and their corresponding results.
        mock_work_tasks = {1: 86, 2: 788}

        # dict.items() works on both Python 2 and 3 (iteritems() is
        # Python 2 only).
        mock_tasks = [
            MockTask(TEST_STAGE, flag, cost)
            for flag, cost in mock_work_tasks.items()
        ]

        # Submit the mock tasks to the worker.
        for mock_task in mock_tasks:
            pipeline_worker.Worker(
                TEST_STAGE, mock_task, completed_queue, result_queue
            )

        # The tasks, from the output queue, should be the same as the input and
        # should be performed.
        for task in mock_tasks:
            output = result_queue.get()
            self.assertEqual(output, task)
            self.assertTrue(output.Done(TEST_STAGE))

        # The tasks, from the completed_queue, should be defined in the
        # mock_work_tasks dictionary.
        for flag, cost in mock_work_tasks.items():
            helper_input = completed_queue.get()
            self.assertEqual(helper_input, (flag, cost))
132
133
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
136