1# Copyright 2013 The ChromiumOS Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""Steering stage unittest. 5 6Part of the Chrome build flags optimization. 7""" 8 9__author__ = "[email protected] (Yuheng Long)" 10 11import multiprocessing 12import unittest 13 14from generation import Generation 15from mock_task import IdentifierMockTask 16import pipeline_process 17import steering 18 19 20# Pick an integer at random. 21STEERING_TEST_STAGE = -8 22 23# The number of generations to be used to do the testing. 24NUMBER_OF_GENERATIONS = 20 25 26# The number of tasks to be put in each generation to be tested. 27NUMBER_OF_TASKS = 20 28 29# The stride of permutation used to shuffle the input list of tasks. Should be 30# relatively prime with NUMBER_OF_TASKS. 31STRIDE = 7 32 33 34class MockGeneration(Generation): 35 """This class emulates an actual generation. 36 37 It will output the next_generations when the method Next is called. The 38 next_generations is initiated when the MockGeneration instance is constructed. 39 """ 40 41 def __init__(self, tasks, next_generations): 42 """Set up the next generations for this task. 43 44 Args: 45 tasks: A set of tasks to be run. 46 next_generations: A list of generations as the next generation of the 47 current generation. 48 """ 49 Generation.__init__(self, tasks, None) 50 self._next_generations = next_generations 51 52 def Next(self, _): 53 return self._next_generations 54 55 def IsImproved(self): 56 if self._next_generations: 57 return True 58 return False 59 60 61class SteeringTest(unittest.TestCase): 62 """This class test the steering method. 63 64 The steering algorithm should return if there is no new task in the initial 65 generation. The steering algorithm should send all the tasks to the next stage 66 and should terminate once there is no pending generation. A generation is 67 pending if it contains pending task. A task is pending if its (test) result 68 is not ready. 69 """ 70 71 def testSteering(self): 72 """Test that the steering algorithm processes all the tasks properly. 73 74 Test that the steering algorithm sends all the tasks to the next stage. Test 75 that the steering algorithm terminates once all the tasks have been 76 processed, i.e., the results for the tasks are all ready. 77 """ 78 79 # A list of generations used to test the steering stage. 80 generations = [] 81 82 task_index = 0 83 previous_generations = None 84 85 # Generate a sequence of generations to be tested. Each generation will 86 # output the next generation in reverse order of the list when the "Next" 87 # method is called. 88 for _ in range(NUMBER_OF_GENERATIONS): 89 # Use a consecutive sequence of numbers as identifiers for the set of 90 # tasks put into a generation. 91 test_ranges = range(task_index, task_index + NUMBER_OF_TASKS) 92 tasks = [ 93 IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges 94 ] 95 steering_tasks = set(tasks) 96 97 # Let the previous generation as the offspring generation of the current 98 # generation. 99 current_generation = MockGeneration( 100 steering_tasks, previous_generations 101 ) 102 generations.insert(0, current_generation) 103 previous_generations = [current_generation] 104 105 task_index += NUMBER_OF_TASKS 106 107 # If there is no generation at all, the unittest returns right away. 108 if not current_generation: 109 return 110 111 # Set up the input and result queue for the steering method. 112 manager = multiprocessing.Manager() 113 input_queue = manager.Queue() 114 result_queue = manager.Queue() 115 116 steering_process = multiprocessing.Process( 117 target=steering.Steering, 118 args=(set(), [current_generation], input_queue, result_queue), 119 ) 120 steering_process.start() 121 122 # Test that each generation is processed properly. I.e., the generations are 123 # processed in order. 124 while generations: 125 generation = generations.pop(0) 126 tasks = [task for task in generation.Pool()] 127 128 # Test that all the tasks are processed once and only once. 129 while tasks: 130 task = result_queue.get() 131 132 assert task in tasks 133 tasks.remove(task) 134 135 input_queue.put(task) 136 137 task = result_queue.get() 138 139 # Test that the steering algorithm returns properly after processing all 140 # the generations. 141 assert task == pipeline_process.POISONPILL 142 143 steering_process.join() 144 145 def testCache(self): 146 """The steering algorithm returns immediately if there is no new tasks. 147 148 If all the new tasks have been cached before, the steering algorithm does 149 not have to execute these tasks again and thus can terminate right away. 150 """ 151 152 # Put a set of tasks in the cache and add this set to initial generation. 153 test_ranges = range(NUMBER_OF_TASKS) 154 tasks = [ 155 IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges 156 ] 157 steering_tasks = set(tasks) 158 159 current_generation = MockGeneration(steering_tasks, None) 160 161 # Set up the input and result queue for the steering method. 162 manager = multiprocessing.Manager() 163 input_queue = manager.Queue() 164 result_queue = manager.Queue() 165 166 steering_process = multiprocessing.Process( 167 target=steering.Steering, 168 args=( 169 steering_tasks, 170 [current_generation], 171 input_queue, 172 result_queue, 173 ), 174 ) 175 176 steering_process.start() 177 178 # Test that the steering method returns right away. 179 assert result_queue.get() == pipeline_process.POISONPILL 180 steering_process.join() 181 182 183if __name__ == "__main__": 184 unittest.main() 185