xref: /aosp_15_r20/external/perfetto/python/test/resolver_unittest.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1# Copyright (C) 2022 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import unittest
16
17from perfetto.trace_uri_resolver.util import parse_trace_uri
18from perfetto.trace_uri_resolver.util import to_list
19from perfetto.trace_uri_resolver.util import _cs_list
20from perfetto.trace_uri_resolver.util import and_list
21from perfetto.trace_uri_resolver.util import or_list
22from perfetto.trace_uri_resolver.resolver import _args_dict_from_uri
23from perfetto.trace_uri_resolver.resolver import Constraint
24from perfetto.trace_uri_resolver.resolver import ConstraintClass
25from perfetto.trace_uri_resolver.resolver import TraceUriResolver
26from perfetto.trace_uri_resolver.registry import ResolverRegistry
27
28
29class SimpleResolver(TraceUriResolver):
30  PREFIX = 'simple'
31
32  def __init__(self, foo=None, bar=None):
33    self.foo = foo
34    self.bar = bar
35
36  def foo_gen(self):
37    yield self.foo.encode() if self.foo else b''
38
39  def bar_gen(self):
40    yield self.bar.encode() if self.bar else b''
41
42  def resolve(self):
43    return [
44        TraceUriResolver.Result(self.foo_gen()),
45        TraceUriResolver.Result(
46            self.bar_gen(), metadata={
47                'foo': self.foo,
48                'bar': self.bar
49            })
50    ]
51
52
53class RecursiveResolver(SimpleResolver):
54  PREFIX = 'recursive'
55
56  def __init__(self, foo=None, bar=None):
57    super().__init__(foo=foo, bar=bar)
58
59  def resolve(self):
60    return [
61        TraceUriResolver.Result(self.foo_gen()),
62        TraceUriResolver.Result(
63            self.bar_gen(), metadata={
64                'foo': 'foo',
65                'bar': 'bar'
66            }),
67        TraceUriResolver.Result(f'simple:foo={self.foo};bar={self.bar}'),
68        TraceUriResolver.Result(SimpleResolver(foo=self.foo, bar=self.bar)),
69    ]
70
71
72class TestResolver(unittest.TestCase):
73
74  def test_simple_resolve(self):
75    registry = ResolverRegistry([SimpleResolver])
76
77    res = registry.resolve('simple:foo=x;bar=y')
78    self.assertEqual(len(res), 2)
79
80    (foo_res, bar_res) = res
81    self._check_resolver_result(foo_res, bar_res)
82
83    (foo_res, bar_res) = registry.resolve(['simple:foo=x;bar=y'])
84    self._check_resolver_result(foo_res, bar_res)
85
86    resolver = SimpleResolver(foo='x', bar='y')
87
88    (foo_res, bar_res) = registry.resolve(resolver)
89    self._check_resolver_result(foo_res, bar_res)
90
91    (foo_res, bar_res) = registry.resolve([resolver])
92    self._check_resolver_result(foo_res, bar_res)
93
94    (foo_a, bar_b, foo_x,
95     bar_y) = registry.resolve(['simple:foo=a;bar=b', resolver])
96    self._check_resolver_result(foo_a, bar_b, foo='a', bar='b')
97    self._check_resolver_result(foo_x, bar_y)
98
99  def test_simple_resolve_missing_arg(self):
100    registry = ResolverRegistry([SimpleResolver])
101
102    (foo_res, bar_res) = registry.resolve('simple:foo=x')
103    self._check_resolver_result(foo_res, bar_res, bar=None)
104
105    (foo_res, bar_res) = registry.resolve('simple:bar=y')
106    self._check_resolver_result(foo_res, bar_res, foo=None)
107
108    (foo_res, bar_res) = registry.resolve('simple:')
109    self._check_resolver_result(foo_res, bar_res, foo=None, bar=None)
110
111  def test_recursive_resolve(self):
112    registry = ResolverRegistry([SimpleResolver])
113    registry.register(RecursiveResolver)
114
115    res = registry.resolve('recursive:foo=x;bar=y')
116    self.assertEqual(len(res), 6)
117
118    (non_rec_foo, non_rec_bar, rec_foo_str, rec_bar_str, rec_foo_obj,
119     rec_bar_obj) = res
120
121    self._check_resolver_result(
122        non_rec_foo, non_rec_bar, foo_metadata='foo', bar_metadata='bar')
123    self._check_resolver_result(rec_foo_str, rec_bar_str)
124    self._check_resolver_result(rec_foo_obj, rec_bar_obj)
125
126  def test_parse_trace_uri(self):
127    self.assertEqual(parse_trace_uri('/foo/bar'), (None, '/foo/bar'))
128    self.assertEqual(parse_trace_uri('foo/bar'), (None, 'foo/bar'))
129    self.assertEqual(parse_trace_uri('/foo/b:ar'), (None, '/foo/b:ar'))
130    self.assertEqual(parse_trace_uri('./foo/b:ar'), (None, './foo/b:ar'))
131    self.assertEqual(parse_trace_uri('foo/b:ar'), ('foo/b', 'ar'))
132
133  def test_to_list(self):
134    self.assertEqual(to_list(None), None)
135    self.assertEqual(to_list(1), [1])
136    self.assertEqual(to_list('1'), ['1'])
137    self.assertEqual(to_list([]), [])
138    self.assertEqual(to_list([1]), [1])
139
140  def test_cs_list(self):
141    fn = 'col = {}'.format
142    sep = ' || '
143    self.assertEqual(_cs_list(None, fn, 'FALSE', sep), 'TRUE')
144    self.assertEqual(_cs_list(None, fn, 'TRUE', sep), 'TRUE')
145    self.assertEqual(_cs_list([], fn, 'FALSE', sep), 'FALSE')
146    self.assertEqual(_cs_list([], fn, 'TRUE', sep), 'TRUE')
147    self.assertEqual(_cs_list([1], fn, 'FALSE', sep), '(col = 1)')
148    self.assertEqual(_cs_list([1, 2], fn, 'FALSE', sep), '(col = 1 || col = 2)')
149
150  def test_and_list(self):
151    fn = 'col != {}'.format
152    self.assertEqual(and_list([1, 2], fn, 'FALSE'), '(col != 1 AND col != 2)')
153
154  def test_or_list(self):
155    fn = 'col = {}'.format
156    self.assertEqual(or_list([1, 2], fn, 'FALSE'), '(col = 1 OR col = 2)')
157
158  def test_args_dict_from_uri(self):
159    self.assertEqual(_args_dict_from_uri('foo:', {}), {})
160    self.assertEqual(_args_dict_from_uri('foo:bar=baz', {}), {
161        'bar': 'baz',
162    })
163    self.assertEqual(
164        _args_dict_from_uri('foo:key=v1,v2', {}), {'key': ['v1', 'v2']})
165    self.assertEqual(
166        _args_dict_from_uri('foo:bar=baz;key=v1,v2', {}), {
167            'bar': 'baz',
168            'key': ['v1', 'v2']
169        })
170    with self.assertRaises(ValueError):
171      _args_dict_from_uri('foo:=v1', {})
172    with self.assertRaises(ValueError):
173      _args_dict_from_uri('foo:key', {})
174    with self.assertRaises(ValueError):
175      _args_dict_from_uri('foo:key<', {})
176    with self.assertRaises(ValueError):
177      _args_dict_from_uri('foo:key<v1', {})
178    with self.assertRaises(ValueError):
179      _args_dict_from_uri('foo:key<v1', {'key': str})
180
181    type_hints = {'key': Constraint[str]}
182    self.assertEqual(
183        _args_dict_from_uri('foo:key=v1', type_hints),
184        {'key': ConstraintClass('v1', ConstraintClass.Op.EQ)})
185    self.assertEqual(
186        _args_dict_from_uri('foo:key!=v1', type_hints),
187        {'key': ConstraintClass('v1', ConstraintClass.Op.NE)})
188    self.assertEqual(
189        _args_dict_from_uri('foo:key<=v1', type_hints),
190        {'key': ConstraintClass('v1', ConstraintClass.Op.LE)})
191    self.assertEqual(
192        _args_dict_from_uri('foo:key>=v1', type_hints),
193        {'key': ConstraintClass('v1', ConstraintClass.Op.GE)})
194    self.assertEqual(
195        _args_dict_from_uri('foo:key>v1', type_hints),
196        {'key': ConstraintClass('v1', ConstraintClass.Op.GT)})
197    self.assertEqual(
198        _args_dict_from_uri('foo:key<v1', type_hints),
199        {'key': ConstraintClass('v1', ConstraintClass.Op.LT)})
200    self.assertEqual(
201        _args_dict_from_uri('foo:key>v1;key<=v2', type_hints), {
202            'key': [
203                ConstraintClass('v1', ConstraintClass.Op.GT),
204                ConstraintClass('v2', ConstraintClass.Op.LE)
205            ]
206        })
207    self.assertEqual(
208        _args_dict_from_uri('foo:key>=v1;key<v4;key!=v2;key!=v3', type_hints), {
209            'key': [
210                ConstraintClass('v1', ConstraintClass.Op.GE),
211                ConstraintClass('v4', ConstraintClass.Op.LT),
212                ConstraintClass('v2', ConstraintClass.Op.NE),
213                ConstraintClass('v3', ConstraintClass.Op.NE),
214            ]
215        })
216
217  def _check_resolver_result(self,
218                             foo_res,
219                             bar_res,
220                             foo='x',
221                             bar='y',
222                             foo_metadata=None,
223                             bar_metadata=None):
224    self.assertEqual(
225        tuple(foo_res.generator), (foo.encode() if foo else ''.encode(),))
226    self.assertEqual(
227        tuple(bar_res.generator), (bar.encode() if bar else ''.encode(),))
228    self.assertEqual(
229        bar_res.metadata, {
230            'foo': foo_metadata if foo_metadata else foo,
231            'bar': bar_metadata if bar_metadata else bar
232        })
233