1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Tool for processing and outputting Snapshot protos as text""" 15 16import argparse 17import functools 18import logging 19import sys 20from pathlib import Path 21from typing import BinaryIO, TextIO, Callable 22import pw_tokenizer 23import pw_cpu_exception_cortex_m 24import pw_cpu_exception_risc_v 25import pw_build_info.build_id 26from pw_snapshot_metadata import metadata 27from pw_snapshot_metadata_proto import snapshot_metadata_pb2 28from pw_snapshot_protos import snapshot_pb2 29from pw_symbolizer import LlvmSymbolizer, Symbolizer 30from pw_thread import thread_analyzer 31from pw_chrono import timestamp_analyzer 32 33_LOG = logging.getLogger('snapshot_processor') 34 35_BRANDING = """ 36 ____ _ __ _____ _ _____ ____ _____ __ ______ ______ 37 / __ \\ | / / / ___// | / / | / __ \\/ ___// / / / __ \\/_ __/ 38 / /_/ / | /| / / \\__ \\/ |/ / /| | / /_/ /\\__ \\/ /_/ / / / / / / 39 / ____/| |/ |/ / ___/ / /| / ___ |/ ____/___/ / __ / /_/ / / / 40 /_/ |__/|__/____/____/_/ |_/_/ |_/_/ /____/_/ /_/\\____/ /_/ 41 /_____/ 42 43""" 44 45# Deprecated, use SymbolizerMatcher. Will be removed shortly. 46ElfMatcher = Callable[[snapshot_pb2.Snapshot], Path | None] 47 48# Symbolizers are useful for turning addresses into source code locations and 49# function names. 
# As a single snapshot may contain embedded snapshots from
# multiple devices, there's a need to match ELF files to the correct snapshot to
# correctly symbolize addresses.
#
# A SymbolizerMatcher is a function that takes a snapshot and investigates its
# metadata (often build ID, device name, or the version string) to determine
# whether a Symbolizer may be loaded with a suitable ELF file for symbolization.
SymbolizerMatcher = Callable[[snapshot_pb2.Snapshot], Symbolizer]


def process_snapshot(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
    llvm_symbolizer_binary: Path | None = None,
    thread_processing_callback: Callable[[bytes], str] | None = None,
) -> str:
    """Processes a single snapshot.

    Embedded (related) snapshots are only counted here, not processed; use
    process_snapshots() to walk them.

    Args:
        serialized_snapshot: The serialized Snapshot proto to process.
        detokenizer: Optional detokenizer handed to the metadata and thread
            processors.
        elf_matcher: Deprecated. Returns the path of the ELF file to
            symbolize this snapshot with. Only consulted when
            symbolizer_matcher is None.
        symbolizer_matcher: Returns the Symbolizer to use for this snapshot.
            Takes precedence over elf_matcher.
        llvm_symbolizer_binary: Optional path handed to the LlvmSymbolizer
            instances constructed here. Unused when symbolizer_matcher is
            provided, since the matcher then supplies the symbolizer.
        thread_processing_callback: Optional callback forwarded to the thread
            analyzer.

    Returns:
        The non-empty report sections joined with newlines, starting with the
        branding banner.
    """
    output = [_BRANDING]

    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)

    # Open a symbolizer. An explicit symbolizer matcher wins over the
    # deprecated ELF matcher; with neither, symbolize without an ELF file.
    if symbolizer_matcher is not None:
        symbolizer = symbolizer_matcher(snapshot)
    elif elf_matcher is not None:
        symbolizer = LlvmSymbolizer(
            elf_matcher(snapshot), llvm_symbolizer_binary=llvm_symbolizer_binary
        )
    else:
        symbolizer = LlvmSymbolizer(
            llvm_symbolizer_binary=llvm_symbolizer_binary
        )

    captured_metadata = metadata.process_snapshot(
        serialized_snapshot, detokenizer
    )
    if captured_metadata:
        output.append(captured_metadata)

    # Create a MetadataProcessor to query the CPU architecture.
    snapshot_metadata = snapshot_metadata_pb2.SnapshotBasicInfo()
    snapshot_metadata.ParseFromString(serialized_snapshot)
    metadata_processor = metadata.MetadataProcessor(
        snapshot_metadata.metadata, detokenizer
    )

    # Check which CPU architecture to process the snapshot with. Anything
    # whose architecture string does not start with "RV" is handled as
    # Cortex-M.
    if metadata_processor.cpu_arch().startswith("RV"):
        cpu_state = pw_cpu_exception_risc_v.process_snapshot(
            serialized_snapshot, symbolizer
        )
    else:
        cpu_state = pw_cpu_exception_cortex_m.process_snapshot(
            serialized_snapshot, symbolizer
        )
    if cpu_state:
        output.append(cpu_state)

    thread_info = thread_analyzer.process_snapshot(
        serialized_snapshot, detokenizer, symbolizer, thread_processing_callback
    )
    if thread_info:
        output.append(thread_info)

    timestamp_info = timestamp_analyzer.process_snapshot(serialized_snapshot)
    if timestamp_info:
        output.append(timestamp_info)

    # Check and emit the number of related snapshots embedded in this snapshot.
    if snapshot.related_snapshots:
        snapshot_count = len(snapshot.related_snapshots)
        plural = 's' if snapshot_count > 1 else ''
        output.append(
            f'This snapshot contains {snapshot_count} related snapshot{plural}'
        )
        output.append('')

    return '\n'.join(output)


def process_snapshots(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    user_processing_callback: Callable[[bytes], str] | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
    thread_processing_callback: Callable[[snapshot_pb2.Snapshot, bytes], str]
    | None = None,
    llvm_symbolizer_binary: Path | None = None,
) -> str:
    """Processes a snapshot that may have multiple embedded snapshots.

    The top-level snapshot is processed first, then every entry of its
    related_snapshots field is processed recursively with the same arguments.

    Args:
        serialized_snapshot: The serialized Snapshot proto to process.
        detokenizer: Optional detokenizer forwarded to process_snapshot().
        elf_matcher: Deprecated, use symbolizer_matcher. Forwarded to
            process_snapshot().
        user_processing_callback: Optional callback invoked with the
            serialized bytes of each snapshot (top-level and nested); its
            return value is appended to the output.
        symbolizer_matcher: Forwarded to process_snapshot().
        thread_processing_callback: Optional callback for thread processing.
            It is bound to the parsed snapshot currently being processed
            before being forwarded to process_snapshot().
        llvm_symbolizer_binary: Optional path forwarded to process_snapshot().

    Returns:
        The processed output of every snapshot joined with newlines, with a
        separator line before each nested snapshot.
    """
    output = []

    # Process the top-level snapshot.
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)

    # Bind the parsed snapshot so the callback matches the single-argument
    # signature that process_snapshot() expects.
    callback: Callable[[bytes], str] | None = None
    if thread_processing_callback:
        callback = functools.partial(thread_processing_callback, snapshot)

    output.append(
        process_snapshot(
            serialized_snapshot=serialized_snapshot,
            detokenizer=detokenizer,
            elf_matcher=elf_matcher,
            symbolizer_matcher=symbolizer_matcher,
            llvm_symbolizer_binary=llvm_symbolizer_binary,
            thread_processing_callback=callback,
        )
    )

    # If the user provided a custom processing callback, call it on each
    # snapshot.
    if user_processing_callback is not None:
        output.append(user_processing_callback(serialized_snapshot))

    # Process any related snapshots that were embedded in this one.
    for nested_snapshot in snapshot.related_snapshots:
        output.append('\n[' + '=' * 78 + ']\n')
        output.append(
            process_snapshots(
                serialized_snapshot=nested_snapshot.SerializeToString(),
                detokenizer=detokenizer,
                elf_matcher=elf_matcher,
                user_processing_callback=user_processing_callback,
                symbolizer_matcher=symbolizer_matcher,
                thread_processing_callback=thread_processing_callback,
                llvm_symbolizer_binary=llvm_symbolizer_binary,
            )
        )

    return '\n'.join(output)


def _snapshot_symbolizer_matcher(
    artifacts_dir: Path, snapshot: snapshot_pb2.Snapshot
) -> LlvmSymbolizer:
    """Builds an LlvmSymbolizer for the ELF matching the snapshot's build ID.

    Looks up the snapshot's software_build_uuid under artifacts_dir. When no
    ELF matches, an error is logged and the symbolizer is still constructed
    with the falsy lookup result, so processing continues.
    """
    matching_elf: Path | None = pw_build_info.build_id.find_matching_elf(
        snapshot.metadata.software_build_uuid, artifacts_dir
    )
    if not matching_elf:
        _LOG.error(
            'Error: No matching ELF found for GNU build ID %s.',
            snapshot.metadata.software_build_uuid.hex(),
        )
    return LlvmSymbolizer(matching_elf)


def _load_and_dump_snapshots(
    in_file: BinaryIO,
    out_file: TextIO,
    token_db: TextIO | None,
    artifacts_dir: Path | None,
):
    """Reads a serialized snapshot, processes it, and writes the text output.

    Args:
        in_file: Binary stream holding the serialized snapshot.
        out_file: Text stream the processed output is written to.
        token_db: Optional open token database used to build a Detokenizer.
        artifacts_dir: Optional directory searched for a matching ELF file to
            symbolize with.
    """
    detokenizer = None
    if token_db:
        detokenizer = pw_tokenizer.Detokenizer(token_db)

    symbolizer_matcher: SymbolizerMatcher | None = None
    if artifacts_dir:
        symbolizer_matcher = functools.partial(
            _snapshot_symbolizer_matcher, artifacts_dir
        )

    out_file.write(
        process_snapshots(
            serialized_snapshot=in_file.read(),
            detokenizer=detokenizer,
            symbolizer_matcher=symbolizer_matcher,
        )
    )


def _parse_args():
    """Parses command-line arguments.

    The attribute names of the returned namespace match the parameter names
    of _load_and_dump_snapshots() so it can be expanded directly into a call.
    """
    parser = argparse.ArgumentParser(description='Decode Pigweed snapshots')
    parser.add_argument(
        'in_file', type=argparse.FileType('rb'), help='Binary snapshot file'
    )
    parser.add_argument(
        '--out-file',
        '-o',
        default='-',
        # Text mode: the processed snapshot is a str, and writing a str to a
        # binary stream raises TypeError.
        type=argparse.FileType('w'),
        help='File to output decoded snapshots to. Defaults to stdout.',
    )
    parser.add_argument(
        '--token-db',
        type=argparse.FileType('r'),
        help='Token database or ELF file to use for detokenization.',
    )
    parser.add_argument(
        '--artifacts-dir',
        type=Path,
        help=(
            'Directory to recursively search for matching ELF files to use '
            'for symbolization.'
        ),
    )
    return parser.parse_args()


if __name__ == '__main__':
    logging.basicConfig(format='%(message)s', level=logging.INFO)
    _load_and_dump_snapshots(**vars(_parse_args()))
    sys.exit(0)