xref: /aosp_15_r20/external/perfetto/python/perfetto/batch_trace_processor/api.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1#!/usr/bin/env python3
2# Copyright (C) 2021 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Contains classes for BatchTraceProcessor API."""
16import abc
17import concurrent.futures as cf
18import dataclasses as dc
19import time
20from enum import Enum
21import multiprocessing
22from typing import Any, Callable, Dict, Tuple, List, Optional
23
24import pandas as pd
25
26from perfetto.batch_trace_processor.platform import PlatformDelegate
27from perfetto.common.exceptions import PerfettoException
28from perfetto.trace_processor.api import PLATFORM_DELEGATE as TP_PLATFORM_DELEGATE
29from perfetto.trace_processor.api import TraceProcessor
30from perfetto.trace_processor.api import TraceProcessorException
31from perfetto.trace_processor.api import TraceProcessorConfig
32from perfetto.trace_uri_resolver import registry
33from perfetto.trace_uri_resolver.registry import ResolverRegistry
34
# Defining this field as a module variable means this can be changed by
# implementations at startup and used for all BatchTraceProcessor objects
# without having to specify it on each one.
# In Google3, this field is rewritten using Copybara to an implementation
# which can integrate with internal infra.
40PLATFORM_DELEGATE = PlatformDelegate
41
42TraceListReference = registry.TraceListReference
43Metadata = Dict[str, str]
44
45MAX_LOAD_WORKERS = 32
46
47# Enum encoding how errors while loading/querying traces in BatchTraceProcessor
48# should be handled.
49class FailureHandling(Enum):
50  # If any trace fails to load or be queried, raises an exception causing the
51  # entire batch trace processor to fail.
52  # This is the default behaviour and the method which should be preferred for
53  # any interactive use of BatchTraceProcessor.
54  RAISE_EXCEPTION = 0
55
56  # If a trace fails to load or be queried, the trace processor for that trace
57  # is dropped but loading of other traces is unaffected. A failure integer is
58  # incremented in the Stats class for the batch trace processor instance.
59  INCREMENT_STAT = 1
60
61
62@dc.dataclass
63class BatchTraceProcessorConfig:
64  tp_config: TraceProcessorConfig
65  load_failure_handling: FailureHandling
66  execute_failure_handling: FailureHandling
67
68  def __init__(
69      self,
70      tp_config: TraceProcessorConfig = TraceProcessorConfig(),
71      load_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION,
72      execute_failure_handling: FailureHandling = FailureHandling
73      .RAISE_EXCEPTION,
74  ):
75    self.tp_config = tp_config
76    self.load_failure_handling = load_failure_handling
77    self.execute_failure_handling = execute_failure_handling
78
79
80# Contains stats about the events which happened during the use of
81# BatchTraceProcessor.
82@dc.dataclass
83class Stats:
84  # The number of traces which failed to load; only non-zero if
85  # FailureHanding.INCREMENT_STAT is chosen as the load failure handling type.
86  load_failures: int = 0
87
88  # The number of traces which failed while executing (query, metric or
89  # arbitary function); only non-zero if FailureHanding.INCREMENT_STAT is
90  # chosen as the execute failure handling type.
91  execute_failures: int = 0
92
93
94class BatchTraceProcessor:
95  """Run ad-hoc SQL queries across many Perfetto traces.
96
97  Usage:
98    with BatchTraceProcessor(traces) as btp:
99      dfs = btp.query('select * from slice')
100      for df in dfs:
101        print(df)
102  """
103
104  class Observer(abc.ABC):
105    """Observer that can be used to provide side-channel information about
106    processed traces.
107    """
108
109    @abc.abstractmethod
110    def trace_processed(self, metadata: Metadata,
111                        execution_time_seconds: float):
112      """Invoked every time query has been executed on a trace.
113
114      Args:
115        metadata: Metadata provided by trace resolver, can be used to identify
116          the trace.
117
118        execution_time_seconds: Query execution time, in seconds.
119      """
120      raise NotImplementedError
121
122  def __init__(self,
123               traces: TraceListReference,
124               config: BatchTraceProcessorConfig = BatchTraceProcessorConfig(),
125               observer: Optional[Observer] = None):
126    """Creates a batch trace processor instance.
127
128    BatchTraceProcessor is the blessed way of running ad-hoc queries in
129    Python across many traces.
130
131    Args:
132      traces: A list of traces, a trace URI resolver or a URI which
133        can be resolved to a list of traces.
134
135        If a list, each of items must be one of the following types:
136        1) path to a trace file to open and read
137        2) a file like object (file, io.BytesIO or similar) to read
138        3) a generator yielding bytes
139        4) an URI which resolves to a trace
140
141        A trace URI resolver is a subclass of resolver.TraceUriResolver
142        which generates a list of trace references when the |resolve|
143        method is called on it.
144
145        A URI is similar to a connection string (e.g. for a web
146        address or SQL database) which specifies where to lookup traces
147        and which traces to pick from this data source. The format of a
148        string should be as follows:
149        resolver_name:key_1=list,of,values;key_2=value
150
151        Custom resolvers can be provided to handle URIs via
152        |config.resolver_registry|.
153      config: configuration options which customize functionality of batch
154        trace processor and underlying trace processors.
155      observer: an optional observer for side-channel information, e.g.
156        running time of queries.
157    """
158
159    self.tps_and_metadata = None
160    self.closed = False
161    self._stats = Stats()
162
163    self.platform_delegate = PLATFORM_DELEGATE()
164    self.tp_platform_delegate = TP_PLATFORM_DELEGATE()
165    self.config = config
166
167    self.observer = observer
168
169    # Make sure the descendent trace processors are using the same resolver
170    # registry (even though they won't actually use it as we will resolve
171    # everything fully in this class).
172    self.resolver_registry = config.tp_config.resolver_registry or \
173      self.tp_platform_delegate.default_resolver_registry()
174    self.config.tp_config.resolver_registry = self.resolver_registry
175
176    # Resolve all the traces to their final form.
177    resolved = self.resolver_registry.resolve(traces)
178
179    # As trace processor is completely CPU bound, it makes sense to just
180    # max out the CPUs available.
181    query_executor = self.platform_delegate.create_query_executor(
182        len(resolved)) or cf.ThreadPoolExecutor(
183            max_workers=multiprocessing.cpu_count())
184    # Loading trace involves FS access, so it makes sense to limit parallelism
185    max_load_workers = min(multiprocessing.cpu_count(), MAX_LOAD_WORKERS)
186    load_executor = self.platform_delegate.create_load_executor(
187        len(resolved)) or cf.ThreadPoolExecutor(max_workers=max_load_workers)
188
189    self.query_executor = query_executor
190    self.tps_and_metadata = [
191        x for x in load_executor.map(self._create_tp, resolved) if x is not None
192    ]
193
194  def metric(self, metrics: List[str]):
195    """Computes the provided metrics.
196
197    The computation happens in parallel across all the traces.
198
199    Args:
200      metrics: A list of valid metrics as defined in TraceMetrics
201
202    Returns:
203      A list of TraceMetric protos (one for each trace).
204    """
205    return self.execute(lambda tp: tp.metric(metrics))
206
207  def query(self, sql: str):
208    """Executes the provided SQL statement (returning a single row).
209
210    The execution happens in parallel across all the traces.
211
212    Args:
213      sql: The SQL statement to execute.
214
215    Returns:
216      A list of Pandas dataframes with the result of executing the query (one
217      per trace).
218
219    Raises:
220      TraceProcessorException: An error occurred running the query.
221    """
222    return self.execute(lambda tp: tp.query(sql).as_pandas_dataframe())
223
224  def query_and_flatten(self, sql: str):
225    """Executes the provided SQL statement and flattens the result.
226
227    The execution happens in parallel across all the traces and the
228    resulting Pandas dataframes are flattened into a single dataframe.
229
230    Args:
231      sql: The SQL statement to execute.
232
233    Returns:
234      A concatenated Pandas dataframe containing the result of executing the
235      query across all the traces.
236
237      If an URI or a trace resolver was passed to the constructor, the
238      contents of the |metadata| dictionary emitted by the resolver will also
239      be emitted as extra columns (key being column name, value being the
240      value in the dataframe).
241
242      For example:
243        class CustomResolver(TraceUriResolver):
244          def resolve(self):
245            return [TraceUriResolver.Result(trace='/tmp/path',
246                                            metadata={
247                                              'path': '/tmp/path'
248                                              'foo': 'bar'
249                                            })]
250
251        with BatchTraceProcessor(CustomResolver()) as btp:
252          df = btp.query_and_flatten('select count(1) as cnt from slice')
253
254      Then df will look like this:
255        cnt       path              foo
256        100       /tmp/path         bar
257
258    Raises:
259      TraceProcessorException: An error occurred running the query.
260    """
261    return self.execute_and_flatten(lambda tp: tp.query(sql).
262                                    as_pandas_dataframe())
263
264  def query_single_result(self, sql: str):
265    """Executes the provided SQL statement (returning a single row).
266
267    The execution happens in parallel across all the traces.
268
269    Args:
270      sql: The SQL statement to execute. This statement should return exactly
271        one row on any trace.
272
273    Returns:
274      A list of values with the result of executing the query (one per trace).
275
276    Raises:
277      TraceProcessorException: An error occurred running the query or more than
278        one result was returned.
279    """
280
281    def query_single_result_inner(tp):
282      df = tp.query(sql).as_pandas_dataframe()
283      if len(df.index) != 1:
284        raise TraceProcessorException("Query should only return a single row")
285
286      if len(df.columns) != 1:
287        raise TraceProcessorException(
288            "Query should only return a single column")
289
290      return df.iloc[0, 0]
291
292    return self.execute(query_single_result_inner)
293
294  def execute(self, fn: Callable[[TraceProcessor], Any]) -> List[Any]:
295    """Executes the provided function.
296
297    The execution happens in parallel across all the trace processor instances
298    owned by this object.
299
300    Args:
301      fn: The function to execute.
302
303    Returns:
304      A list of values with the result of executing the fucntion (one per
305      trace).
306    """
307
308    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
309      (tp, metadata) = pair
310      return self._execute_handling_failure(fn, tp, metadata)
311
312    return list(self.query_executor.map(wrapped, self.tps_and_metadata))
313
314  def execute_and_flatten(self, fn: Callable[[TraceProcessor], pd.DataFrame]
315                         ) -> pd.DataFrame:
316    """Executes the provided function and flattens the result.
317
318    The execution happens in parallel across all the trace processor
319    instances owned by this object and the returned Pandas dataframes are
320    flattened into a single dataframe.
321
322    Args:
323      fn: The function to execute which returns a Pandas dataframe.
324
325    Returns:
326      A Pandas dataframe containing the result of executing the query across all
327      the traces. Extra columns containing the file path and args will
328      be added to the dataframe (see |query_and_flatten| for details).
329    """
330
331    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
332      (tp, metadata) = pair
333      start = time.time()
334      df = self._execute_handling_failure(fn, tp, metadata)
335      end = time.time()
336      if self.observer:
337        self.observer.trace_processed(metadata, end - start)
338      for key, value in metadata.items():
339        df[key] = value
340      return df
341
342    df = pd.concat(
343        list(self.query_executor.map(wrapped, self.tps_and_metadata)))
344    return df.reset_index(drop=True)
345
346  def close(self):
347    """Closes this batch trace processor instance.
348
349    This closes all spawned trace processor instances, releasing all the memory
350    and resources those instances take.
351
352    No further calls to other methods in this class should be made after
353    calling this method.
354    """
355    if self.closed:
356      return
357    self.closed = True
358
359    if self.tps_and_metadata:
360      for tp, _ in self.tps_and_metadata:
361        tp.close()
362
363  def stats(self):
364    """Statistics about the operation of this batch trace processor instance.
365
366    See |Stats| class definition for the list of the statistics available."""
367    return self._stats
368
369  def _create_tp(self, trace: ResolverRegistry.Result
370                ) -> Optional[Tuple[TraceProcessor, Metadata]]:
371    try:
372      return TraceProcessor(
373          trace=trace.generator, config=self.config.tp_config), trace.metadata
374    except Exception as ex:
375      if self.config.load_failure_handling == FailureHandling.RAISE_EXCEPTION:
376        raise ex
377      self._stats.load_failures += 1
378      return None
379
380  def _execute_handling_failure(self, fn: Callable[[TraceProcessor], Any],
381                                tp: TraceProcessor, metadata: Metadata):
382    try:
383      return fn(tp)
384    except TraceProcessorException as ex:
385      if self.config.execute_failure_handling == \
386          FailureHandling.RAISE_EXCEPTION:
387        raise TraceProcessorException(f'{metadata} {ex}') from None
388      self._stats.execute_failures += 1
389      return pd.DataFrame()
390
391  def __enter__(self):
392    return self
393
394  def __exit__(self, a, b, c):
395    del a, b, c  # Unused.
396    self.close()
397    return False
398
399  def __del__(self):
400    self.close()
401