# -*- coding: utf-8 -*-
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""The experiment setting module."""


import os
from threading import Lock
import time

import benchmark_run
from cros_utils import logger
from cros_utils import misc
from machine_manager import BadChecksum
from machine_manager import MachineManager
from machine_manager import MockMachineManager
import test_flag


class Experiment(object):
    """Class representing an Experiment to be run.

    An experiment owns a machine manager, the list of reachable remote
    machines, and one benchmark run per (label, benchmark, iteration)
    combination.  Runs are either started directly as threads by `Run` or
    delegated to a scheduler installed through `set_schedv2`.
    """

    def __init__(
        self,
        name,
        remote,
        working_directory,
        chromeos_root,
        cache_conditions,
        labels,
        benchmarks,
        experiment_file,
        email_to,
        acquire_timeout,
        log_dir,
        log_level,
        share_cache,
        results_directory,
        compress_results,
        locks_directory,
        cwp_dso,
        ignore_min_max,
        crosfleet,
        dut_config,
        keep_stateful: bool,
        no_lock: bool,
    ):
        """Set up the experiment state and generate its benchmark runs.

        Args:
            name: Experiment name; also used to build the default results
                directory name (`<name>_results`).
            remote: Iterable of machine names.  Each one is handed to the
                machine manager; afterwards `self.remote` is replaced by
                the names of the machines the manager actually holds.
            working_directory: Parent of the default results directory.
            chromeos_root: Root passed to the machine manager.  When falsy,
                the first truthy `label.chromeos_root` is used instead.
            cache_conditions: Forwarded to every benchmark run.
            labels: Labels to run; each must expose `name`, `remote` and
                `chromeos_root`.
            benchmarks: Benchmarks to run; each must expose `name` and
                `iterations`.
            experiment_file: Stored as `self.experiment_file`.
            email_to: Stored as `self.email_to`.
            acquire_timeout: Forwarded to the machine manager.
            log_dir: Directory handed to the loggers.
            log_level: Forwarded to the machine manager and benchmark runs.
            share_cache: Forwarded to every benchmark run.
            results_directory: Explicit results directory; when falsy a
                default under `working_directory` is used.
            compress_results: Stored as `self.compress_results`.
            locks_directory: Forwarded to the machine manager and stored as
                `self.locks_dir`.
            cwp_dso: Stored as `self.cwp_dso`.
            ignore_min_max: Stored as `self.ignore_min_max`.
            crosfleet: When truthy, an empty `remote` does not trigger the
                "No remote hosts specified" check.
            dut_config: Forwarded to every benchmark run.
            keep_stateful: Forwarded to the machine manager.
            no_lock: Stored as `self.no_lock`.

        Raises:
            RuntimeError: If no benchmarks, labels, remote hosts (without
                crosfleet) or chromeos_root are available, or if the machine
                manager ends up with no machines.
        """
        self.name = name
        self.working_directory = working_directory
        self.remote = remote
        # NOTE(review): this keeps the caller-supplied value; the fallback
        # derived from the labels below is only passed to the machine
        # manager and is not written back to self.chromeos_root.
        self.chromeos_root = chromeos_root
        self.cache_conditions = cache_conditions
        self.experiment_file = experiment_file
        self.email_to = email_to
        if not results_directory:
            self.results_directory = os.path.join(
                self.working_directory, self.name + "_results"
            )
        else:
            self.results_directory = misc.CanonicalizePath(results_directory)
        self.compress_results = compress_results
        self.log_dir = log_dir
        self.log_level = log_level
        self.labels = labels
        self.benchmarks = benchmarks
        self.num_complete = 0
        self.num_run_complete = 0
        self.share_cache = share_cache
        self.active_threads = []
        self.locks_dir = locks_directory
        self.locked_machines = []
        self.lock_mgr = None
        self.cwp_dso = cwp_dso
        self.ignore_min_max = ignore_min_max
        self.crosfleet = crosfleet
        self.no_lock = no_lock
        self.l = logger.GetLogger(log_dir)

        if not self.benchmarks:
            raise RuntimeError("No benchmarks specified")
        if not self.labels:
            raise RuntimeError("No labels specified")
        if not remote and not self.crosfleet:
            raise RuntimeError("No remote hosts specified")

        # We need one chromeos_root to run the benchmarks in, but it doesn't
        # matter where it is, unless the ABIs are different.
        if not chromeos_root:
            for label in self.labels:
                if label.chromeos_root:
                    chromeos_root = label.chromeos_root
                    break
        if not chromeos_root:
            raise RuntimeError(
                "No chromeos_root given and could not determine "
                "one from the image path."
            )

        machine_manager_fn = MachineManager
        if test_flag.GetTestMode():
            machine_manager_fn = MockMachineManager
        self.machine_manager = machine_manager_fn(
            chromeos_root,
            acquire_timeout,
            log_level,
            locks_directory,
            keep_stateful=keep_stateful,
        )

        for machine in self.remote:
            # machine_manager.AddMachine only adds reachable machines.
            self.machine_manager.AddMachine(machine)
        # Now machine_manager._all_machines contains a list of reachable
        # machines. This is a subset of self.remote. We make both lists the
        # same.
        self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
        if not self.remote:
            raise RuntimeError("No machine available for running experiment.")

        # Initialize checksums for all machines, ignore errors at this time.
        # The checksum will be double checked, and image will be flashed after
        # duts are locked/leased.
        self.SetCheckSums()

        self.start_time = None
        self.benchmark_runs = self._GenerateBenchmarkRuns(dut_config)

        self._schedv2 = None
        self._internal_counter_lock = Lock()

    def set_schedv2(self, schedv2):
        """Install the scheduler object used by Run/Terminate/IsComplete."""
        self._schedv2 = schedv2

    def schedv2(self):
        """Return the installed scheduler, or None if none was set."""
        return self._schedv2

    def _GenerateBenchmarkRuns(self, dut_config):
        """Generate benchmark runs from labels and benchmark definitions.

        One run is created per (label, benchmark, iteration), with
        iterations numbered from 1 to `benchmark.iterations` inclusive.

        Args:
            dut_config: Forwarded unchanged to every BenchmarkRun.

        Returns:
            A list of `benchmark_run.BenchmarkRun` objects, ordered by
            label, then benchmark, then iteration.
        """
        benchmark_runs = []
        for label in self.labels:
            for benchmark in self.benchmarks:
                for iteration in range(1, benchmark.iterations + 1):
                    benchmark_run_name = "%s: %s (%s)" % (
                        label.name,
                        benchmark.name,
                        iteration,
                    )
                    full_name = "%s_%s_%s" % (
                        label.name,
                        benchmark.name,
                        iteration,
                    )
                    # Each run gets its own logger named after the run.
                    logger_to_use = logger.Logger(
                        self.log_dir, "run.%s" % (full_name), True
                    )
                    benchmark_runs.append(
                        benchmark_run.BenchmarkRun(
                            benchmark_run_name,
                            benchmark,
                            label,
                            iteration,
                            self.cache_conditions,
                            self.machine_manager,
                            logger_to_use,
                            self.log_level,
                            self.share_cache,
                            dut_config,
                        )
                    )

        return benchmark_runs

    def SetCheckSums(self, forceSameImage=False):
        """Compute the common checksum and checksum string for each label.

        Args:
            forceSameImage: When True and the first checksum computation
                raises BadChecksum, force the same image onto all machines
                of the label and compute the checksum once more.  When
                False, a BadChecksum is ignored.
        """
        for label in self.labels:
            # We filter out label remotes that are not reachable (not in
            # self.remote). So each label.remote is a sublist of
            # experiment.remote.
            label.remote = [r for r in label.remote if r in self.remote]
            try:
                self.machine_manager.ComputeCommonCheckSum(label)
            except BadChecksum:
                # Force same image on all machines, then we do checksum again.
                # No bailout if checksums still do not match.
                # TODO (zhizhouy): Need to figure out how flashing image will
                # influence the new checksum.
                if forceSameImage:
                    self.machine_manager.ForceSameImageToAllMachines(label)
                    self.machine_manager.ComputeCommonCheckSum(label)

            self.machine_manager.ComputeCommonCheckSumString(label)

    def Build(self):
        """Do nothing; placeholder build step."""
        pass

    def Terminate(self):
        """Stop the experiment.

        Delegates to the scheduler when one is installed; otherwise calls
        `Terminate()` on every benchmark run that is still alive.
        """
        if self._schedv2 is not None:
            self._schedv2.terminate()
        else:
            for t in self.benchmark_runs:
                # Thread.isAlive() was removed in Python 3.9; is_alive() is
                # the supported spelling.
                if t.is_alive():
                    self.l.LogError("Terminating run: '%s'." % t.name)
                    t.Terminate()

    def IsComplete(self):
        """Report whether the experiment has finished.

        With a scheduler installed, its `is_complete()` answer is returned.
        Otherwise finished threads are reaped from `self.active_threads`
        and the completion counters are updated.

        Returns:
            The scheduler's answer, or for the threaded path: False if any
            thread was still tracked when this call started (even if all of
            them were reaped during the call), True once none are tracked.
        """
        if self._schedv2:
            return self._schedv2.is_complete()
        if not self.active_threads:
            return True
        # Iterate over a snapshot: removing entries from the list being
        # iterated would make the loop skip the element after each removal.
        for t in list(self.active_threads):
            if t.is_alive():
                t.join(0)
            if not t.is_alive():
                self.num_complete += 1
                if not t.cache_hit:
                    self.num_run_complete += 1
                self.active_threads.remove(t)
        return False

    def BenchmarkRunFinished(self, br):
        """Update internal counters after br finishes.

        Note this is only used by schedv2 and is called by multiple threads.
        Never throw any exception here.
        """

        assert self._schedv2 is not None
        with self._internal_counter_lock:
            self.num_complete += 1
            if not br.cache_hit:
                self.num_run_complete += 1

    def Run(self):
        """Record the start time and start all benchmark runs.

        Delegates to the scheduler when one is installed; otherwise starts
        every benchmark run and tracks it in `self.active_threads`.
        """
        self.start_time = time.time()
        if self._schedv2 is not None:
            self._schedv2.run_sched()
        else:
            self.active_threads = []
            for run in self.benchmark_runs:
                # Set threads to daemon so program exits when ctrl-c is
                # pressed.
                run.daemon = True
                run.start()
                self.active_threads.append(run)

    def SetCacheConditions(self, cache_conditions):
        """Forward new cache conditions to every benchmark run."""
        for run in self.benchmark_runs:
            run.SetCacheConditions(cache_conditions)

    def Cleanup(self):
        """Make sure all machines are unlocked.

        Raises:
            RuntimeError: If some machines in `self.locked_machines` are not
                reported as unlocked by the lock manager.
        """
        if self.locks_dir:
            # We are using the file locks mechanism, so call
            # machine_manager.Cleanup to unlock everything.
            self.machine_manager.Cleanup()

        if test_flag.GetTestMode() or not self.locked_machines:
            return

        # If we locked any machines earlier, make sure we unlock them now.
        if self.lock_mgr:
            machine_states = self.lock_mgr.GetMachineStates("unlock")
            self.lock_mgr.CheckMachineLocks(machine_states, "unlock")
            unlocked_machines = self.lock_mgr.UpdateMachines(False)
            failed_machines = [
                m for m in self.locked_machines if m not in unlocked_machines
            ]
            if failed_machines:
                raise RuntimeError(
                    "These machines are not unlocked correctly: %s"
                    % failed_machines
                )
            self.lock_mgr = None