# -*- coding: utf-8 -*-
# Copyright 2011 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""The experiment runner module."""

import getpass
import os
import shutil
import time

from cros_utils import command_executer
from cros_utils import logger
from cros_utils.email_sender import EmailSender
from cros_utils.file_utils import FileUtils
from experiment_status import ExperimentStatus
import lock_machine
from results_cache import CacheConditions
from results_cache import ResultsCache
from results_report import HTMLResultsReport
from results_report import JSONResultsReport
from results_report import TextResultsReport
from schedv2 import Schedv2
import test_flag

import config


def _WriteJSONReportToFile(experiment, results_dir, json_report):
    """Writes a JSON report to a file in results_dir."""
    has_llvm = any("llvm" in l.compiler for l in experiment.labels)
    compiler_string = "llvm" if has_llvm else "gcc"
    board = experiment.labels[0].board
    filename = "report_%s_%s_%s.%s.json" % (
        board,
        json_report.date,
        json_report.time.replace(":", "."),
        compiler_string,
    )
    fullname = os.path.join(results_dir, filename)
    report_text = json_report.GetReport()
    with open(fullname, "w") as out_file:
        out_file.write(report_text)


class ExperimentRunner(object):
    """ExperimentRunner Class."""

    STATUS_TIME_DELAY = 30
    THREAD_MONITOR_DELAY = 2

    SUCCEEDED = 0
    HAS_FAILURE = 1
    ALL_FAILED = 2

    def __init__(
        self,
        experiment,
        json_report,
        using_schedv2=False,
        log=None,
        cmd_exec=None,
    ):
        self._experiment = experiment
        self.l = log or logger.GetLogger(experiment.log_dir)
        self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
        self._terminated = False
        self.json_report = json_report
        self.locked_machines = []
        if experiment.log_level != "verbose":
            self.STATUS_TIME_DELAY = 10

        # Setting this to True will use crosperf sched v2 (feature in
        # progress).
        self._using_schedv2 = using_schedv2

    def _GetMachineList(self):
        """Return a list of all requested machines.

        Create a list of all the requested machines, both global requests and
        label-specific requests, and return the list.
        """
        machines = self._experiment.remote
        # Every Label.remote is a sublist of experiment.remote.
        for l in self._experiment.labels:
            for r in l.remote:
                assert r in machines
        return machines

    def _UpdateMachineList(self, locked_machines):
        """Update machine lists to contain only locked machines.

        Go through all the lists of requested machines, both global and
        label-specific requests, and remove any machine that we were not
        able to lock.

        Args:
            locked_machines: A list of the machines we successfully locked.
        """
        # Filter into new lists rather than removing entries while iterating,
        # so no machines are skipped during traversal.
        self._experiment.remote = [
            m for m in self._experiment.remote if m in locked_machines
        ]

        for l in self._experiment.labels:
            l.remote = [m for m in l.remote if m in locked_machines]

    def _GetMachineType(self, lock_mgr, machine):
        """Determine where the machine comes from.

        Returns:
            The location of the machine: "local" or "crosfleet".
        """
        # We assume that lab machine names always start with "chromeos*", and
        # that local machines are specified by IP address.
117 if "chromeos" in machine: 118 if lock_mgr.CheckMachineInCrosfleet(machine): 119 return "crosfleet" 120 else: 121 raise RuntimeError("Lab machine not in Crosfleet.") 122 return "local" 123 124 def _LockAllMachines(self, experiment): 125 """Attempt to globally lock all of the machines requested for run. 126 127 This method tries to lock all machines requested for this crosperf run 128 in three different modes automatically, to prevent any other crosperf runs 129 from being able to update/use the machines while this experiment is 130 running: 131 - Crosfleet machines: Use crosfleet lease-dut mechanism to lease 132 - Local machines: Use file lock mechanism to lock 133 """ 134 if test_flag.GetTestMode(): 135 self.locked_machines = self._GetMachineList() 136 experiment.locked_machines = self.locked_machines 137 else: 138 experiment.lock_mgr = lock_machine.LockManager( 139 self._GetMachineList(), 140 "", 141 experiment.labels[0].chromeos_root, 142 experiment.locks_dir, 143 log=self.l, 144 ) 145 for m in experiment.lock_mgr.machines: 146 machine_type = self._GetMachineType(experiment.lock_mgr, m) 147 if machine_type == "local": 148 experiment.lock_mgr.AddMachineToLocal(m) 149 elif machine_type == "crosfleet": 150 experiment.lock_mgr.AddMachineToCrosfleet(m) 151 machine_states = experiment.lock_mgr.GetMachineStates("lock") 152 experiment.lock_mgr.CheckMachineLocks(machine_states, "lock") 153 self.locked_machines = experiment.lock_mgr.UpdateMachines(True) 154 experiment.locked_machines = self.locked_machines 155 self._UpdateMachineList(self.locked_machines) 156 experiment.machine_manager.RemoveNonLockedMachines( 157 self.locked_machines 158 ) 159 if not self.locked_machines: 160 raise RuntimeError("Unable to lock any machines.") 161 162 def _ClearCacheEntries(self, experiment): 163 for br in experiment.benchmark_runs: 164 cache = ResultsCache() 165 cache.Init( 166 br.label.chromeos_image, 167 br.label.chromeos_root, 168 br.benchmark.test_name, 169 br.iteration, 170 br.test_args, 171 br.profiler_args, 172 br.machine_manager, 173 br.machine, 174 br.label.board, 175 br.cache_conditions, 176 br.logger(), 177 br.log_level, 178 br.label, 179 br.share_cache, 180 br.benchmark.suite, 181 br.benchmark.show_all_results, 182 br.benchmark.run_local, 183 br.benchmark.cwp_dso, 184 ) 185 cache_dir = cache.GetCacheDirForWrite() 186 if os.path.exists(cache_dir): 187 self.l.LogOutput("Removing cache dir: %s" % cache_dir) 188 shutil.rmtree(cache_dir) 189 190 def _Run(self, experiment): 191 try: 192 # We should not lease machines if tests are launched via `crosfleet 193 # create-test`. This is because leasing DUT in crosfleet will create a 194 # no-op task on the DUT and new test created will be hanging there. 195 # TODO(zhizhouy): Need to check whether machine is ready or not before 196 # assigning a test to it. 
            if not experiment.no_lock and not experiment.crosfleet:
                self._LockAllMachines(experiment)
            # Calculate checksums of all available/locked machines, to ensure
            # that the same label uses the same machines for testing.
            experiment.SetCheckSums(forceSameImage=True)
            if self._using_schedv2:
                schedv2 = Schedv2(experiment)
                experiment.set_schedv2(schedv2)
            if CacheConditions.FALSE in experiment.cache_conditions:
                self._ClearCacheEntries(experiment)
            status = ExperimentStatus(experiment)
            experiment.Run()
            last_status_time = 0
            last_status_string = ""
            try:
                if experiment.log_level != "verbose":
                    self.l.LogStartDots()
                while not experiment.IsComplete():
                    if last_status_time + self.STATUS_TIME_DELAY < time.time():
                        last_status_time = time.time()
                        border = "=============================="
                        if experiment.log_level == "verbose":
                            self.l.LogOutput(border)
                            self.l.LogOutput(status.GetProgressString())
                            self.l.LogOutput(status.GetStatusString())
                            self.l.LogOutput(border)
                        else:
                            current_status_string = status.GetStatusString()
                            if current_status_string != last_status_string:
                                self.l.LogEndDots()
                                self.l.LogOutput(border)
                                self.l.LogOutput(current_status_string)
                                self.l.LogOutput(border)
                                last_status_string = current_status_string
                            else:
                                self.l.LogAppendDot()
                    time.sleep(self.THREAD_MONITOR_DELAY)
            except KeyboardInterrupt:
                self._terminated = True
                self.l.LogError("Ctrl-c pressed. Cleaning up...")
                experiment.Terminate()
                raise
            except SystemExit:
                self._terminated = True
                self.l.LogError("Unexpected exit. Cleaning up...")
                experiment.Terminate()
                raise
        finally:
            experiment.Cleanup()

    def _PrintTable(self, experiment):
        self.l.LogOutput(
            TextResultsReport.FromExperiment(experiment).GetReport()
        )

    def _Email(self, experiment):
        # Only email by default if a new run was completed.
        send_mail = False
        for benchmark_run in experiment.benchmark_runs:
            if not benchmark_run.cache_hit:
                send_mail = True
                break
        if (
            not send_mail and not experiment.email_to
        ) or config.GetConfig("no_email"):
            return

        label_names = []
        for label in experiment.labels:
            label_names.append(label.name)
        subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

        text_report = TextResultsReport.FromExperiment(
            experiment, True
        ).GetReport()
        text_report += (
            "\nResults are stored in %s.\n" % experiment.results_directory
        )
        text_report = "<pre style='font-size: 13px'>%s</pre>" % text_report
        html_report = HTMLResultsReport.FromExperiment(experiment).GetReport()
        attachment = EmailSender.Attachment("report.html", html_report)
        email_to = experiment.email_to or []
        email_to.append(getpass.getuser())
        EmailSender().SendEmail(
            email_to,
            subject,
            text_report,
            attachments=[attachment],
            msg_type="html",
        )

    def _StoreResults(self, experiment):
        if self._terminated:
            return self.ALL_FAILED

        results_directory = experiment.results_directory
        FileUtils().RmDir(results_directory)
        FileUtils().MkDirP(results_directory)
        self.l.LogOutput("Storing experiment file in %s." % results_directory)
        experiment_file_path = os.path.join(
            results_directory, "experiment.exp"
        )
        FileUtils().WriteFile(
            experiment_file_path, experiment.experiment_file
        )

        all_failed = True

        topstats_file = os.path.join(results_directory, "topstats.log")
        self.l.LogOutput(
            "Storing top statistics of each benchmark run into %s."
            % topstats_file
        )
        # Track whether any iteration of a given benchmark has passed, for
        # each label.
        benchmarks_passes = {}
        with open(topstats_file, "w") as top_fd:
            for benchmark_run in experiment.benchmark_runs:
                # Make sure every (label, benchmark) pair starts out as False,
                # even when a label has more than one benchmark.
                benchmarks_passes.setdefault(
                    benchmark_run.label.name, {}
                ).setdefault(benchmark_run.benchmark.name, False)
                if benchmark_run.result:
                    if not benchmark_run.result.retval:
                        all_failed = False
                        benchmarks_passes[benchmark_run.label.name][
                            benchmark_run.benchmark.name
                        ] = True
                    # Header with benchmark run name.
                    top_fd.write("%s\n" % str(benchmark_run))
                    # Formatted string with top statistics.
                    top_fd.write(
                        benchmark_run.result.FormatStringTopCommands()
                    )
                    top_fd.write("\n\n")

        if all_failed:
            return self.ALL_FAILED
        # Set has_passes only if at least one iteration of every benchmark
        # has passed for every label.
        has_passes = True
        for benchmarks in benchmarks_passes.values():
            has_passes = has_passes and all(benchmarks.values())

        self.l.LogOutput("Storing results of each benchmark run.")
        for benchmark_run in experiment.benchmark_runs:
            if benchmark_run.result:
                benchmark_run_name = "".join(
                    ch for ch in benchmark_run.name if ch.isalnum()
                )
                benchmark_run_path = os.path.join(
                    results_directory, benchmark_run_name
                )
                if experiment.compress_results:
                    benchmark_run.result.CompressResultsTo(benchmark_run_path)
                else:
                    benchmark_run.result.CopyResultsTo(benchmark_run_path)
                # Don't remove benchmark tmp if it was a cache hit.
                benchmark_run.result.CleanUp(
                    benchmark_run.benchmark.rm_chroot_tmp
                    and not benchmark_run.cache_hit
                )

        self.l.LogOutput("Storing results report in %s." % results_directory)
        results_table_path = os.path.join(results_directory, "results.html")
        report = HTMLResultsReport.FromExperiment(experiment).GetReport()
        if self.json_report:
            json_report = JSONResultsReport.FromExperiment(
                experiment, json_args={"indent": 2}
            )
            _WriteJSONReportToFile(experiment, results_directory, json_report)

        FileUtils().WriteFile(results_table_path, report)

        self.l.LogOutput(
            "Storing email message body in %s." % results_directory
        )
        msg_file_path = os.path.join(results_directory, "msg_body.html")
        text_report = TextResultsReport.FromExperiment(
            experiment, True
        ).GetReport()
        text_report += (
            "\nResults are stored in %s.\n" % experiment.results_directory
        )
        msg_body = "<pre style='font-size: 13px'>%s</pre>" % text_report
        FileUtils().WriteFile(msg_file_path, msg_body)

        return self.SUCCEEDED if has_passes else self.HAS_FAILURE

    def Run(self):
        try:
            self._Run(self._experiment)
        finally:
            # Always print the report at the end of the run.
            self._PrintTable(self._experiment)
            ret = self._StoreResults(self._experiment)
            if ret != self.ALL_FAILED:
                self._Email(self._experiment)
            return ret


class MockExperimentRunner(ExperimentRunner):
    """Mocked ExperimentRunner for testing."""

    def __init__(self, experiment, json_report):
        super(MockExperimentRunner, self).__init__(experiment, json_report)

    def _Run(self, experiment):
        self.l.LogOutput(
            "Would run the following experiment: '%s'." % experiment.name
        )

    def _PrintTable(self, experiment):
        self.l.LogOutput("Would print the experiment table.")

    def _Email(self, experiment):
        self.l.LogOutput("Would send result email.")

    def _StoreResults(self, experiment):
        self.l.LogOutput("Would store the results.")
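

# Typical usage (a minimal sketch, for illustration only): crosperf builds an
# Experiment elsewhere and hands it to this runner, then exits with the status
# code returned by Run(). How the `experiment` object is constructed is
# outside this module and is assumed here.
#
#   runner = ExperimentRunner(experiment, json_report=False)
#   sys.exit(runner.Run())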