xref: /aosp_15_r20/external/toolchain-utils/crosperf/experiment.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# -*- coding: utf-8 -*-
2# Copyright 2013 The ChromiumOS Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""The experiment setting module."""
7
8
9import os
10from threading import Lock
11import time
12
13import benchmark_run
14from cros_utils import logger
15from cros_utils import misc
16from machine_manager import BadChecksum
17from machine_manager import MachineManager
18from machine_manager import MockMachineManager
19import test_flag
20
21
class Experiment(object):
    """Class representing an Experiment to be run.

    One ``benchmark_run.BenchmarkRun`` is created for every combination of
    label x benchmark x iteration. The runs are executed either by an
    attached scheduler (see ``set_schedv2``) or, when none is attached, by
    calling ``start()`` on every run directly.
    """

    def __init__(
        self,
        name,
        remote,
        working_directory,
        chromeos_root,
        cache_conditions,
        labels,
        benchmarks,
        experiment_file,
        email_to,
        acquire_timeout,
        log_dir,
        log_level,
        share_cache,
        results_directory,
        compress_results,
        locks_directory,
        cwp_dso,
        ignore_min_max,
        crosfleet,
        dut_config,
        keep_stateful: bool,
        no_lock: bool,
    ):
        """Validate the configuration and prepare all benchmark runs.

        Most arguments are stored verbatim as attributes. Construction then
        adds every host in ``remote`` to a machine manager, narrows
        ``self.remote`` to the machines the manager reports, computes image
        checksums for each label and generates the benchmark runs.

        Args:
            name: Experiment name; also used for the default results directory
                (``<working_directory>/<name>_results``).
            remote: Host names to run on. May be empty only if ``crosfleet``
                is set.
            working_directory: Base of the default results directory.
            chromeos_root: Checkout handed to the machine manager. When empty,
                the first label with a non-empty ``chromeos_root`` is used.
            cache_conditions: Passed to every generated benchmark run.
            labels: Non-empty list of labels to run.
            benchmarks: Non-empty list of benchmarks; each one's
                ``iterations`` attribute controls how many runs it gets.
            acquire_timeout, log_level, locks_directory, keep_stateful:
                Forwarded to the machine manager (``log_level`` and
                ``locks_directory`` are also stored on the experiment).
            log_dir: Directory for the experiment and per-run loggers.
            share_cache, dut_config: Passed to every generated benchmark run.
            results_directory: Explicit results directory; canonicalized via
                ``misc.CanonicalizePath`` when given.
            experiment_file, email_to, compress_results, cwp_dso,
            ignore_min_max, crosfleet, no_lock: Stored as attributes.

        Raises:
            RuntimeError: If no benchmarks or labels are given, if neither
                ``remote`` nor ``crosfleet`` is set, if no chromeos_root can be
                determined, or if the machine manager ends up with no machines.
        """
        self.name = name
        self.working_directory = working_directory
        self.remote = remote
        # NOTE(review): this keeps the constructor argument. The fallback
        # chromeos_root derived from the labels below is only handed to the
        # machine manager and is not stored here.
        self.chromeos_root = chromeos_root
        self.cache_conditions = cache_conditions
        self.experiment_file = experiment_file
        self.email_to = email_to
        if results_directory:
            self.results_directory = misc.CanonicalizePath(results_directory)
        else:
            self.results_directory = os.path.join(
                self.working_directory, self.name + "_results"
            )
        self.compress_results = compress_results
        self.log_dir = log_dir
        self.log_level = log_level
        self.labels = labels
        self.benchmarks = benchmarks
        # num_complete counts every finished run; num_run_complete counts
        # only those that were not cache hits.
        self.num_complete = 0
        self.num_run_complete = 0
        self.share_cache = share_cache
        self.active_threads = []
        self.locks_dir = locks_directory
        self.locked_machines = []
        self.lock_mgr = None
        self.cwp_dso = cwp_dso
        self.ignore_min_max = ignore_min_max
        self.crosfleet = crosfleet
        self.no_lock = no_lock
        self.start_time = None
        self._schedv2 = None
        # Guards the completion counters in BenchmarkRunFinished, which is
        # called from multiple threads.
        self._internal_counter_lock = Lock()
        self.l = logger.GetLogger(log_dir)

        if not self.benchmarks:
            raise RuntimeError("No benchmarks specified")
        if not self.labels:
            raise RuntimeError("No labels specified")
        if not remote and not self.crosfleet:
            raise RuntimeError("No remote hosts specified")

        # We need one chromeos_root to run the benchmarks in, but it doesn't
        # matter where it is, unless the ABIs are different.
        if not chromeos_root:
            for label in self.labels:
                if label.chromeos_root:
                    chromeos_root = label.chromeos_root
                    break
        if not chromeos_root:
            raise RuntimeError(
                "No chromeos_root given and could not determine "
                "one from the image path."
            )

        machine_manager_fn = (
            MockMachineManager if test_flag.GetTestMode() else MachineManager
        )
        self.machine_manager = machine_manager_fn(
            chromeos_root,
            acquire_timeout,
            log_level,
            locks_directory,
            keep_stateful=keep_stateful,
        )

        # self.remote may be empty when crosfleet is used; tolerate None so
        # the "No machine available" error below is what the caller sees.
        for machine in self.remote or ():
            # machine_manager.AddMachine only adds reachable machines.
            self.machine_manager.AddMachine(machine)
        # Now machine_manager._all_machines contains a list of reachable
        # machines. This is a subset of self.remote. We make both lists the same.
        self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
        if not self.remote:
            raise RuntimeError("No machine available for running experiment.")

        # Initialize checksums for all machines, ignore errors at this time.
        # The checksum will be double checked, and image will be flashed after
        # duts are locked/leased.
        self.SetCheckSums()

        self.benchmark_runs = self._GenerateBenchmarkRuns(dut_config)

    def set_schedv2(self, schedv2):
        """Attach a scheduler; Run/Terminate/IsComplete then delegate to it."""
        self._schedv2 = schedv2

    def schedv2(self):
        """Return the attached scheduler, or None if none was set."""
        return self._schedv2

    def _GenerateBenchmarkRuns(self, dut_config):
        """Generate benchmark runs from labels and benchmark definitions.

        Creates one run per (label, benchmark, iteration), with iterations
        numbered from 1 to ``benchmark.iterations``. Each run gets its own
        logger named ``run.<label>_<benchmark>_<iteration>``.

        Args:
            dut_config: Passed unchanged to every BenchmarkRun.

        Returns:
            List of ``benchmark_run.BenchmarkRun`` objects, ordered by label,
            then benchmark, then iteration.
        """
        benchmark_runs = []
        for label in self.labels:
            for benchmark in self.benchmarks:
                for iteration in range(1, benchmark.iterations + 1):
                    benchmark_run_name = "%s: %s (%s)" % (
                        label.name,
                        benchmark.name,
                        iteration,
                    )
                    full_name = "%s_%s_%s" % (
                        label.name,
                        benchmark.name,
                        iteration,
                    )
                    logger_to_use = logger.Logger(
                        self.log_dir, "run.%s" % (full_name), True
                    )
                    benchmark_runs.append(
                        benchmark_run.BenchmarkRun(
                            benchmark_run_name,
                            benchmark,
                            label,
                            iteration,
                            self.cache_conditions,
                            self.machine_manager,
                            logger_to_use,
                            self.log_level,
                            self.share_cache,
                            dut_config,
                        )
                    )

        return benchmark_runs

    def SetCheckSums(self, forceSameImage=False):
        """Restrict label remotes to known machines and compute checksums.

        For every label, drops remotes that are not in ``self.remote`` and
        then asks the machine manager for the label's common checksum and
        checksum string.

        Args:
            forceSameImage: If computing the common checksum raises
                ``BadChecksum``, force the label's image onto all machines and
                compute the checksum once more; a failure of that second
                attempt is not caught here. When False, ``BadChecksum`` is
                ignored.
        """
        for label in self.labels:
            # We filter out label remotes that are not reachable (not in
            # self.remote). So each label.remote is a sublist of experiment.remote.
            label.remote = [r for r in label.remote if r in self.remote]
            try:
                self.machine_manager.ComputeCommonCheckSum(label)
            except BadChecksum:
                # Force same image on all machines, then we do checksum again. No
                # bailout if checksums still do not match.
                # TODO (zhizhouy): Need to figure out how flashing image will influence
                # the new checksum.
                if forceSameImage:
                    self.machine_manager.ForceSameImageToAllMachines(label)
                    self.machine_manager.ComputeCommonCheckSum(label)

            self.machine_manager.ComputeCommonCheckSumString(label)

    def Build(self):
        """Does nothing."""
        pass

    def Terminate(self):
        """Stop the experiment.

        Delegates to the scheduler when one is attached. Otherwise logs and
        calls ``Terminate()`` on every benchmark run that is still alive.
        """
        if self._schedv2 is not None:
            self._schedv2.terminate()
            return
        for t in self.benchmark_runs:
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling.
            if t.is_alive():
                self.l.LogError("Terminating run: '%s'." % t.name)
                t.Terminate()

    def IsComplete(self):
        """Return whether all benchmark runs have finished.

        With a scheduler attached, delegates to it. Otherwise reaps finished
        threads from ``self.active_threads`` and updates the completion
        counters. Returns False whenever threads were still being tracked at
        the start of the call, even if all of them finished during it. Returns
        True once the list is empty.
        """
        if self._schedv2:
            return self._schedv2.is_complete()
        if not self.active_threads:
            return True
        # Iterate over a snapshot: finished threads are removed from
        # self.active_threads inside the loop, and removing from a list while
        # iterating over it skips the element after each removal.
        for t in list(self.active_threads):
            if t.is_alive():
                t.join(0)
            if not t.is_alive():
                self.num_complete += 1
                if not t.cache_hit:
                    self.num_run_complete += 1
                self.active_threads.remove(t)
        return False

    def BenchmarkRunFinished(self, br):
        """Update internal counters after br finishes.

        Note this is only used by schedv2 and is called by multiple threads.
        Never throw any exception here.
        """

        assert self._schedv2 is not None
        with self._internal_counter_lock:
            self.num_complete += 1
            if not br.cache_hit:
                self.num_run_complete += 1

    def Run(self):
        """Start the experiment and record ``self.start_time``.

        Runs the attached scheduler, or, without one, marks every benchmark
        run as a daemon, starts it and tracks it in ``self.active_threads``.
        """
        self.start_time = time.time()
        if self._schedv2 is not None:
            self._schedv2.run_sched()
            return
        self.active_threads = []
        for run in self.benchmark_runs:
            # Set threads to daemon so program exits when ctrl-c is pressed.
            run.daemon = True
            run.start()
            self.active_threads.append(run)

    def SetCacheConditions(self, cache_conditions):
        """Apply ``cache_conditions`` to every benchmark run."""
        for run in self.benchmark_runs:
            run.SetCacheConditions(cache_conditions)

    def Cleanup(self):
        """Make sure all machines are unlocked.

        If a locks directory is configured, the machine manager's Cleanup() is
        called. Then, outside test mode, machines recorded in
        ``self.locked_machines`` are unlocked through ``self.lock_mgr`` (when
        set), and ``self.lock_mgr`` is cleared on success.

        Raises:
            RuntimeError: If some machines in ``self.locked_machines`` are not
                reported as unlocked.
        """
        if self.locks_dir:
            # We are using the file locks mechanism, so call machine_manager.Cleanup
            # to unlock everything.
            self.machine_manager.Cleanup()

        if test_flag.GetTestMode() or not self.locked_machines:
            return

        # If we locked any machines earlier, make sure we unlock them now.
        if self.lock_mgr:
            machine_states = self.lock_mgr.GetMachineStates("unlock")
            self.lock_mgr.CheckMachineLocks(machine_states, "unlock")
            unlocked_machines = self.lock_mgr.UpdateMachines(False)
            failed_machines = [
                m for m in self.locked_machines if m not in unlocked_machines
            ]
            if failed_machines:
                raise RuntimeError(
                    "These machines are not unlocked correctly: %s"
                    % failed_machines
                )
            self.lock_mgr = None
273