xref: /aosp_15_r20/external/pigweed/pw_snapshot/py/pw_snapshot/processor.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tool for processing and outputting Snapshot protos as text"""
15
16import argparse
17import functools
18import logging
19import sys
20from pathlib import Path
21from typing import BinaryIO, TextIO, Callable
22import pw_tokenizer
23import pw_cpu_exception_cortex_m
24import pw_cpu_exception_risc_v
25import pw_build_info.build_id
26from pw_snapshot_metadata import metadata
27from pw_snapshot_metadata_proto import snapshot_metadata_pb2
28from pw_snapshot_protos import snapshot_pb2
29from pw_symbolizer import LlvmSymbolizer, Symbolizer
30from pw_thread import thread_analyzer
31from pw_chrono import timestamp_analyzer
32
# Module-level logger for snapshot-processing diagnostics.
_LOG = logging.getLogger('snapshot_processor')

# ASCII-art banner emitted at the top of every processed snapshot.
_BRANDING = """
        ____ _       __    _____ _   _____    ____  _____ __  ______  ______
       / __ \\ |     / /   / ___// | / /   |  / __ \\/ ___// / / / __ \\/_  __/
      / /_/ / | /| / /    \\__ \\/  |/ / /| | / /_/ /\\__ \\/ /_/ / / / / / /
     / ____/| |/ |/ /    ___/ / /|  / ___ |/ ____/___/ / __  / /_/ / / /
    /_/     |__/|__/____/____/_/ |_/_/  |_/_/    /____/_/ /_/\\____/ /_/
                  /_____/

"""

# Deprecated, use SymbolizerMatcher. Will be removed shortly.
# Maps a snapshot to the ELF file (if any) that should symbolize it.
ElfMatcher = Callable[[snapshot_pb2.Snapshot], Path | None]

# Symbolizers are useful for turning addresses into source code locations and
# function names. As a single snapshot may contain embedded snapshots from
# multiple devices, there's a need to match ELF files to the correct snapshot to
# correctly symbolize addresses.
#
# A SymbolizerMatcher is a function that takes a snapshot and investigates its
# metadata (often build ID, device name, or the version string) to determine
# whether a Symbolizer may be loaded with a suitable ELF file for symbolization.
SymbolizerMatcher = Callable[[snapshot_pb2.Snapshot], Symbolizer]
57
58
def process_snapshot(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
    llvm_symbolizer_binary: Path | None = None,
    thread_processing_callback: Callable[[bytes], str] | None = None,
) -> str:
    """Renders a single serialized snapshot as human-readable text.

    Args:
        serialized_snapshot: A serialized snapshot_pb2.Snapshot message.
        detokenizer: Optional detokenizer applied to tokenized fields in
            metadata and thread info.
        elf_matcher: Deprecated; use symbolizer_matcher instead.
        symbolizer_matcher: Selects a Symbolizer for this snapshot; takes
            precedence over elf_matcher when both are provided.
        llvm_symbolizer_binary: Optional path to an llvm-symbolizer binary
            used when an LlvmSymbolizer is constructed here.
        thread_processing_callback: Optional callback invoked during thread
            analysis with each thread's raw bytes.

    Returns:
        The formatted snapshot report, with sections joined by newlines.
    """
    sections = [_BRANDING]

    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)

    # Choose a symbolizer: an explicit matcher wins, then the deprecated
    # ELF matcher, and finally a bare LlvmSymbolizer with no ELF loaded.
    if symbolizer_matcher is not None:
        symbolizer = symbolizer_matcher(snapshot)
    elif elf_matcher is not None:
        symbolizer = LlvmSymbolizer(
            elf_matcher(snapshot), llvm_symbolizer_binary=llvm_symbolizer_binary
        )
    else:
        symbolizer = LlvmSymbolizer(
            llvm_symbolizer_binary=llvm_symbolizer_binary
        )

    captured_metadata = metadata.process_snapshot(
        serialized_snapshot, detokenizer
    )
    if captured_metadata:
        sections.append(captured_metadata)

    # Re-parse only the basic info to drive CPU-architecture dispatch.
    basic_info = snapshot_metadata_pb2.SnapshotBasicInfo()
    basic_info.ParseFromString(serialized_snapshot)
    metadata_processor = metadata.MetadataProcessor(
        basic_info.metadata, detokenizer
    )

    # RISC-V architectures report as "RV*"; anything else is handled by the
    # Cortex-M processor.
    if metadata_processor.cpu_arch().startswith("RV"):
        cpu_state = pw_cpu_exception_risc_v.process_snapshot(
            serialized_snapshot, symbolizer
        )
    else:
        cpu_state = pw_cpu_exception_cortex_m.process_snapshot(
            serialized_snapshot, symbolizer
        )
    if cpu_state:
        sections.append(cpu_state)

    thread_info = thread_analyzer.process_snapshot(
        serialized_snapshot, detokenizer, symbolizer, thread_processing_callback
    )
    if thread_info:
        sections.append(thread_info)

    timestamp_info = timestamp_analyzer.process_snapshot(serialized_snapshot)
    if timestamp_info:
        sections.append(timestamp_info)

    # Report how many related snapshots are embedded in this one.
    snapshot_count = len(snapshot.related_snapshots)
    if snapshot_count:
        plural = 's' if snapshot_count > 1 else ''
        sections.append(
            f'This snapshot contains {snapshot_count} related snapshot{plural}'
        )
        sections.append('')

    return '\n'.join(sections)
135
136
def process_snapshots(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    user_processing_callback: Callable[[bytes], str] | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
    thread_processing_callback: Callable[[snapshot_pb2.Snapshot, bytes], str]
    | None = None,
) -> str:
    """Recursively processes a snapshot and all of its embedded snapshots.

    The top-level snapshot is processed first, followed by the optional
    user_processing_callback output, then each related snapshot (separated
    by a divider line) processed through this same function.

    Returns:
        The combined report text for this snapshot and its descendants.
    """
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)

    # Bind the current snapshot as the leading argument so downstream thread
    # analysis only needs a callable that accepts raw thread bytes.
    callback: Callable[[bytes], str] | None = None
    if thread_processing_callback:
        callback = functools.partial(thread_processing_callback, snapshot)

    sections = [
        process_snapshot(
            serialized_snapshot=serialized_snapshot,
            detokenizer=detokenizer,
            elf_matcher=elf_matcher,
            symbolizer_matcher=symbolizer_matcher,
            thread_processing_callback=callback,
        )
    ]

    # Give any caller-supplied processing a chance to append its own section.
    if user_processing_callback is not None:
        sections.append(user_processing_callback(serialized_snapshot))

    # Recurse into every embedded snapshot, separated by a divider line.
    for nested_snapshot in snapshot.related_snapshots:
        sections.append('\n[' + '=' * 78 + ']\n')
        sections.append(
            str(
                process_snapshots(
                    nested_snapshot.SerializeToString(),
                    detokenizer,
                    elf_matcher,
                    user_processing_callback,
                    symbolizer_matcher,
                    thread_processing_callback,
                )
            )
        )

    return '\n'.join(sections)
188
189
def _snapshot_symbolizer_matcher(
    artifacts_dir: Path, snapshot: snapshot_pb2.Snapshot
) -> LlvmSymbolizer:
    """Builds an LlvmSymbolizer for a snapshot by GNU build ID lookup.

    Searches artifacts_dir for an ELF whose build ID matches the snapshot's
    software_build_uuid. If none is found, an error is logged and a
    symbolizer with no ELF is returned.
    """
    build_uuid = snapshot.metadata.software_build_uuid
    matching_elf: Path | None = pw_build_info.build_id.find_matching_elf(
        build_uuid, artifacts_dir
    )
    if matching_elf is None:
        _LOG.error(
            'Error: No matching ELF found for GNU build ID %s.',
            build_uuid.hex(),
        )
    return LlvmSymbolizer(matching_elf)
202
203
def _load_and_dump_snapshots(
    in_file: BinaryIO,
    out_file: TextIO,
    token_db: TextIO | None,
    artifacts_dir: Path | None,
):
    """Reads a serialized snapshot from in_file and writes the decoded text.

    Args:
        in_file: Binary stream containing a serialized snapshot.
        out_file: Text stream that receives the decoded report.
        token_db: Optional token database / ELF used for detokenization.
        artifacts_dir: Optional directory searched for matching ELF files
            used for symbolization.
    """
    detokenizer = pw_tokenizer.Detokenizer(token_db) if token_db else None

    # Only build a symbolizer matcher when there is somewhere to search.
    symbolizer_matcher: SymbolizerMatcher | None = None
    if artifacts_dir:
        symbolizer_matcher = functools.partial(
            _snapshot_symbolizer_matcher, artifacts_dir
        )

    out_file.write(
        process_snapshots(
            serialized_snapshot=in_file.read(),
            detokenizer=detokenizer,
            symbolizer_matcher=symbolizer_matcher,
        )
    )
225
226
227def _parse_args():
228    parser = argparse.ArgumentParser(description='Decode Pigweed snapshots')
229    parser.add_argument(
230        'in_file', type=argparse.FileType('rb'), help='Binary snapshot file'
231    )
232    parser.add_argument(
233        '--out-file',
234        '-o',
235        default='-',
236        type=argparse.FileType('wb'),
237        help='File to output decoded snapshots to. Defaults to stdout.',
238    )
239    parser.add_argument(
240        '--token-db',
241        type=argparse.FileType('r'),
242        help='Token database or ELF file to use for detokenization.',
243    )
244    parser.add_argument(
245        '--artifacts-dir',
246        type=Path,
247        help=(
248            'Directory to recursively search for matching ELF files to use '
249            'for symbolization.'
250        ),
251    )
252    return parser.parse_args()
253
254
if __name__ == '__main__':
    # Plain-message format so the decoded snapshot text is not prefixed with
    # log levels or logger names.
    logging.basicConfig(format='%(message)s', level=logging.INFO)
    # argparse dest names line up with _load_and_dump_snapshots() parameters,
    # so the parsed namespace can be splatted directly as keyword arguments.
    _load_and_dump_snapshots(**vars(_parse_args()))
    sys.exit(0)
259