xref: /aosp_15_r20/external/perfetto/python/generators/diff_tests/utils.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1#!/usr/bin/env python3
2# Copyright (C) 2023 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import sys
17import os
18import signal
19from typing import List
20import subprocess
21
22from google.protobuf import descriptor_pb2, message_factory, text_format
23
24from python.generators.diff_tests import testing
25
# Whether stderr is attached to a terminal, sampled once at import time. Color
# escape codes are only ever emitted when this is truthy.
USE_COLOR_CODES = sys.stderr.isatty()


class ColorFormatter:
  """Wraps strings in ANSI color escape sequences.

  Escape codes are added only when the module-level USE_COLOR_CODES flag is
  truthy and the instance was created with a falsy `no_colors`. In every other
  case the string comes back without any escape codes.
  """

  def __init__(self, no_colors: bool):
    # When truthy, this formatter never emits escape codes.
    self.no_colors = no_colors

  def __colorize(self, color_code: str, s: str) -> str:
    """Returns `s` surrounded by `color_code` and the reset code, if enabled."""
    prefix, suffix = "", ""
    if USE_COLOR_CODES and not self.no_colors:
      prefix, suffix = color_code, "\u001b[0m"
    return prefix + s + suffix

  def red(self, s: str) -> str:
    """Returns `s` in red (escape code 31) when colors are enabled."""
    return self.__colorize("\u001b[31m", s)

  def green(self, s: str) -> str:
    """Returns `s` in green (escape code 32) when colors are enabled."""
    return self.__colorize("\u001b[32m", s)

  def yellow(self, s: str) -> str:
    """Returns `s` in yellow (escape code 33) when colors are enabled."""
    return self.__colorize("\u001b[33m", s)
54
55
def get_env(root_dir):
  """Builds the environment variable dict derived from `root_dir`.

  The result always contains PERFETTO_BINARY_PATH (`<root_dir>/test/data`).
  PATH is added only when `sys.platform` starts with one of the known
  prefixes below, and then points at the matching toolchain `bin` directory
  under `<root_dir>/buildtools`.

  Args:
    root_dir: Directory that the returned paths are rooted at.

  Returns:
    A dict mapping environment variable names to path strings.
  """
  toolchain_dirs = (
      ('linux', ('buildtools', 'linux64', 'clang', 'bin')),
      # Sadly, on macOS we need to check out the Android deps to get
      # llvm symbolizer.
      ('darwin', ('buildtools', 'ndk', 'toolchains', 'llvm', 'prebuilt',
                  'darwin-x86_64', 'bin')),
      ('win32', ('buildtools', 'win', 'clang', 'bin')),
  )

  env = {'PERFETTO_BINARY_PATH': os.path.join(root_dir, 'test', 'data')}
  for platform_prefix, components in toolchain_dirs:
    if sys.platform.startswith(platform_prefix):
      env['PATH'] = os.path.join(root_dir, *components)
      break
  return env
71
72
def ctrl_c_handler(_num, _frame):
  """Signal handler that SIGKILLs the process group named by this pid.

  Both arguments (signal number and stack frame) are ignored.
  """
  # The group being killed is expected to look like this:
  # - Main python interpreter running the main()
  #   - N python interpreters coming from ProcessPoolExecutor workers.
  #     - 1 trace_processor_shell subprocess coming from the subprocess.Popen().
  # No graceful termination is needed: the diff tests are stateless and don't
  # write any file, so everything is killed immediately.
  group_id = os.getpid()
  os.killpg(group_id, signal.SIGKILL)
81
82
def create_message_factory(descriptor_file_paths, proto_type):
  """Looks up the message class for `proto_type` in the given descriptors.

  Args:
    descriptor_file_paths: Paths of serialized FileDescriptorSet files; the
      file descriptors of all of them are combined, in order.
    proto_type: Key used to select one entry from the generated messages,
      e.g. 'perfetto.protos.Trace'.

  Returns:
    The entry for `proto_type` in the result of message_factory.GetMessages().
  """
  file_protos = []
  for descriptor_path in descriptor_file_paths:
    descriptor_set = read_descriptor(descriptor_path)
    file_protos += descriptor_set.file

  # We use this method rather than working directly with DescriptorPool
  # because, when the pure-Python protobuf runtime is used, extensions
  # need to be explicitly registered with the message type. See
  # https://github.com/protocolbuffers/protobuf/blob/9e09343a49e9e75be576b31ed7402bf8502b080c/python/google/protobuf/message_factory.py#L145
  messages_by_name = message_factory.GetMessages(file_protos)
  return messages_by_name[proto_type]
93
94
def create_metrics_message_factory(metrics_descriptor_paths):
  """Returns create_message_factory()'s result for the TraceMetrics proto.

  Args:
    metrics_descriptor_paths: Descriptor file paths forwarded unchanged to
      create_message_factory().
  """
  metrics_proto_type = 'perfetto.protos.TraceMetrics'
  return create_message_factory(metrics_descriptor_paths, metrics_proto_type)
98
99
def read_descriptor(file_name):
  """Parses the binary file `file_name` into a FileDescriptorSet message.

  Args:
    file_name: Path of a file holding a serialized FileDescriptorSet.

  Returns:
    A descriptor_pb2.FileDescriptorSet populated from the file's bytes.
  """
  with open(file_name, 'rb') as descriptor_file:
    serialized = descriptor_file.read()

  descriptor_set = descriptor_pb2.FileDescriptorSet()
  descriptor_set.MergeFromString(serialized)
  return descriptor_set
108
109
def serialize_textproto_trace(trace_descriptor_path, extension_descriptor_paths,
                              text_proto_path, out_stream):
  """Converts a text-format trace into its binary serialization.

  Args:
    trace_descriptor_path: Path of the descriptor file defining
      perfetto.protos.Trace.
    extension_descriptor_paths: List of additional descriptor file paths,
      loaded together with the trace descriptor.
    text_proto_path: Path of the text-format proto file to read.
    out_stream: Binary stream that receives the serialized bytes; it is
      flushed before returning.
  """
  proto = create_message_factory([trace_descriptor_path] +
                                 extension_descriptor_paths,
                                 'perfetto.protos.Trace')()

  # Decode explicitly as UTF-8 instead of relying on the locale's preferred
  # encoding, which differs between platforms and would corrupt non-ASCII
  # string fields.
  with open(text_proto_path, 'r', encoding='utf-8') as text_proto_file:
    text_format.Merge(text_proto_file.read(), proto)
  out_stream.write(proto.SerializeToString())
  out_stream.flush()
120
121
def serialize_python_trace(root_dir, trace_descriptor_path, python_trace_path,
                           out_stream):
  """Runs a Python trace script and redirects its stdout to `out_stream`.

  The script is run as `python3 <python_trace_path> <trace_descriptor_path>`
  with a copy of the current environment in which `<root_dir>/test` has been
  prepended to PYTHONPATH.

  Args:
    root_dir: Directory whose `test` subdirectory is added to PYTHONPATH.
    trace_descriptor_path: Passed to the script as its only argument.
    python_trace_path: Path of the Python script to run.
    out_stream: Passed as `stdout` to the subprocess.

  Raises:
    subprocess.CalledProcessError: If the script exits with a non-zero status.
  """
  python_cmd = [
      'python3',
      python_trace_path,
      trace_descriptor_path,
  ]

  # Add the test dir to the PYTHONPATH to allow synth_common to be found.
  env = os.environ.copy()
  search_path = [os.path.join(root_dir, 'test')]
  # An empty PYTHONPATH is treated like an unset one so that no dangling
  # separator (i.e. an empty path entry) is produced.
  inherited = env.get('PYTHONPATH')
  if inherited:
    search_path.append(inherited)
  # os.pathsep rather than a literal ':' so that this also works on Windows,
  # where PYTHONPATH entries are separated by ';'.
  env['PYTHONPATH'] = os.pathsep.join(search_path)
  subprocess.check_call(python_cmd, env=env, stdout=out_stream)
138
139
def get_trace_descriptor_path(out_path: str, trace_descriptor: str):
  """Resolves the path of the trace descriptor file.

  Args:
    out_path: Directory under which the descriptor is searched for.
    trace_descriptor: Explicit descriptor path; returned unchanged when truthy.

  Returns:
    `trace_descriptor` if it is truthy. Otherwise
    `<out_path>/gen/protos/perfetto/trace/trace.descriptor` if that path
    exists, else the same relative path under `<out_path>/gcc_like_host`
    (returned without checking that it exists).
  """
  if trace_descriptor:
    return trace_descriptor

  relative_parts = ('gen', 'protos', 'perfetto', 'trace', 'trace.descriptor')
  default_location = os.path.join(out_path, *relative_parts)
  if os.path.exists(default_location):
    return default_location
  return os.path.join(out_path, 'gcc_like_host', *relative_parts)
150
151
def modify_trace(trace_descriptor_path, extension_descriptor_paths,
                 in_trace_path, out_trace_path, modifier):
  """Parses a binary trace, applies `modifier` to it and writes it back out.

  Args:
    trace_descriptor_path: Path of the descriptor file defining
      perfetto.protos.Trace.
    extension_descriptor_paths: List of additional descriptor file paths,
      loaded together with the trace descriptor.
    in_trace_path: Path of the serialized trace to read.
    out_trace_path: Path the re-serialized trace is written to.
    modifier: Object whose `inject()` method is called with the parsed trace
      message before it is serialized again.
  """
  descriptor_paths = [trace_descriptor_path] + extension_descriptor_paths
  trace_cls = create_message_factory(descriptor_paths, 'perfetto.protos.Trace')
  trace_proto = trace_cls()

  with open(in_trace_path, "rb") as in_file:
    raw_trace = in_file.read()
  # This may raise DecodeError when |in_trace_path| isn't protobuf.
  trace_proto.ParseFromString(raw_trace)
  # Modify the trace proto object with the provided modifier function.
  modifier.inject(trace_proto)

  with open(out_trace_path, "wb") as out_file:
    out_file.write(trace_proto.SerializeToString())
    out_file.flush()
167
168
def read_all_tests(name_filter: str, root_dir: str) -> List[testing.TestCase]:
  """Loads the diff tests under `root_dir` and keeps those that validate.

  Args:
    name_filter: Passed to each test's `validate()` method.
    root_dir: Directory containing `test/trace_processor/diff_tests`, which
      holds the `include_index` module.

  Returns:
    The tests returned by `fetch_all_diff_tests()` for which
    `validate(name_filter)` is truthy, in their original order.
  """
  diff_tests_dir = os.path.join(root_dir, 'test', 'trace_processor',
                                'diff_tests')

  # include_index is only importable from the diff tests directory, so that
  # directory is put on sys.path just for the duration of the import.
  sys.path.append(diff_tests_dir)
  from include_index import fetch_all_diff_tests
  sys.path.pop()

  selected = []
  for test in fetch_all_diff_tests(diff_tests_dir):
    if test.validate(name_filter):
      selected.append(test)
  return selected
178