xref: /aosp_15_r20/external/perfetto/python/perfetto/trace_uri_resolver/resolver.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1# Copyright (C) 2022 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import dataclasses as dc
16import enum
17from typing import BinaryIO, Dict, Generator, List, Type, Union
18from typing import Generic, Tuple, TypeVar, get_type_hints
19
20from perfetto.trace_uri_resolver import util
21
# A trace URI is a plain string (e.g. 'ants:trace_id=1234').
TraceUri = str
# A generator which yields `bytes` objects and neither accepts sent values nor
# returns a value.
TraceGenerator = Generator[bytes, None, None]
# Trace content is either a binary file-like object or a TraceGenerator.
TraceContent = Union[BinaryIO, TraceGenerator]
# Type variable for the value wrapped by ConstraintClass.
_T = TypeVar('_T')
26
27
@dc.dataclass
class ConstraintClass(Generic[_T]):
  """A value paired with the comparison operator that should be applied to it.

  `op` defaults to `Op.EQ`, so `ConstraintClass(v)` is an equality constraint
  on `v`.
  """

  class Op(enum.Enum):
    """Comparison operators; each member's value is its textual spelling."""
    EQ = '='
    NE = '!='
    LE = '<='
    GE = '>='
    GT = '>'
    LT = '<'

    def __str__(self) -> str:
      # Render as the bare operator text (e.g. '>=') instead of the default
      # 'Op.GE' representation.
      return str(self.value)

  value: _T
  op: Op = Op.EQ
44
45
# A constraint on a value: either the bare value itself or a ConstraintClass
# carrying the value together with an explicit comparison operator.
Constraint = Union[_T, ConstraintClass[_T]]
# Like Constraint, but the constrained value may also be a list of values.
ConstraintWithList = Union[Constraint[_T], Constraint[List[_T]]]
48
49
class TraceUriResolver:
  """Resolves a trace URI (e.g. 'ants:trace_id=1234') into a list of traces.

  This class can be subclassed to provide a pluggable mechanism for looking
  up traces using URI strings.

  For example:
    class CustomTraceResolver(TraceUriResolver):
      PREFIX = 'custom'

      def __init__(self, build_branch: List[str] = None, id: str = None):
        self.build_branch = build_branch
        self.id = id
        self.db = init_db()

      def resolve(self):
        traces = self.db.lookup(
          id=self.id, build_branch=self.build_branch)['path']
        return [
          TraceUriResolver.Result(
            trace=t['path'],
            metadata={'iteration': t['iteration'], 'device': t['device']}
          )
          for t in traces
        ]

  Trace resolvers can be passed to trace processor directly:
    with TraceProcessor(CustomTraceResolver(id='abcdefg')) as tp:
      tp.query('select * from slice')

  Alternatively, a trace address can be passed:
    config = TraceProcessorConfig(
      resolver_registry=ResolverRegistry(resolvers=[CustomTraceResolver])
    )
    with TraceProcessor('custom:id=abcdefg', config=config) as tp:
      tp.query('select * from slice')
  """

  # Subclasses should set PREFIX to match the trace address prefix they
  # want to handle.
  PREFIX: str = None

  @dc.dataclass
  class Result:
    """A single resolved trace together with its associated metadata."""

    # TraceUri is present here because it allows recursive lookups (i.e.
    # a resolver which returns a path to a trace).
    trace: Union[TraceUri, TraceContent]

    # metadata allows additional key-value pairs to be provided which are
    # associated with the trace. For example, test names and iteration numbers
    # could be provided for traces originating from lab tests.
    #
    # A default_factory is used (rather than a literal dict default) so that
    # every Result created without explicit metadata owns a separate dict;
    # a shared default would leak mutations from one Result into all others.
    metadata: Dict[str, str] = dc.field(default_factory=dict)

  def resolve(self) -> List['TraceUriResolver.Result']:
    """Resolves a list of traces.

    Subclasses should implement this method and resolve the parameters
    specified in the constructor to a list of traces.

    Raises:
      NotImplementedError: always, in this base class.
    """
    raise NotImplementedError('resolve is unimplemented for this resolver')

  @classmethod
  def from_trace_uri(cls: Type['TraceUriResolver'],
                     uri: TraceUri) -> 'TraceUriResolver':
    """Creates a resolver from a URI.

    URIs have the form:
    android_ci:day=2021-01-01;devices=blueline,crosshatch;key>=value

    This is converted to a dictionary of the form:
    {'day': '2021-01-01', 'devices': ['blueline', 'crosshatch'],
    'key': ConstraintClass('value', Op.GE)}

    and passed as kwargs to the constructor of the trace resolver (see class
    documentation for info). The type hints of the constructor are used to
    decide which arguments accept operators other than '='.

    Generally, subclasses should not override this method as the standard
    trace address format should work for most usecases. Instead, simply
    define your constructor with the parameters you expect to see in the
    trace address.
    """
    return cls(**_args_dict_from_uri(uri, get_type_hints(cls.__init__)))
138
139
def _read_op(arg_str: str, op_start_ind: int) -> ConstraintClass.Op:
  """Reads the comparison operator located at a given index of a string.

  The operator must begin exactly at `op_start_ind`. Two-character operators
  ('>=', '<=', '!=') take precedence over their one-character prefixes.

  For example:
  _read_op('a>4', 1) returns Op.GT
  _read_op('a>=4', 1) returns Op.GE
  _read_op('a>4', 0) raises ValueError
  _read_op('a>4', 3) raises ValueError

  Raises:
    ValueError: if no valid operator starts at `op_start_ind`.
  """
  size = len(arg_str)
  # Characters past the end of the string are represented as None so they
  # never match any operator character.
  head = arg_str[op_start_ind] if op_start_ind < size else None
  tail = arg_str[op_start_ind + 1] if op_start_ind + 1 < size else None

  Op = ConstraintClass.Op
  # Operators formed by `head` immediately followed by '='.
  with_equals = {'>': Op.GE, '<': Op.LE, '!': Op.NE}
  # Operators formed by `head` alone ('!' is not valid on its own).
  standalone = {'>': Op.GT, '<': Op.LT, '=': Op.EQ}

  if tail == '=' and head in with_equals:
    return with_equals[head]
  if head in standalone:
    return standalone[head]
  raise ValueError('Could not find valid operator in uri arg_str: ' + arg_str)
164
165
def _parse_arg(arg_str: str) -> Tuple[str, ConstraintClass.Op, str]:
  """Parse argument string and return a tuple (key, operator, value).

  Given a string like 'branch_num>=4000', it returns a tuple ('branch_num',
  Op.GE, '4000'). The key is the leading run of alphanumeric/underscore
  characters; the operator must follow it immediately and everything after
  the operator is the value.

  Raises:
    ValueError: if arg_str is ill formed, e.g. it has no key ('>30'), no
      value ('key>'), no operator ('key') or an unknown operator
      ('key--31').
  """
  # Index of the first character which cannot be part of a key. If every
  # character is a key character, fall back to len(arg_str) so the missing
  # operator is reported by _read_op instead of being mistaken for a missing
  # key.
  op_start_ind = next(
      (ind for ind, c in enumerate(arg_str)
       if not (c.isalnum() or c == '_')), len(arg_str))
  if op_start_ind == 0:
    # Either arg_str is empty or it starts directly with a non-key character.
    raise ValueError('Could not find valid key in arg_str: ' + arg_str)
  key = arg_str[:op_start_ind]
  op = _read_op(arg_str, op_start_ind)
  # The operator's enum value is its textual form, so its length is the
  # number of characters to skip before the value starts.
  value = arg_str[op_start_ind + len(op.value):]
  if not value:
    raise ValueError('Empty value in trace uri arg_str: ' + arg_str)
  return (key, op, value)
186
187
def _args_dict_from_uri(uri: str,
                        type_hints) -> Dict[str, ConstraintWithList[str]]:
  """Creates the args dictionary from a trace URI.

  URIs have the form:
    android_ci:day=2021-01-01;devices=blueline,crosshatch;key>=value;
    version>=1;version<5
  (shown wrapped here; the URI itself is a single string).

  Assuming the type hints for 'key' and 'version' contain a ConstraintClass
  type argument, this is converted to a dictionary of the form:
    {'day': '2021-01-01', 'devices': ['blueline', 'crosshatch'],
    'key': ConstraintClass('value', Op.GE),
    'version': [ConstraintClass('1', Op.GE), ConstraintClass('5', Op.LT)]}

  Values are always strings (or lists of strings when comma separated); no
  type conversion is performed. Repeating a key which does not accept
  constraints overwrites the earlier value.

  Args:
    uri: the trace URI to parse.
    type_hints: mapping from argument name to its type hint; used to decide
      which arguments accept ConstraintClass values.

  Raises:
    ValueError: if an argument is malformed, or if an operator other than
      '=' is used for an argument which does not accept constraints.
  """
  _, args_str = util.parse_trace_uri(uri)
  if not args_str:
    return {}

  eq_op = ConstraintClass.Op.EQ
  parsed = {}
  for segment in args_str.split(';'):
    key, op, raw_value = _parse_arg(segment)
    # Comma separated values become a list; a lone value stays a string.
    pieces = raw_value.split(',')
    parsed_value = pieces if len(pieces) > 1 else raw_value

    # Arguments without a type hint can only be plain '=' assignments.
    if key not in type_hints:
      if op != eq_op:
        raise ValueError(f'{key} only supports "=" operator')
      parsed[key] = parsed_value
      continue

    # An argument accepts constraints when one of the type arguments of its
    # hint is a parameterised ConstraintClass.
    hint_args = getattr(type_hints[key], '__args__', ())
    accepts_constraint = any(
        getattr(hint_arg, '__origin__', None) is ConstraintClass
        for hint_arg in hint_args)
    if not accepts_constraint:
      if op != eq_op:
        raise ValueError('Operator other than "=" passed to argument which '
                         'does not have constraint type: ' + segment)
      parsed[key] = parsed_value
      continue

    constraint = ConstraintClass(parsed_value, op)
    if key not in parsed:
      parsed[key] = constraint
      continue
    # Second occurrence of the key: promote the single constraint to a list
    # so all constraints for the key are kept.
    if isinstance(parsed[key], ConstraintClass):
      parsed[key] = [parsed[key]]
    parsed[key].append(constraint)
  return parsed
242