# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Pipeline process that encapsulates the actual content.

Part of the Chrome build flags optimization.

The actual stages include the builder and the executor.
"""

__author__ = "[email protected] (Yuheng Long)"

import multiprocessing


# Sentinel put on a queue to tell its consumer to shut down. The value is
# arbitrary; it is compared with `==` rather than `is` because it travels
# between processes through queues, which does not preserve object identity.
POISONPILL = 975


class PipelineProcess(multiprocessing.Process):
    """A process that encapsulates the actual content pipeline stage.

    The actual pipeline stage can be the builder or the tester. This process
    continuously pulls tasks from the queue until a poison pill is received.
    Once a task is received, it is handed to the actual stage for processing.

    Each pipeline stage contains three modules.
    The first module continuously pulls tasks from the input queue. It searches
    the cache to check whether the task has been encountered before. If so,
    duplicate computation can be avoided.
    The second module consists of a pool of workers that do the actual work,
    e.g., the worker compiles the source code and gets the image in the builder
    pipeline stage.
    The third module is a helper that puts the result cost into the cost field
    of the duplicate tasks. For example, if two tasks are equivalent, only one
    task, say t1, will be executed and the other task, say t2, will not be
    executed. The third module gets the result from t1 when it is available
    and sets the cost of t2 to be the same as that of t1.

    The poison pill is forwarded to the next stage only after the workers and
    the helper have finished, so it is queued behind every result this stage
    produces.
    """

    def __init__(
        self,
        num_processes,
        name,
        cache,
        stage,
        task_queue,
        helper,
        worker,
        result_queue,
    ):
        """Set up input/output queue and the actual method to be called.

        Args:
          num_processes: Number of worker processes in this stage's pool.
          name: The name of this stage.
          cache: The computed tasks encountered before, keyed by task
            identifier.
          stage: An int value that specifies the stage for this pipeline stage,
            for example, build stage or test stage. This value will be used to
            retrieve the keys in different stages. I.e., the flags set is the
            key in the build stage and the checksum is the key in the test
            stage. The key is used to detect duplicates.
          task_queue: The input task queue for this pipeline stage.
          helper: The method hosted by the helper module to fill up the cost of
            the duplicate tasks. It is called as
            helper(stage, cache, helper_queue, work_queue, result_queue).
          worker: The method hosted by the worker pools to do the actual work,
            e.g., compile the image. It is called as
            worker(stage, task, work_queue, result_queue).
          result_queue: The output task queue for this pipeline stage.
        """

        multiprocessing.Process.__init__(self)

        # multiprocessing.Process keeps its name in `_name` (in CPython), so
        # this assignment also becomes the name of the process.
        self._name = name
        self._task_queue = task_queue
        self._result_queue = result_queue

        self._helper = helper
        self._worker = worker

        self._cache = cache
        self._stage = stage
        self._num_processes = num_processes

        # Queues used by the modules of this stage to communicate. They are
        # manager queues so they can be handed to the pool workers and to the
        # helper process.
        manager = multiprocessing.Manager()
        self._helper_queue = manager.Queue()
        self._work_queue = manager.Queue()

    def run(self):
        """Busy pulling the next task from the queue for execution.

        Starts the worker pool and the helper process, then routes every
        incoming task: tasks whose key has been seen before go to the helper,
        new ones go to the worker pool.

        On receiving the poison pill from the previous stage, this stage stops
        accepting tasks, waits for the worker pool and the helper to finish,
        and only then forwards the poison pill to the next stage. This
        guarantees the next stage sees every result before the shutdown signal.
        """

        # The worker pool.
        work_pool = multiprocessing.Pool(self._num_processes)

        # The helper process.
        helper_process = multiprocessing.Process(
            target=self._helper,
            args=(
                self._stage,
                self._cache,
                self._helper_queue,
                self._work_queue,
                self._result_queue,
            ),
        )
        helper_process.start()

        try:
            self._dispatch_tasks(work_pool)
        finally:
            # Always shut down the workers and the helper, even if dispatching
            # raised. Outstanding workers are drained before the helper is told
            # to stop, because the helper copies their results onto duplicate
            # tasks.
            work_pool.close()
            work_pool.join()

            self._helper_queue.put(POISONPILL)
            helper_process.join()

        # All workers and the helper have exited, so everything they put on
        # the result queue is already there. Forwarding the pill any earlier
        # could let the next stage shut down before receiving those results.
        self._result_queue.put(POISONPILL)

    def _dispatch_tasks(self, work_pool):
        """Route tasks from the input queue until the poison pill is received.

        A task whose key is already known is sent to the helper module.
        Otherwise the task is submitted to `work_pool` and its key is recorded.
        The poison pill itself is consumed here, but it is NOT forwarded;
        run() does that after shutdown.

        Args:
          work_pool: Pool whose apply_async runs self._worker on new tasks.
        """
        # Keys known to this stage: a snapshot of the cache at start-up plus
        # every key dispatched by this loop. Later cache updates made by other
        # processes are not re-read.
        # Task keys are the cache's dict keys, hence hashable. A set gives O(1)
        # lookups and accepts whatever keys() returns (a view for a plain
        # dict, a list for a manager dict proxy).
        seen_keys = set(self._cache.keys())

        while True:
            task = self._task_queue.get()
            if task == POISONPILL:
                # Poison pill means shutdown.
                return

            task_key = task.GetIdentifier(self._stage)
            if task_key in seen_keys:
                # The task has been encountered before. It will be sent to the
                # helper module for further processing.
                self._helper_queue.put(task)
            else:
                # Let the workers do the actual work.
                work_pool.apply_async(
                    self._worker,
                    args=(
                        self._stage,
                        task,
                        self._work_queue,
                        self._result_queue,
                    ),
                )
                seen_keys.add(task_key)