#!/usr/bin/env python3
# Copyright (C) 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains classes for BatchTraceProcessor API."""
import abc
import concurrent.futures as cf
import dataclasses as dc
import time
from enum import Enum
import multiprocessing
from typing import Any, Callable, Dict, Tuple, List, Optional

import pandas as pd

from perfetto.batch_trace_processor.platform import PlatformDelegate
from perfetto.common.exceptions import PerfettoException
from perfetto.trace_processor.api import PLATFORM_DELEGATE as TP_PLATFORM_DELEGATE
from perfetto.trace_processor.api import TraceProcessor
from perfetto.trace_processor.api import TraceProcessorException
from perfetto.trace_processor.api import TraceProcessorConfig
from perfetto.trace_uri_resolver import registry
from perfetto.trace_uri_resolver.registry import ResolverRegistry

# Defining this field as a module variable means this can be changed by
# implementations at startup and used for all BatchTraceProcessor objects
# without having to specify on each one.
# In Google3, this field is rewritten using Copybara to an implementation
# which can integrate with internal infra.
PLATFORM_DELEGATE = PlatformDelegate

TraceListReference = registry.TraceListReference
Metadata = Dict[str, str]

# Upper bound on the number of workers used by the default load executor.
MAX_LOAD_WORKERS = 32


# Enum encoding how errors while loading/querying traces in BatchTraceProcessor
# should be handled.
class FailureHandling(Enum):
  # If any trace fails to load or be queried, raises an exception causing the
  # entire batch trace processor to fail.
  # This is the default behaviour and the method which should be preferred for
  # any interactive use of BatchTraceProcessor.
  RAISE_EXCEPTION = 0

  # If a trace fails to load, no trace processor is kept for that trace and
  # loading of other traces is unaffected. If executing on a trace fails, an
  # empty Pandas dataframe is returned as the result for that trace. In both
  # cases a failure integer is incremented in the Stats class for the batch
  # trace processor instance.
  INCREMENT_STAT = 1


@dc.dataclass
class BatchTraceProcessorConfig:
  """Configuration options for a BatchTraceProcessor instance."""

  # Configuration passed to every TraceProcessor created for the batch.
  tp_config: TraceProcessorConfig
  # How failures while loading a trace are handled.
  load_failure_handling: FailureHandling
  # How failures while executing a query/metric/function are handled.
  execute_failure_handling: FailureHandling

  def __init__(
      self,
      tp_config: Optional[TraceProcessorConfig] = None,
      load_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION,
      execute_failure_handling: FailureHandling = FailureHandling
      .RAISE_EXCEPTION,
  ):
    """Creates a config.

    Args:
      tp_config: configuration for the underlying trace processors. If
        omitted, a fresh TraceProcessorConfig is created for this instance.
      load_failure_handling: how trace loading failures are handled.
      execute_failure_handling: how execution failures are handled.
    """
    # A new TraceProcessorConfig is built per instance (rather than as a
    # default argument value) because BatchTraceProcessor writes the resolver
    # registry into it; a default argument would be shared by every config.
    self.tp_config = TraceProcessorConfig() if tp_config is None else tp_config
    self.load_failure_handling = load_failure_handling
    self.execute_failure_handling = execute_failure_handling


# Contains stats about the events which happened during the use of
# BatchTraceProcessor.
@dc.dataclass
class Stats:
  # The number of traces which failed to load; only non-zero if
  # FailureHandling.INCREMENT_STAT is chosen as the load failure handling type.
  load_failures: int = 0

  # The number of traces which failed while executing (query, metric or
  # arbitrary function); only non-zero if FailureHandling.INCREMENT_STAT is
  # chosen as the execute failure handling type.
  execute_failures: int = 0


class BatchTraceProcessor:
  """Run ad-hoc SQL queries across many Perfetto traces.

  Usage:
    with BatchTraceProcessor(traces) as btp:
      dfs = btp.query('select * from slice')
      for df in dfs:
        print(df)
  """

  class Observer(abc.ABC):
    """Observer that can be used to provide side-channel information about
    processed traces.
    """

    @abc.abstractmethod
    def trace_processed(self, metadata: Metadata,
                        execution_time_seconds: float):
      """Invoked after a function has been executed on a trace.

      This is called from |execute_and_flatten| (and therefore from
      |query_and_flatten|), once per trace.

      Args:
        metadata: Metadata provided by trace resolver, can be used to identify
          the trace.

        execution_time_seconds: Execution time, in seconds.
      """
      raise NotImplementedError

  def __init__(self,
               traces: TraceListReference,
               config: Optional[BatchTraceProcessorConfig] = None,
               observer: Optional[Observer] = None):
    """Creates a batch trace processor instance.

    BatchTraceProcessor is the blessed way of running ad-hoc queries in
    Python across many traces.

    Args:
      traces: A list of traces, a trace URI resolver or a URI which
        can be resolved to a list of traces.

        If a list, each item must be one of the following types:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) a URI which resolves to a trace

        A trace URI resolver is a subclass of resolver.TraceUriResolver
        which generates a list of trace references when the |resolve|
        method is called on it.

        A URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.tp_config.resolver_registry|.
      config: configuration options which customize functionality of batch
        trace processor and underlying trace processors. If omitted, a fresh
        default BatchTraceProcessorConfig is created for this instance. Note
        that |config.tp_config.resolver_registry| is overwritten with the
        registry actually used by this instance.
      observer: an optional observer for side-channel information, e.g.
        running time of queries.

    Raises:
      Exception: whatever trace loading raised, if a trace fails to load and
        |config.load_failure_handling| is FailureHandling.RAISE_EXCEPTION.
    """

    self.tps_and_metadata = None
    self.closed = False
    self._stats = Stats()

    self.platform_delegate = PLATFORM_DELEGATE()
    self.tp_platform_delegate = TP_PLATFORM_DELEGATE()
    # Build the default config per instance: it is mutated below, so a shared
    # default argument value would leak state between instances.
    self.config = BatchTraceProcessorConfig() if config is None else config

    self.observer = observer

    # Make sure the descendent trace processors are using the same resolver
    # registry (even though they won't actually use it as we will resolve
    # everything fully in this class).
    self.resolver_registry = self.config.tp_config.resolver_registry or \
      self.tp_platform_delegate.default_resolver_registry()
    self.config.tp_config.resolver_registry = self.resolver_registry

    # Resolve all the traces to their final form.
    resolved = self.resolver_registry.resolve(traces)

    # As trace processor is completely CPU bound, it makes sense to just
    # max out the CPUs available.
    query_executor = self.platform_delegate.create_query_executor(
        len(resolved)) or cf.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count())
    # Loading trace involves FS access, so it makes sense to limit parallelism
    max_load_workers = min(multiprocessing.cpu_count(), MAX_LOAD_WORKERS)
    load_executor = self.platform_delegate.create_load_executor(
        len(resolved)) or cf.ThreadPoolExecutor(max_workers=max_load_workers)

    self.query_executor = query_executor

    # Collect the trace processors incrementally so that, if loading raises
    # part-way through, the ones already collected can still be closed
    # instead of being leaked.
    self.tps_and_metadata = []
    try:
      for loaded in load_executor.map(self._create_tp, resolved):
        # _create_tp returns None for traces which failed to load when
        # failures are only being counted.
        if loaded is not None:
          self.tps_and_metadata.append(loaded)
    except BaseException:
      self.close()
      raise

  def metric(self, metrics: List[str]):
    """Computes the provided metrics.

    The computation happens in parallel across all the traces.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      A list of TraceMetric protos (one for each trace).
    """
    return self.execute(lambda tp: tp.metric(metrics))

  def query(self, sql: str):
    """Executes the provided SQL statement.

    The execution happens in parallel across all the traces.

    Args:
      sql: The SQL statement to execute.

    Returns:
      A list of Pandas dataframes with the result of executing the query (one
      per trace).

    Raises:
      TraceProcessorException: An error occurred running the query.
    """
    return self.execute(lambda tp: tp.query(sql).as_pandas_dataframe())

  def query_and_flatten(self, sql: str):
    """Executes the provided SQL statement and flattens the result.

    The execution happens in parallel across all the traces and the
    resulting Pandas dataframes are flattened into a single dataframe.

    Args:
      sql: The SQL statement to execute.

    Returns:
      A concatenated Pandas dataframe containing the result of executing the
      query across all the traces.

      If an URI or a trace resolver was passed to the constructor, the
      contents of the |metadata| dictionary emitted by the resolver will also
      be emitted as extra columns (key being column name, value being the
      value in the dataframe).

      For example:
        class CustomResolver(TraceUriResolver):
          def resolve(self):
            return [TraceUriResolver.Result(trace='/tmp/path',
                                            metadata={
                                              'path': '/tmp/path',
                                              'foo': 'bar'
                                            })]

        with BatchTraceProcessor(CustomResolver()) as btp:
          df = btp.query_and_flatten('select count(1) as cnt from slice')

      Then df will look like this:
        cnt             path              foo
        100             /tmp/path         bar

    Raises:
      TraceProcessorException: An error occurred running the query.
    """
    return self.execute_and_flatten(lambda tp: tp.query(sql).
                                    as_pandas_dataframe())

  def query_single_result(self, sql: str):
    """Executes the provided SQL statement (returning a single value).

    The execution happens in parallel across all the traces.

    Args:
      sql: The SQL statement to execute. This statement should return exactly
        one row with exactly one column on any trace.

    Returns:
      A list of values with the result of executing the query (one per trace).

    Raises:
      TraceProcessorException: An error occurred running the query or the
        result did not consist of exactly one row and one column.
    """

    def query_single_result_inner(tp):
      df = tp.query(sql).as_pandas_dataframe()
      if len(df.index) != 1:
        raise TraceProcessorException("Query should only return a single row")

      if len(df.columns) != 1:
        raise TraceProcessorException(
            "Query should only return a single column")

      return df.iloc[0, 0]

    return self.execute(query_single_result_inner)

  def execute(self, fn: Callable[[TraceProcessor], Any]) -> List[Any]:
    """Executes the provided function.

    The execution happens in parallel across all the trace processor instances
    owned by this object.

    Args:
      fn: The function to execute.

    Returns:
      A list of values with the result of executing the function (one per
      trace). If execute failures are being counted rather than raised, the
      entry for a trace on which |fn| raised TraceProcessorException is an
      empty Pandas dataframe.

    Raises:
      TraceProcessorException: |fn| raised TraceProcessorException and
        execute failures are configured to raise.
    """

    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
      (tp, metadata) = pair
      return self._execute_handling_failure(fn, tp, metadata)

    return list(self.query_executor.map(wrapped, self.tps_and_metadata))

  def execute_and_flatten(self, fn: Callable[[TraceProcessor], pd.DataFrame]
                         ) -> pd.DataFrame:
    """Executes the provided function and flattens the result.

    The execution happens in parallel across all the trace processor
    instances owned by this object and the returned Pandas dataframes are
    flattened into a single dataframe.

    Args:
      fn: The function to execute which returns a Pandas dataframe.

    Returns:
      A Pandas dataframe containing the result of executing the query across all
      the traces. One extra column is added per key of each trace's metadata
      (see |query_and_flatten| for details). If this object owns no trace
      processors, an empty dataframe is returned.

    Raises:
      TraceProcessorException: |fn| raised TraceProcessorException and
        execute failures are configured to raise.
    """

    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
      (tp, metadata) = pair
      # perf_counter is monotonic, so the reported duration cannot be skewed
      # by adjustments to the system clock.
      start = time.perf_counter()
      df = self._execute_handling_failure(fn, tp, metadata)
      end = time.perf_counter()
      if self.observer:
        self.observer.trace_processed(metadata, end - start)
      for key, value in metadata.items():
        df[key] = value
      return df

    dfs = list(self.query_executor.map(wrapped, self.tps_and_metadata))
    # pd.concat raises ValueError when given nothing to concatenate, which
    # happens when no trace was resolved or every trace failed to load.
    if not dfs:
      return pd.DataFrame()
    return pd.concat(dfs).reset_index(drop=True)

  def close(self):
    """Closes this batch trace processor instance.

    This closes all spawned trace processor instances, releasing all the memory
    and resources those instances take.

    No further calls to other methods in this class should be made after
    calling this method. Calling this method more than once is a no-op.
    """
    if self.closed:
      return
    self.closed = True

    if self.tps_and_metadata:
      for tp, _ in self.tps_and_metadata:
        tp.close()

  def stats(self):
    """Statistics about the operation of this batch trace processor instance.

    See |Stats| class definition for the list of the statistics available."""
    return self._stats

  def _create_tp(self, trace: ResolverRegistry.Result
                ) -> Optional[Tuple[TraceProcessor, Metadata]]:
    """Creates the trace processor for a single resolved trace.

    Args:
      trace: a resolved trace, providing |generator| and |metadata|.

    Returns:
      A (trace processor, metadata) pair, or None if creating the trace
      processor failed and load failures are being counted rather than raised.
    """
    try:
      return TraceProcessor(
          trace=trace.generator, config=self.config.tp_config), trace.metadata
    except Exception:
      if self.config.load_failure_handling == FailureHandling.RAISE_EXCEPTION:
        raise
      # NOTE(review): this is invoked through the load executor and the
      # increment is not synchronized - confirm whether an exact count is
      # required when many loads fail concurrently.
      self._stats.load_failures += 1
      return None

  def _execute_handling_failure(self, fn: Callable[[TraceProcessor], Any],
                                tp: TraceProcessor, metadata: Metadata):
    """Runs |fn| on |tp|, applying the configured execute failure handling.

    Only TraceProcessorException is handled; any other exception raised by
    |fn| propagates unchanged.

    Args:
      fn: the function to execute.
      tp: the trace processor to pass to |fn|.
      metadata: metadata of the trace; prefixed to the error message when
        raising.

    Returns:
      The result of |fn|, or an empty Pandas dataframe if |fn| failed and
      execute failures are being counted rather than raised.

    Raises:
      TraceProcessorException: |fn| failed and execute failures are
        configured to raise.
    """
    try:
      return fn(tp)
    except TraceProcessorException as ex:
      if self.config.execute_failure_handling == \
          FailureHandling.RAISE_EXCEPTION:
        raise TraceProcessorException(f'{metadata} {ex}') from None
      self._stats.execute_failures += 1
      return pd.DataFrame()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    # Returning False means exceptions from the with-body are not suppressed.
    return False

  def __del__(self):
    self.close()