xref: /aosp_15_r20/external/toolchain-utils/crosperf/experiment_runner.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
# -*- coding: utf-8 -*-
# Copyright 2011 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""The experiment runner module."""

8import getpass
9import os
10import shutil
11import time
12
13from cros_utils import command_executer
14from cros_utils import logger
15from cros_utils.email_sender import EmailSender
16from cros_utils.file_utils import FileUtils
17from experiment_status import ExperimentStatus
18import lock_machine
19from results_cache import CacheConditions
20from results_cache import ResultsCache
21from results_report import HTMLResultsReport
22from results_report import JSONResultsReport
23from results_report import TextResultsReport
24from schedv2 import Schedv2
25import test_flag
26
27import config
28
29
30def _WriteJSONReportToFile(experiment, results_dir, json_report):
31    """Writes a JSON report to a file in results_dir."""
32    has_llvm = any("llvm" in l.compiler for l in experiment.labels)
33    compiler_string = "llvm" if has_llvm else "gcc"
34    board = experiment.labels[0].board
35    filename = "report_%s_%s_%s.%s.json" % (
36        board,
37        json_report.date,
38        json_report.time.replace(":", "."),
39        compiler_string,
40    )
41    fullname = os.path.join(results_dir, filename)
42    report_text = json_report.GetReport()
43    with open(fullname, "w") as out_file:
44        out_file.write(report_text)
45
46
class ExperimentRunner(object):
    """ExperimentRunner Class."""

    STATUS_TIME_DELAY = 30
    THREAD_MONITOR_DELAY = 2

    # Overall outcome codes returned by _StoreResults()/Run().
    SUCCEEDED = 0
    HAS_FAILURE = 1
    ALL_FAILED = 2

    def __init__(
        self,
        experiment,
        json_report,
        using_schedv2=False,
        log=None,
        cmd_exec=None,
    ):
        """Initialize the runner.

        Args:
          experiment: The Experiment to run.
          json_report: If True, also write a JSON results report.
          using_schedv2: If True, schedule benchmark runs with Schedv2.
          log: Optional logger; defaults to a logger in experiment.log_dir.
          cmd_exec: Optional command executer; defaults to a new one.
        """
        self._experiment = experiment
        self.l = log or logger.GetLogger(experiment.log_dir)
        self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
        self._terminated = False
        self.json_report = json_report
        self.locked_machines = []
        # Non-verbose mode prints only a short status line (or a dot), so
        # refresh the status more frequently.
        if experiment.log_level != "verbose":
            self.STATUS_TIME_DELAY = 10

        # Setting this to True will use crosperf sched v2 (feature in progress).
        self._using_schedv2 = using_schedv2

    def _GetMachineList(self):
        """Return a list of all requested machines.

        Create a list of all the requested machines, both global requests and
        label-specific requests, and return the list.
        """
        machines = self._experiment.remote
        # All Label.remote is a sublist of experiment.remote.
        for label in self._experiment.labels:
            for r in label.remote:
                assert r in machines
        return machines

    def _UpdateMachineList(self, locked_machines):
        """Update machines lists to contain only locked machines.

        Go through all the lists of requested machines, both global and
        label-specific requests, and remove any machine that we were not
        able to lock.

        Args:
          locked_machines: A list of the machines we successfully locked.
        """
        # Iterate over snapshots of the lists: calling remove() on the list
        # being iterated skips the element following each removal, which
        # could leave unlocked machines behind.
        for m in list(self._experiment.remote):
            if m not in locked_machines:
                self._experiment.remote.remove(m)

        for label in self._experiment.labels:
            for m in list(label.remote):
                if m not in locked_machines:
                    label.remote.remove(m)

    def _GetMachineType(self, lock_mgr, machine):
        """Get where is the machine from.

        Args:
          lock_mgr: The lock manager used to query Crosfleet.
          machine: Hostname or IP address of the machine to classify.

        Returns:
          The location of the machine: local or crosfleet

        Raises:
          RuntimeError: A lab-named machine is unknown to Crosfleet.
        """
        # We assume that lab machine always starts with chromeos*, and local
        # machines are ip address.
        if "chromeos" in machine:
            if lock_mgr.CheckMachineInCrosfleet(machine):
                return "crosfleet"
            raise RuntimeError("Lab machine not in Crosfleet.")
        return "local"

    def _LockAllMachines(self, experiment):
        """Attempt to globally lock all of the machines requested for run.

        This method tries to lock all machines requested for this crosperf run
        in three different modes automatically, to prevent any other crosperf runs
        from being able to update/use the machines while this experiment is
        running:
          - Crosfleet machines: Use crosfleet lease-dut mechanism to lease
          - Local machines: Use file lock mechanism to lock

        Raises:
          RuntimeError: No machine could be locked.
        """
        if test_flag.GetTestMode():
            # In test mode, pretend every requested machine was locked.
            self.locked_machines = self._GetMachineList()
            experiment.locked_machines = self.locked_machines
        else:
            experiment.lock_mgr = lock_machine.LockManager(
                self._GetMachineList(),
                "",
                experiment.labels[0].chromeos_root,
                experiment.locks_dir,
                log=self.l,
            )
            # Classify each machine so the lock manager uses the right
            # locking mechanism for it.
            for m in experiment.lock_mgr.machines:
                machine_type = self._GetMachineType(experiment.lock_mgr, m)
                if machine_type == "local":
                    experiment.lock_mgr.AddMachineToLocal(m)
                elif machine_type == "crosfleet":
                    experiment.lock_mgr.AddMachineToCrosfleet(m)
            machine_states = experiment.lock_mgr.GetMachineStates("lock")
            experiment.lock_mgr.CheckMachineLocks(machine_states, "lock")
            self.locked_machines = experiment.lock_mgr.UpdateMachines(True)
            experiment.locked_machines = self.locked_machines
            # Drop any machine we failed to lock from all machine lists.
            self._UpdateMachineList(self.locked_machines)
            experiment.machine_manager.RemoveNonLockedMachines(
                self.locked_machines
            )
            if not self.locked_machines:
                raise RuntimeError("Unable to lock any machines.")

    def _ClearCacheEntries(self, experiment):
        """Remove the cache directory of every benchmark run in experiment."""
        for br in experiment.benchmark_runs:
            cache = ResultsCache()
            cache.Init(
                br.label.chromeos_image,
                br.label.chromeos_root,
                br.benchmark.test_name,
                br.iteration,
                br.test_args,
                br.profiler_args,
                br.machine_manager,
                br.machine,
                br.label.board,
                br.cache_conditions,
                br.logger(),
                br.log_level,
                br.label,
                br.share_cache,
                br.benchmark.suite,
                br.benchmark.show_all_results,
                br.benchmark.run_local,
                br.benchmark.cwp_dso,
            )
            cache_dir = cache.GetCacheDirForWrite()
            if os.path.exists(cache_dir):
                self.l.LogOutput("Removing cache dir: %s" % cache_dir)
                shutil.rmtree(cache_dir)

    def _Run(self, experiment):
        """Lock machines, run the experiment, and poll status until done.

        Args:
          experiment: The Experiment to run.

        Raises:
          KeyboardInterrupt/SystemExit: Re-raised after terminating the
            experiment so the caller can unwind; cleanup always happens.
        """
        try:
            # We should not lease machines if tests are launched via `crosfleet
            # create-test`. This is because leasing DUT in crosfleet will create a
            # no-op task on the DUT and new test created will be hanging there.
            # TODO(zhizhouy): Need to check whether machine is ready or not before
            # assigning a test to it.
            if not experiment.no_lock and not experiment.crosfleet:
                self._LockAllMachines(experiment)
            # Calculate all checksums of avaiable/locked machines, to ensure same
            # label has same machines for testing
            experiment.SetCheckSums(forceSameImage=True)
            if self._using_schedv2:
                schedv2 = Schedv2(experiment)
                experiment.set_schedv2(schedv2)
            if CacheConditions.FALSE in experiment.cache_conditions:
                self._ClearCacheEntries(experiment)
            status = ExperimentStatus(experiment)
            experiment.Run()
            last_status_time = 0
            last_status_string = ""
            try:
                if experiment.log_level != "verbose":
                    self.l.LogStartDots()
                while not experiment.IsComplete():
                    # Emit a status update at most every STATUS_TIME_DELAY
                    # seconds; otherwise just append a progress dot.
                    if last_status_time + self.STATUS_TIME_DELAY < time.time():
                        last_status_time = time.time()
                        border = "=============================="
                        if experiment.log_level == "verbose":
                            self.l.LogOutput(border)
                            self.l.LogOutput(status.GetProgressString())
                            self.l.LogOutput(status.GetStatusString())
                            self.l.LogOutput(border)
                        else:
                            current_status_string = status.GetStatusString()
                            if current_status_string != last_status_string:
                                self.l.LogEndDots()
                                self.l.LogOutput(border)
                                self.l.LogOutput(current_status_string)
                                self.l.LogOutput(border)
                                last_status_string = current_status_string
                            else:
                                self.l.LogAppendDot()
                    time.sleep(self.THREAD_MONITOR_DELAY)
            except KeyboardInterrupt:
                self._terminated = True
                self.l.LogError("Ctrl-c pressed. Cleaning up...")
                experiment.Terminate()
                raise
            except SystemExit:
                self._terminated = True
                self.l.LogError("Unexpected exit. Cleaning up...")
                experiment.Terminate()
                raise
        finally:
            experiment.Cleanup()

    def _PrintTable(self, experiment):
        """Print the text results report for experiment to the log."""
        self.l.LogOutput(
            TextResultsReport.FromExperiment(experiment).GetReport()
        )

    def _Email(self, experiment):
        """Email the results report, unless emailing is disabled.

        Mail is sent when at least one benchmark run was freshly executed
        (not a cache hit) or an explicit recipient list was configured, and
        the global no_email config is not set.
        """
        # Only email by default if a new run was completed.
        send_mail = False
        for benchmark_run in experiment.benchmark_runs:
            if not benchmark_run.cache_hit:
                send_mail = True
                break
        # Parentheses make the original operator precedence explicit:
        # skip when nothing new ran and no recipients were requested, or
        # when email is globally disabled.
        if (
            not send_mail and not experiment.email_to
        ) or config.GetConfig("no_email"):
            return

        label_names = []
        for label in experiment.labels:
            label_names.append(label.name)
        subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

        text_report = TextResultsReport.FromExperiment(
            experiment, True
        ).GetReport()
        text_report += (
            "\nResults are stored in %s.\n" % experiment.results_directory
        )
        text_report = "<pre style='font-size: 13px'>%s</pre>" % text_report
        html_report = HTMLResultsReport.FromExperiment(experiment).GetReport()
        attachment = EmailSender.Attachment("report.html", html_report)
        # Copy before appending: experiment.email_to must not accumulate
        # the local user on repeated calls.
        email_to = list(experiment.email_to or [])
        email_to.append(getpass.getuser())
        EmailSender().SendEmail(
            email_to,
            subject,
            text_report,
            attachments=[attachment],
            msg_type="html",
        )

    def _StoreResults(self, experiment):
        """Write all experiment artifacts into the results directory.

        Stores the experiment file, per-run top statistics, per-run result
        files, the HTML (and optionally JSON) report, and the email body.

        Args:
          experiment: The completed experiment.

        Returns:
          SUCCEEDED if every benchmark passed at least once for every label,
          HAS_FAILURE if some benchmark never passed, or ALL_FAILED if no
          run succeeded at all (or the experiment was terminated).
        """
        if self._terminated:
            return self.ALL_FAILED

        results_directory = experiment.results_directory
        FileUtils().RmDir(results_directory)
        FileUtils().MkDirP(results_directory)
        self.l.LogOutput("Storing experiment file in %s." % results_directory)
        experiment_file_path = os.path.join(results_directory, "experiment.exp")
        FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

        all_failed = True

        topstats_file = os.path.join(results_directory, "topstats.log")
        self.l.LogOutput(
            "Storing top statistics of each benchmark run into %s."
            % topstats_file
        )
        # Track if any iterations for a given benchmark has passed for each
        # label.
        benchmarks_passes = {}
        with open(topstats_file, "w") as top_fd:
            for benchmark_run in experiment.benchmark_runs:
                # Register every (label, benchmark) pair as not-yet-passed.
                # The chained setdefault also covers a benchmark first seen
                # under an already-registered label; a single setdefault of
                # the outer key alone would silently skip tracking it.
                benchmarks_passes.setdefault(
                    benchmark_run.label.name, {}
                ).setdefault(benchmark_run.benchmark.name, False)
                if benchmark_run.result:
                    if not benchmark_run.result.retval:
                        all_failed = False
                        benchmarks_passes[benchmark_run.label.name][
                            benchmark_run.benchmark.name
                        ] = True
                    # Header with benchmark run name.
                    top_fd.write("%s\n" % str(benchmark_run))
                    # Formatted string with top statistics.
                    top_fd.write(benchmark_run.result.FormatStringTopCommands())
                    top_fd.write("\n\n")

        if all_failed:
            return self.ALL_FAILED
        # Set has_passes if atleast one iteration of all benchmarks has passed
        # for every label.
        has_passes = True
        for benchmarks in benchmarks_passes.values():
            has_passes = has_passes and all(benchmarks.values())

        self.l.LogOutput("Storing results of each benchmark run.")
        for benchmark_run in experiment.benchmark_runs:
            if benchmark_run.result:
                benchmark_run_name = "".join(
                    ch for ch in benchmark_run.name if ch.isalnum()
                )
                benchmark_run_path = os.path.join(
                    results_directory, benchmark_run_name
                )
                if experiment.compress_results:
                    benchmark_run.result.CompressResultsTo(benchmark_run_path)
                else:
                    benchmark_run.result.CopyResultsTo(benchmark_run_path)
                # Don't remove benchmark tmp if it was a cache hit.
                benchmark_run.result.CleanUp(
                    benchmark_run.benchmark.rm_chroot_tmp
                    and not benchmark_run.cache_hit
                )

        self.l.LogOutput("Storing results report in %s." % results_directory)
        results_table_path = os.path.join(results_directory, "results.html")
        report = HTMLResultsReport.FromExperiment(experiment).GetReport()
        if self.json_report:
            json_report = JSONResultsReport.FromExperiment(
                experiment, json_args={"indent": 2}
            )
            _WriteJSONReportToFile(experiment, results_directory, json_report)

        FileUtils().WriteFile(results_table_path, report)

        self.l.LogOutput(
            "Storing email message body in %s." % results_directory
        )
        msg_file_path = os.path.join(results_directory, "msg_body.html")
        text_report = TextResultsReport.FromExperiment(
            experiment, True
        ).GetReport()
        text_report += (
            "\nResults are stored in %s.\n" % experiment.results_directory
        )
        msg_body = "<pre style='font-size: 13px'>%s</pre>" % text_report
        FileUtils().WriteFile(msg_file_path, msg_body)

        return self.SUCCEEDED if has_passes else self.HAS_FAILURE

    def Run(self):
        """Run the experiment; always print and store results afterwards.

        Returns:
          SUCCEEDED, HAS_FAILURE, or ALL_FAILED (see _StoreResults).
        """
        try:
            self._Run(self._experiment)
        finally:
            # Always print the report at the end of the run.
            self._PrintTable(self._experiment)
            ret = self._StoreResults(self._experiment)
            if ret != self.ALL_FAILED:
                self._Email(self._experiment)
        return ret
392
393
class MockExperimentRunner(ExperimentRunner):
    """Mocked ExperimentRunner for testing."""

    def __init__(self, experiment, json_report):
        """Delegate straight to the real runner's constructor."""
        super().__init__(experiment, json_report)

    def _Run(self, experiment):
        """Log the experiment that would have been run; run nothing."""
        message = "Would run the following experiment: '%s'." % experiment.name
        self.l.LogOutput(message)

    def _PrintTable(self, experiment):
        """Log a placeholder instead of printing the results table."""
        self.l.LogOutput("Would print the experiment table.")

    def _Email(self, experiment):
        """Log a placeholder instead of sending the results email."""
        self.l.LogOutput("Would send result email.")

    def _StoreResults(self, experiment):
        """Log a placeholder instead of writing results to disk."""
        self.l.LogOutput("Would store the results.")
413