xref: /aosp_15_r20/external/pigweed/pw_trace/py/pw_trace/trace.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15r"""
16
17Trace module which creates trace files from a list of trace events.
18
19This is a work in progress, future work will look to add:
20    - Config options to customize output.
21    - A method of providing custom data formatters.
22    - Perfetto support.
23"""
24from enum import Enum
25import json
26import logging
27import struct
28from typing import Iterable, NamedTuple
29
30_LOG = logging.getLogger('pw_trace')
31_ORDERING_CHARS = ("@", "=", "<", ">", "!")
32
33
class TraceType(Enum):
    """Kinds of trace event that a TraceEvent can carry."""

    INVALID = 0

    INSTANTANEOUS = 1
    INSTANTANEOUS_GROUP = 2

    ASYNC_START = 3
    ASYNC_STEP = 4
    ASYNC_END = 5

    DURATION_START = 6
    DURATION_END = 7
    DURATION_GROUP_START = 8
    DURATION_GROUP_END = 9

    # TODO(hepler): Remove these aliases for the original style-incompliant
    #     names when users have migrated.
    # Inside the class body the canonical names are still plain ints, so
    # reusing them creates aliases of the members defined above.
    DurationStart = DURATION_START  # pylint: disable=invalid-name
    DurationEnd = DURATION_END  # pylint: disable=invalid-name
50
51
class TraceEvent(NamedTuple):
    """A single trace event, as consumed by generate_trace_json().

    The first four fields have no defaults; generate_trace_json() skips any
    event in which one of them is None.
    """

    # Kind of event; selects how generate_trace_json() encodes the event.
    event_type: TraceType
    # Emitted as the JSON "pid" (and as "cat" for async events).
    module: str
    # Emitted as the JSON "name"; also the "tid" of non-group duration events.
    label: str
    # Emitted as the JSON "ts". Microseconds, going by the field name.
    timestamp_us: float
    # Emitted as "tid" for group events and as "scope"/"tid" for async events.
    group: str = ""
    # Emitted as the JSON "id" of async events.
    trace_id: int = 0
    # Not read by generate_trace_json() or the decode helpers.
    flags: int = 0
    # When True, generate_trace_json() interprets data according to data_fmt.
    has_data: bool = False
    # Selects how data is decoded, e.g. "@pw_arg_label", "@pw_arg_counter",
    # "@pw_py_struct_fmt:<format>" or "@pw_py_map_fmt:<format>".
    data_fmt: str = ""
    # Raw payload bytes described by data_fmt.
    data: bytes = b''
63
64
def event_has_trace_id(event_type):
    """Returns whether event_type is one of the async event type names.

    The comparison is against "PW_TRACE_EVENT_TYPE_*" name strings, not
    against TraceType members.

    Args:
        event_type: Event type name to check.

    Returns:
        True for the async start, step and end type names, False otherwise.
    """
    async_type_names = frozenset(
        (
            "PW_TRACE_EVENT_TYPE_ASYNC_START",
            "PW_TRACE_EVENT_TYPE_ASYNC_STEP",
            "PW_TRACE_EVENT_TYPE_ASYNC_END",
        )
    )
    return event_type in async_type_names
71
72
def decode_struct_fmt_args(event):
    """Decodes the trace's event data for struct-formatted data.

    The text following the "@pw_py_struct_fmt:" prefix of event.data_fmt is
    used as a struct format string. If it does not start with a byte-order
    character, "<" is prepended.

    Args:
        event: Object providing data_fmt (str) and data (bytes).

    Returns:
        A dict mapping "data_<index>" to each unpacked item. Byte-string items
        are decoded to str, or hex-encoded when they are not valid text, so
        that the result can always be serialized as JSON. If the format is
        invalid or does not match the data length, the dict instead holds a
        single "error" entry describing the problem.
    """
    # We assume all data is packed, little-endian ordering if not specified.
    struct_fmt = event.data_fmt[len("@pw_py_struct_fmt:") :]
    if not struct_fmt.startswith(_ORDERING_CHARS):
        struct_fmt = "<" + struct_fmt

    # Size the format once, up front: an invalid format must yield an error
    # entry instead of raising again while the error message is being built.
    try:
        expected_len = struct.calcsize(struct_fmt)
    except struct.error:
        return {"error": f"Invalid struct format {event.data_fmt}"}

    items = None
    # unpack_from() accepts buffers larger than the format, so the sizes are
    # compared explicitly (an assert would be stripped under "python -O").
    if expected_len == len(event.data):
        try:
            items = struct.unpack_from(struct_fmt, event.data)
        except struct.error:
            items = None
    if items is None:
        return {
            "error": (
                f"Mismatched struct/data format {event.data_fmt} "
                f"expected data len {expected_len} "
                f"data {event.data.hex()} "
                f"data len {len(event.data)}"
            )
        }

    args = {}
    for i, item in enumerate(items):
        # The JSON lib cannot serialize byte arrays: decode them as text, and
        # fall back to a hex string when the bytes are not valid text.
        if isinstance(item, bytes):
            try:
                item = item.decode()
            except UnicodeDecodeError:
                item = item.hex()
        args["data_" + str(i)] = item
    return args
99
100
def decode_map_fmt_args(event):
    """Decodes the trace's event data for map-formatted data.

    The text following the "@pw_py_map_fmt:" prefix of event.data_fmt has the
    form "[order]{name: code, name: code, ...}". The codes are concatenated
    into a struct format string and each unpacked item is stored under its
    name. If no byte-order character is given, "<" is used.

    Args:
        event: Object providing data_fmt (str) and data (bytes).

    Returns:
        A dict mapping each name to its unpacked item. Byte-string items are
        decoded to str, or hex-encoded when they are not valid text, so that
        the result can always be serialized as JSON. If the format cannot be
        parsed, does not match the data length, or does not produce exactly
        one item per name, the dict instead holds a single "error" entry.
    """
    fmt = event.data_fmt[len("@pw_py_map_fmt:") :]

    # We assume all data is packed, little-endian ordering if not specified.
    if not fmt.startswith(_ORDERING_CHARS):
        fmt = '<' + fmt

    try:
        (fmt_bytes, fmt_list) = fmt.split("{")
        names = []
        for pair in fmt_list.strip("}").split(","):
            (name, fmt_char) = (s.strip() for s in pair.split(":"))
            names.append(name)
            fmt_bytes += fmt_char
        # Size the format here so that invalid struct codes are reported as a
        # bad format instead of raising while an error message is being built.
        expected_len = struct.calcsize(fmt_bytes)
    except (ValueError, struct.error):
        return {"error": f"Invalid map format {event.data_fmt}"}

    items = None
    # unpack_from() accepts buffers larger than the format, so the sizes are
    # compared explicitly (an assert would be stripped under "python -O").
    if expected_len == len(event.data):
        try:
            items = struct.unpack_from(fmt_bytes, event.data)
        except struct.error:
            items = None
    if items is None:
        return {
            "error": (
                f"Mismatched map/data format {event.data_fmt} "
                f"expected data len {expected_len} "
                f"data {event.data.hex()} "
                f"data len {len(event.data)}"
            )
        }

    # Repeat counts (e.g. "2H") and pad bytes ("x") make one code unpack to
    # several items or to none, which would pair values with the wrong names.
    if len(items) != len(names):
        return {
            "error": (
                f"Map format {event.data_fmt} has {len(names)} names "
                f"but unpacks to {len(items)} items"
            )
        }

    args = {}
    for name, item in zip(names, items):
        # The JSON lib cannot serialize byte arrays: decode them as text, and
        # fall back to a hex string when the bytes are not valid text.
        if isinstance(item, bytes):
            try:
                item = item.decode()
            except UnicodeDecodeError:
                item = item.hex()
        args[name] = item
    return args
141
142
def _event_type_fields(event):
    """Returns the JSON fields specific to event.event_type, or None.

    None means the event type is not one of the types handled here.
    """
    kind = event.event_type
    if kind == TraceType.DURATION_START:
        return {"ph": "B", "tid": event.label}
    if kind == TraceType.DURATION_END:
        return {"ph": "E", "tid": event.label}
    if kind == TraceType.DURATION_GROUP_START:
        return {"ph": "B", "tid": event.group}
    if kind == TraceType.DURATION_GROUP_END:
        return {"ph": "E", "tid": event.group}
    if kind == TraceType.INSTANTANEOUS:
        return {"ph": "I", "s": "p"}
    if kind == TraceType.INSTANTANEOUS_GROUP:
        return {"ph": "I", "s": "t", "tid": event.group}

    # The three async types differ only in their phase letter.
    async_phases = (
        (TraceType.ASYNC_START, "b"),
        (TraceType.ASYNC_STEP, "n"),
        (TraceType.ASYNC_END, "e"),
    )
    for async_kind, phase in async_phases:
        if kind == async_kind:
            return {
                "ph": phase,
                "scope": event.group,
                "tid": event.group,
                "cat": event.module,
                "id": event.trace_id,
                "args": {"id": event.trace_id},
            }
    return None


def generate_trace_json(events: Iterable[TraceEvent]):
    """Generates a list of JSON lines from provided trace events.

    Events whose module, timestamp_us, event_type or label is None, and events
    with an unhandled event type, are logged as errors and skipped.

    Args:
        events: Trace events to encode, in output order.

    Returns:
        A list with one JSON-encoded string per encoded event.
    """
    required_fields = ("module", "timestamp_us", "event_type", "label")
    encoded = []
    for event in events:
        if any(getattr(event, field) is None for field in required_fields):
            _LOG.error("Invalid sample")
            continue

        type_fields = _event_type_fields(event)
        if type_fields is None:
            _LOG.error("Unknown event type, skipping")
            continue

        # Common fields first, then the type-specific ones, so the key order
        # of the encoded JSON object stays stable.
        entry = {
            "pid": event.module,
            "name": event.label,
            "ts": event.timestamp_us,
        }
        entry.update(type_fields)

        # Handle Data
        if event.has_data:
            data_fmt = event.data_fmt
            if data_fmt == "@pw_arg_label":
                # The payload overrides the event name.
                entry["name"] = event.data.decode("utf-8")
            elif data_fmt == "@pw_arg_group":
                # The payload overrides the "tid".
                entry["tid"] = event.data.decode("utf-8")
            elif data_fmt == "@pw_arg_counter":
                # The event becomes a counter ("C") keyed by the event name.
                counter_value = int.from_bytes(event.data, "little")
                entry["ph"] = "C"
                entry["args"] = {entry["name"]: counter_value}
            elif data_fmt.startswith("@pw_py_struct_fmt:"):
                entry["args"] = decode_struct_fmt_args(event)
            elif data_fmt.startswith("@pw_py_map_fmt:"):
                entry["args"] = decode_map_fmt_args(event)
            else:
                # Unrecognized format: emit the payload as a hex string.
                entry["args"] = {"data": event.data.hex()}

        # Encode as JSON
        encoded.append(json.dumps(entry))

    return encoded
227