xref: /aosp_15_r20/external/toolchain-utils/bestflags/pipeline_process_test.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2013 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Pipeline Process unittest.
5
6Part of the Chrome build flags optimization.
7"""
8
9__author__ = "[email protected] (Yuheng Long)"
10
11import multiprocessing
12import unittest
13
14from mock_task import MockTask
15import pipeline_process
16
17
# Sentinel that MockHelper puts on the result queue when it receives a task
# that is already in done_dict. The value itself is arbitrary.
ERROR = -334
# Stage identifier passed to the pipeline process and checked by the mock
# helper and worker. The value itself is arbitrary.
TEST_STAGE = -8
22
23
def MockHelper(stage, done_dict, helper_queue, _, result_queue):
    """Echo every task received on the helper queue to the result queue.

    Blocks on the helper queue until an item arrives and stops as soon as it
    receives pipeline_process.POISONPILL.

    Args:
        stage: The pipeline stage this helper serves; must equal TEST_STAGE.
        done_dict: Container of tasks that are already done. A task found in
            it is reported as ERROR instead of being echoed.
        helper_queue: Queue from which tasks and the poison pill are read.
        _: Unused positional argument.
        result_queue: Queue that receives ("helper", identifier) for every
            echoed task, or ERROR for a task already in done_dict.
    """

    assert stage == TEST_STAGE
    while True:
        # A blocking get() waits for the next item without spinning the CPU,
        # unlike polling empty() in a tight loop.
        task = helper_queue.get()
        if task == pipeline_process.POISONPILL:
            # Poison pill means shutdown.
            break

        if task in done_dict:
            # A task that is already done should not reach the helper; emit
            # the sentinel so the test can detect the duplicate.
            result_queue.put(ERROR)
        else:
            result_queue.put(("helper", task.GetIdentifier(TEST_STAGE)))
40
41
def MockWorker(stage, task, _, result_queue):
    """Report the given task on the result queue, tagged as "worker".

    Args:
        stage: The pipeline stage this worker serves; must equal TEST_STAGE.
        task: The task to report; its identifier for TEST_STAGE is sent.
        _: Unused positional argument.
        result_queue: Queue that receives the ("worker", identifier) tuple.
    """
    assert stage == TEST_STAGE
    identifier = task.GetIdentifier(TEST_STAGE)
    message = ("worker", identifier)
    result_queue.put(message)
45
46
class PipelineProcessTest(unittest.TestCase):
    """Tests for pipeline_process.PipelineProcess.

    Every task inserted into the input queue should be taken out and handed
    to the actual pipeline handler (the worker or the helper). The outcome of
    each task, followed by the POISONPILL, should be passed on to the next
    pipeline stage via the output queue.
    """

    # Upper bound, in seconds, on waiting for a single expected message. It
    # turns a missing message into a test failure instead of an endless hang.
    RESULT_TIMEOUT_SECONDS = 60

    def testRun(self):
        """Test the run method.

        Ensure that all the tasks inserted into the queue are properly
        handled: each one is reported exactly once, and a duplicate task is
        routed to the helper rather than to a second worker.
        """

        manager = multiprocessing.Manager()
        # Shut down the manager's server process even if an assertion fails.
        self.addCleanup(manager.shutdown)
        inp = manager.Queue()
        output = manager.Queue()

        process = pipeline_process.PipelineProcess(
            2, "testing", {}, TEST_STAGE, inp, MockHelper, MockWorker, output
        )

        process.start()
        inp.put(MockTask(TEST_STAGE, 1))
        inp.put(MockTask(TEST_STAGE, 1))
        inp.put(MockTask(TEST_STAGE, 2))
        inp.put(pipeline_process.POISONPILL)
        process.join()

        # All tasks are processed once and only once.
        expected = [
            ("worker", 1),
            ("helper", 1),
            ("worker", 2),
            pipeline_process.POISONPILL,
        ]
        while expected:
            # get() raises queue.Empty on timeout, which fails the test.
            task = output.get(timeout=self.RESULT_TIMEOUT_SECONDS)

            # One "1" is passed to the worker and the other to the helper; the
            # helper reports ERROR if it sees a task that is already done.
            self.assertNotEqual(task, ERROR)

            # The messages received must match the expected ones exactly; the
            # arrival order is not checked.
            self.assertIn(task, expected)
            expected.remove(task)
92
93
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
96