# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The framework stage that produces the next generation of tasks to run.

Part of the Chrome build flags optimization.
"""

__author__ = "[email protected] (Yuheng Long)"

import pipeline_process


def Steering(cache, generations, input_queue, result_queue):
    """The core method template that produces the next generation of tasks to run.

    This method waits for the results of the tasks from the previous generation.
    Upon the arrival of all these results, the method uses them to generate the
    next generation of tasks.

    The main logic of producing the next generation from the previous
    generation is application specific. For example, in the genetic algorithm,
    a task is produced by combining two parent tasks, while in the hill
    climbing algorithm, a task is generated from its immediate neighbor. The
    method 'Next' is overridden in the concrete subclasses of the class
    Generation to produce the next application-specific generation. The
    steering method invokes the 'Next' method, produces the next generation and
    submits the tasks in this generation to the next stage, e.g., the
    build/compilation stage.

    Args:
      cache: It stores the experiments that have been conducted before. Used to
        avoid duplicate work.
      generations: The initial generations of tasks to be run.
      input_queue: The input results from the last stage of the framework. These
        results will trigger a new iteration of the algorithm.
      result_queue: The output task queue for this pipeline stage. The new tasks
        generated by the steering algorithm will be sent to the next stage via
        this queue.
    """

    # Generations that have pending tasks to be executed. Pending tasks are
    # those whose results are not ready. The tasks that have their results
    # ready are referred to as ready tasks. Once there is no pending
    # generation, the algorithm terminates.
    waiting = generations

    # Record how many initial tasks there are. If there is no task at all, the
    # algorithm can terminate right away.
    num_tasks = 0

    # Submit all the tasks in the initial generations to the next stage of the
    # framework. The next stage can be the build/compilation stage.
    for generation in generations:
        # Only send the tasks that have not been performed before to the next
        # stage.
        for task in [task for task in generation.Pool() if task not in cache]:
            result_queue.put(task)
            cache.add(task)
            num_tasks += 1

    # If there is no task to be executed at all, the algorithm returns right
    # away.
    if not num_tasks:
        # Inform the next stage that there will be no more tasks.
        result_queue.put(pipeline_process.POISONPILL)
        return

    # The algorithm is done if there is no pending generation. A generation is
    # pending if it has a pending task.
    while waiting:
        # Busy-waiting for the next task.
        if input_queue.empty():
            continue

        # If there is a task whose result is ready from the last stage of the
        # feedback loop, there will be one fewer pending task.

        task = input_queue.get()

        # Store the result of this ready task. Intermediate results can be
        # used to generate a report for the final result or to reboot after a
        # crash caused by the failure of any module of the framework.
        task.LogSteeringCost()

        # Find out which pending generation this ready task belongs to. This
        # pending generation will have one fewer pending task. The "next"
        # expression iterates over the generations in waiting until the first
        # generation whose UpdateTask method returns true.
        generation = next(gen for gen in waiting if gen.UpdateTask(task))

        # If there is still any pending task, do nothing.
        if not generation.Done():
            continue

        # All the tasks in the generation are finished. The generation is
        # ready to produce the next generation.
        waiting.remove(generation)

        # Check whether a generation should generate the next generation.
        # A generation may not generate the next generation, e.g., because a
        # fixpoint has been reached, there has not been any improvement for a
        # few generations or a local maximum is reached.
        if not generation.IsImproved():
            continue

        for new_generation in generation.Next(cache):
            # Make sure that each generation contains at least one task.
            assert new_generation.Pool()
            waiting.append(new_generation)

            # Send the tasks of the new generations to the next stage for
            # execution.
            for new_task in new_generation.Pool():
                result_queue.put(new_task)
                cache.add(new_task)

    # The steering algorithm is finished and it informs the next stage that
    # there will be no more tasks.
    result_queue.put(pipeline_process.POISONPILL)
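
# Usage sketch (illustrative only, not part of the framework). The block below
# shows one way the steering loop could be driven end to end. Everything in it
# is an assumption made for the example: ToyTask, ToyGeneration, echo_stage and
# run_demo are hypothetical names, POISONPILL is a stand-in for
# pipeline_process.POISONPILL, and steer() is a condensed restatement of
# Steering that uses a blocking get() in place of the busy-wait so the demo
# does not spin. The real Task and Generation classes may differ.

```python
import queue
import threading

POISONPILL = "STOP"  # assumed stand-in for pipeline_process.POISONPILL


class ToyTask:
    """Hypothetical task that wraps an integer."""

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return isinstance(other, ToyTask) and self.value == other.value

    def __hash__(self):
        return hash(self.value)

    def LogSteeringCost(self):
        pass  # a real task would record its result here


class ToyGeneration:
    """Hypothetical generation; each successor task holds value - 1."""

    def __init__(self, tasks):
        self._tasks = list(tasks)
        self._pending = set(self._tasks)

    def Pool(self):
        return self._tasks

    def UpdateTask(self, task):
        if task in self._pending:
            self._pending.remove(task)
            return True
        return False

    def Done(self):
        return not self._pending

    def IsImproved(self):
        # Toy stopping rule: stop once any task has reached zero.
        return all(t.value > 0 for t in self._tasks)

    def Next(self, cache):
        fresh = [ToyTask(t.value - 1) for t in self._tasks]
        fresh = [t for t in fresh if t not in cache]
        return [ToyGeneration(fresh)] if fresh else []


def steer(cache, generations, input_queue, result_queue):
    """Condensed restatement of the steering loop, for illustration."""
    waiting = list(generations)
    num_tasks = 0
    for generation in waiting:
        for task in [t for t in generation.Pool() if t not in cache]:
            result_queue.put(task)
            cache.add(task)
            num_tasks += 1
    if not num_tasks:
        result_queue.put(POISONPILL)
        return
    while waiting:
        task = input_queue.get()  # blocks until a result arrives
        task.LogSteeringCost()
        generation = next(g for g in waiting if g.UpdateTask(task))
        if not generation.Done():
            continue
        waiting.remove(generation)
        if not generation.IsImproved():
            continue
        for new_generation in generation.Next(cache):
            assert new_generation.Pool()
            waiting.append(new_generation)
            for new_task in new_generation.Pool():
                result_queue.put(new_task)
                cache.add(new_task)
    result_queue.put(POISONPILL)


def echo_stage(tasks_in, results_out, seen):
    """Stand-in for the build/test stages: returns every task unchanged."""
    while True:
        task = tasks_in.get()
        if task == POISONPILL:
            return
        seen.append(task.value)
        results_out.put(task)


def run_demo(start):
    """Runs the loop from one initial task; returns the values executed."""
    to_build, from_test, seen = queue.Queue(), queue.Queue(), []
    worker = threading.Thread(
        target=echo_stage, args=(to_build, from_test, seen)
    )
    worker.start()
    steer(set(), [ToyGeneration([ToyTask(start)])], from_test, to_build)
    worker.join()
    return seen
```

# In this sketch, run_demo(2) submits the tasks 2, 1 and 0 in turn and then
# stops, because the toy IsImproved() returns False once a task reaches zero.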