1# Copyright (C) 2022 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import unittest 16 17from perfetto.trace_uri_resolver.util import parse_trace_uri 18from perfetto.trace_uri_resolver.util import to_list 19from perfetto.trace_uri_resolver.util import _cs_list 20from perfetto.trace_uri_resolver.util import and_list 21from perfetto.trace_uri_resolver.util import or_list 22from perfetto.trace_uri_resolver.resolver import _args_dict_from_uri 23from perfetto.trace_uri_resolver.resolver import Constraint 24from perfetto.trace_uri_resolver.resolver import ConstraintClass 25from perfetto.trace_uri_resolver.resolver import TraceUriResolver 26from perfetto.trace_uri_resolver.registry import ResolverRegistry 27 28 29class SimpleResolver(TraceUriResolver): 30 PREFIX = 'simple' 31 32 def __init__(self, foo=None, bar=None): 33 self.foo = foo 34 self.bar = bar 35 36 def foo_gen(self): 37 yield self.foo.encode() if self.foo else b'' 38 39 def bar_gen(self): 40 yield self.bar.encode() if self.bar else b'' 41 42 def resolve(self): 43 return [ 44 TraceUriResolver.Result(self.foo_gen()), 45 TraceUriResolver.Result( 46 self.bar_gen(), metadata={ 47 'foo': self.foo, 48 'bar': self.bar 49 }) 50 ] 51 52 53class RecursiveResolver(SimpleResolver): 54 PREFIX = 'recursive' 55 56 def __init__(self, foo=None, bar=None): 57 super().__init__(foo=foo, bar=bar) 58 59 def resolve(self): 60 return [ 61 TraceUriResolver.Result(self.foo_gen()), 62 TraceUriResolver.Result( 63 self.bar_gen(), metadata={ 64 'foo': 'foo', 65 'bar': 'bar' 66 }), 67 TraceUriResolver.Result(f'simple:foo={self.foo};bar={self.bar}'), 68 TraceUriResolver.Result(SimpleResolver(foo=self.foo, bar=self.bar)), 69 ] 70 71 72class TestResolver(unittest.TestCase): 73 74 def test_simple_resolve(self): 75 registry = ResolverRegistry([SimpleResolver]) 76 77 res = registry.resolve('simple:foo=x;bar=y') 78 self.assertEqual(len(res), 2) 79 80 (foo_res, bar_res) = res 81 self._check_resolver_result(foo_res, bar_res) 82 83 (foo_res, bar_res) = registry.resolve(['simple:foo=x;bar=y']) 84 self._check_resolver_result(foo_res, bar_res) 85 86 resolver = SimpleResolver(foo='x', bar='y') 87 88 (foo_res, bar_res) = registry.resolve(resolver) 89 self._check_resolver_result(foo_res, bar_res) 90 91 (foo_res, bar_res) = registry.resolve([resolver]) 92 self._check_resolver_result(foo_res, bar_res) 93 94 (foo_a, bar_b, foo_x, 95 bar_y) = registry.resolve(['simple:foo=a;bar=b', resolver]) 96 self._check_resolver_result(foo_a, bar_b, foo='a', bar='b') 97 self._check_resolver_result(foo_x, bar_y) 98 99 def test_simple_resolve_missing_arg(self): 100 registry = ResolverRegistry([SimpleResolver]) 101 102 (foo_res, bar_res) = registry.resolve('simple:foo=x') 103 self._check_resolver_result(foo_res, bar_res, bar=None) 104 105 (foo_res, bar_res) = registry.resolve('simple:bar=y') 106 self._check_resolver_result(foo_res, bar_res, foo=None) 107 108 (foo_res, bar_res) = registry.resolve('simple:') 109 self._check_resolver_result(foo_res, bar_res, foo=None, bar=None) 110 111 def test_recursive_resolve(self): 112 registry = ResolverRegistry([SimpleResolver]) 113 registry.register(RecursiveResolver) 114 115 res = registry.resolve('recursive:foo=x;bar=y') 116 self.assertEqual(len(res), 6) 117 118 (non_rec_foo, non_rec_bar, rec_foo_str, rec_bar_str, rec_foo_obj, 119 rec_bar_obj) = res 120 121 self._check_resolver_result( 122 non_rec_foo, non_rec_bar, foo_metadata='foo', bar_metadata='bar') 123 self._check_resolver_result(rec_foo_str, rec_bar_str) 124 self._check_resolver_result(rec_foo_obj, rec_bar_obj) 125 126 def test_parse_trace_uri(self): 127 self.assertEqual(parse_trace_uri('/foo/bar'), (None, '/foo/bar')) 128 self.assertEqual(parse_trace_uri('foo/bar'), (None, 'foo/bar')) 129 self.assertEqual(parse_trace_uri('/foo/b:ar'), (None, '/foo/b:ar')) 130 self.assertEqual(parse_trace_uri('./foo/b:ar'), (None, './foo/b:ar')) 131 self.assertEqual(parse_trace_uri('foo/b:ar'), ('foo/b', 'ar')) 132 133 def test_to_list(self): 134 self.assertEqual(to_list(None), None) 135 self.assertEqual(to_list(1), [1]) 136 self.assertEqual(to_list('1'), ['1']) 137 self.assertEqual(to_list([]), []) 138 self.assertEqual(to_list([1]), [1]) 139 140 def test_cs_list(self): 141 fn = 'col = {}'.format 142 sep = ' || ' 143 self.assertEqual(_cs_list(None, fn, 'FALSE', sep), 'TRUE') 144 self.assertEqual(_cs_list(None, fn, 'TRUE', sep), 'TRUE') 145 self.assertEqual(_cs_list([], fn, 'FALSE', sep), 'FALSE') 146 self.assertEqual(_cs_list([], fn, 'TRUE', sep), 'TRUE') 147 self.assertEqual(_cs_list([1], fn, 'FALSE', sep), '(col = 1)') 148 self.assertEqual(_cs_list([1, 2], fn, 'FALSE', sep), '(col = 1 || col = 2)') 149 150 def test_and_list(self): 151 fn = 'col != {}'.format 152 self.assertEqual(and_list([1, 2], fn, 'FALSE'), '(col != 1 AND col != 2)') 153 154 def test_or_list(self): 155 fn = 'col = {}'.format 156 self.assertEqual(or_list([1, 2], fn, 'FALSE'), '(col = 1 OR col = 2)') 157 158 def test_args_dict_from_uri(self): 159 self.assertEqual(_args_dict_from_uri('foo:', {}), {}) 160 self.assertEqual(_args_dict_from_uri('foo:bar=baz', {}), { 161 'bar': 'baz', 162 }) 163 self.assertEqual( 164 _args_dict_from_uri('foo:key=v1,v2', {}), {'key': ['v1', 'v2']}) 165 self.assertEqual( 166 _args_dict_from_uri('foo:bar=baz;key=v1,v2', {}), { 167 'bar': 'baz', 168 'key': ['v1', 'v2'] 169 }) 170 with self.assertRaises(ValueError): 171 _args_dict_from_uri('foo:=v1', {}) 172 with self.assertRaises(ValueError): 173 _args_dict_from_uri('foo:key', {}) 174 with self.assertRaises(ValueError): 175 _args_dict_from_uri('foo:key<', {}) 176 with self.assertRaises(ValueError): 177 _args_dict_from_uri('foo:key<v1', {}) 178 with self.assertRaises(ValueError): 179 _args_dict_from_uri('foo:key<v1', {'key': str}) 180 181 type_hints = {'key': Constraint[str]} 182 self.assertEqual( 183 _args_dict_from_uri('foo:key=v1', type_hints), 184 {'key': ConstraintClass('v1', ConstraintClass.Op.EQ)}) 185 self.assertEqual( 186 _args_dict_from_uri('foo:key!=v1', type_hints), 187 {'key': ConstraintClass('v1', ConstraintClass.Op.NE)}) 188 self.assertEqual( 189 _args_dict_from_uri('foo:key<=v1', type_hints), 190 {'key': ConstraintClass('v1', ConstraintClass.Op.LE)}) 191 self.assertEqual( 192 _args_dict_from_uri('foo:key>=v1', type_hints), 193 {'key': ConstraintClass('v1', ConstraintClass.Op.GE)}) 194 self.assertEqual( 195 _args_dict_from_uri('foo:key>v1', type_hints), 196 {'key': ConstraintClass('v1', ConstraintClass.Op.GT)}) 197 self.assertEqual( 198 _args_dict_from_uri('foo:key<v1', type_hints), 199 {'key': ConstraintClass('v1', ConstraintClass.Op.LT)}) 200 self.assertEqual( 201 _args_dict_from_uri('foo:key>v1;key<=v2', type_hints), { 202 'key': [ 203 ConstraintClass('v1', ConstraintClass.Op.GT), 204 ConstraintClass('v2', ConstraintClass.Op.LE) 205 ] 206 }) 207 self.assertEqual( 208 _args_dict_from_uri('foo:key>=v1;key<v4;key!=v2;key!=v3', type_hints), { 209 'key': [ 210 ConstraintClass('v1', ConstraintClass.Op.GE), 211 ConstraintClass('v4', ConstraintClass.Op.LT), 212 ConstraintClass('v2', ConstraintClass.Op.NE), 213 ConstraintClass('v3', ConstraintClass.Op.NE), 214 ] 215 }) 216 217 def _check_resolver_result(self, 218 foo_res, 219 bar_res, 220 foo='x', 221 bar='y', 222 foo_metadata=None, 223 bar_metadata=None): 224 self.assertEqual( 225 tuple(foo_res.generator), (foo.encode() if foo else ''.encode(),)) 226 self.assertEqual( 227 tuple(bar_res.generator), (bar.encode() if bar else ''.encode(),)) 228 self.assertEqual( 229 bar_res.metadata, { 230 'foo': foo_metadata if foo_metadata else foo, 231 'bar': bar_metadata if bar_metadata else bar 232 }) 233