xref: /aosp_15_r20/external/google-benchmark/tools/gbench/util.py (revision dbb99499c3810fa1611fa2242a2fc446be01a57c)
1"""util.py - General utilities for running, loading, and processing benchmarks"""
2
3import json
4import os
5import re
6import subprocess
7import sys
8import tempfile
9
10# Input file type enumeration
11IT_Invalid = 0
12IT_JSON = 1
13IT_Executable = 2
14
15_num_magic_bytes = 2 if sys.platform.startswith("win") else 4
16
17
def is_executable_file(filename):
    """
    Return 'True' if 'filename' names a valid file which is likely
    an executable. A file is considered an executable if it starts with the
    magic bytes for a EXE, Mach O, or ELF file.
    """
    if not os.path.isfile(filename):
        return False
    # Windows PE files are identified by a 2-byte magic; ELF/Mach-O need 4.
    magic_len = 2 if sys.platform.startswith("win") else 4
    with open(filename, mode="rb") as f:
        header = f.read(magic_len)
    if sys.platform == "darwin":
        # Mach-O thin and fat binaries, both byte orders.
        mach_o_magics = (
            b"\xfe\xed\xfa\xce",  # MH_MAGIC
            b"\xce\xfa\xed\xfe",  # MH_CIGAM
            b"\xfe\xed\xfa\xcf",  # MH_MAGIC_64
            b"\xcf\xfa\xed\xfe",  # MH_CIGAM_64
            b"\xca\xfe\xba\xbe",  # FAT_MAGIC
            b"\xbe\xba\xfe\xca",  # FAT_CIGAM
        )
        return header in mach_o_magics
    if sys.platform.startswith("win"):
        return header == b"MZ"  # PE/COFF executable
    return header == b"\x7fELF"  # ELF on Linux and other Unixes
41
42
def is_json_file(filename):
    """
    Returns 'True' if 'filename' names a valid JSON output file.
    'False' otherwise.
    """
    try:
        with open(filename, "r") as f:
            json.load(f)
        return True
    # OSError: missing/unreadable file. ValueError covers both
    # json.JSONDecodeError and UnicodeDecodeError (binary content).
    # The original bare 'except BaseException' also swallowed
    # KeyboardInterrupt and SystemExit, which should propagate.
    except (OSError, ValueError):
        return False
55
56
def classify_input_file(filename):
    """
    Return a tuple (type, msg) where 'type' specifies the classified type
    of 'filename'. If 'type' is 'IT_Invalid' then 'msg' is a human readable
    string representing the error.
    """
    # Guard clauses: report the most specific problem first.
    if not os.path.exists(filename):
        return IT_Invalid, "'%s' does not exist" % filename
    if not os.path.isfile(filename):
        return IT_Invalid, "'%s' does not name a file" % filename
    if is_executable_file(filename):
        return IT_Executable, None
    if is_json_file(filename):
        return IT_JSON, None
    return (
        IT_Invalid,
        "'%s' does not name a valid benchmark executable or JSON file"
        % filename,
    )
79
80
def check_input_file(filename):
    """
    Classify the file named by 'filename' and return the classification.
    If the file is classified as 'IT_Invalid' print an error message and exit
    the program.
    """
    ftype, msg = classify_input_file(filename)
    if ftype != IT_Invalid:
        return ftype
    # Unusable input: report the classification error and abort.
    print("Invalid input file: %s" % msg)
    sys.exit(1)
92
93
def find_benchmark_flag(prefix, benchmark_flags):
    """
    Search the specified list of flags for a flag matching `<prefix><arg>` and
    if it is found return the arg it specifies. If specified more than once the
    last value is returned. If the flag is not found None is returned.
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    # Collect every value for the flag; the last occurrence wins.
    values = [f[len(prefix):] for f in benchmark_flags if f.startswith(prefix)]
    return values[-1] if values else None
106
107
def remove_benchmark_flags(prefix, benchmark_flags):
    """
    Return a new list containing the specified benchmark_flags except those
    with the specified prefix.
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    # Keep only flags that do NOT carry the given prefix.
    return list(filter(lambda f: not f.startswith(prefix), benchmark_flags))
115
116
def load_benchmark_results(fname, benchmark_filter):
    """
    Read benchmark output from a file and return the JSON object.

    Apply benchmark_filter, a regular expression, with nearly the same
    semantics of the --benchmark_filter argument.  May be None.
    Note: the Python regular expression engine is used instead of the
    one used by the C++ code, which may produce different results
    in complex cases.

    REQUIRES: 'fname' names a file containing JSON benchmark output.
    """

    def benchmark_wanted(benchmark):
        # No filter: keep every benchmark entry.
        if benchmark_filter is None:
            return True
        # Prefer 'run_name' when present; fall back to 'name'.
        name = benchmark.get("run_name", None) or benchmark["name"]
        return re.search(benchmark_filter, name) is not None

    with open(fname, "r") as f:
        results = json.load(f)
    if "context" in results:
        if "json_schema_version" in results["context"]:
            json_schema_version = results["context"]["json_schema_version"]
            if json_schema_version != 1:
                # Only schema version 1 is understood by this tooling.
                # (Fixed typo in the original message: "unnsupported".)
                print(
                    "In %s, got unsupported JSON schema version: %i, expected 1"
                    % (fname, json_schema_version)
                )
                sys.exit(1)
    if "benchmarks" in results:
        results["benchmarks"] = list(
            filter(benchmark_wanted, results["benchmarks"])
        )
    return results
152
153
def sort_benchmark_results(result):
    """
    Return 'result' with its 'benchmarks' list sorted for display:
    grouped by family, then per-family instance, with aggregate rows
    after iteration rows, ordered by repetition index. Entries missing
    a field sort before entries that have it (missing is treated as -1).
    """

    def sort_key(benchmark):
        # A single tuple key (outermost criterion first) is equivalent to
        # the chain of inner-to-outer stable sorts it replaces.
        return (
            benchmark.get("family_index", -1),
            benchmark.get("per_family_instance_index", -1),
            1 if benchmark.get("run_type") == "aggregate" else 0,
            benchmark.get("repetition_index", -1),
        )

    result["benchmarks"] = sorted(result["benchmarks"], key=sort_key)
    return result
185
186
def run_benchmark(exe_name, benchmark_flags):
    """
    Run a benchmark specified by 'exe_name' with the specified
    'benchmark_flags'. The benchmark is run directly as a subprocess to preserve
    real time console output.
    RETURNS: A JSON object representing the benchmark output
    """
    output_name = find_benchmark_flag("--benchmark_out=", benchmark_flags)
    is_temp_output = False
    if output_name is None:
        # Caller did not request an output file: capture JSON via a temp
        # file that we own and must remove afterwards.
        is_temp_output = True
        thandle, output_name = tempfile.mkstemp()
        os.close(thandle)
        benchmark_flags = list(benchmark_flags) + [
            "--benchmark_out=%s" % output_name
        ]

    try:
        cmd = [exe_name] + benchmark_flags
        print("RUNNING: %s" % " ".join(cmd))
        exitCode = subprocess.call(cmd)
        if exitCode != 0:
            print("TEST FAILED...")
            sys.exit(exitCode)
        return load_benchmark_results(output_name, None)
    finally:
        # Remove our temp file even on the failure path; the original
        # leaked it when sys.exit() skipped the unlink.
        if is_temp_output:
            os.unlink(output_name)
214
215
def run_or_load_benchmark(filename, benchmark_flags):
    """
    Get the results for a specified benchmark. If 'filename' specifies
    an executable benchmark then the results are generated by running the
    benchmark. Otherwise 'filename' must name a valid JSON output file,
    which is loaded and the result returned.
    """
    ftype = check_input_file(filename)
    if ftype == IT_Executable:
        return run_benchmark(filename, benchmark_flags)
    if ftype == IT_JSON:
        # Honor a --benchmark_filter= flag when loading pre-recorded output.
        bench_filter = find_benchmark_flag(
            "--benchmark_filter=", benchmark_flags
        )
        return load_benchmark_results(filename, bench_filter)
    raise ValueError("Unknown file type %s" % ftype)
232