# xref: /aosp_15_r20/external/pigweed/pw_snapshot/py/pw_snapshot_metadata/metadata.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Library to assist processing Snapshot Metadata protos into text."""
15
from typing import Mapping

import pw_log_tokenized
import pw_tokenizer
from pw_tokenizer import proto as proto_detokenizer
from pw_snapshot_metadata_proto import snapshot_metadata_pb2
21
# Default width, in columns, within which the FATAL banner rows are centered.
_PRETTY_FORMAT_DEFAULT_WIDTH = 80

# "FATAL" banner art, one string per row. MetadataProcessor.__str__ emits it
# at the top of the output for snapshots whose metadata is marked fatal.
_FATAL = (
    '▪▄▄▄ ▄▄▄· ▄▄▄▄▄ ▄▄▄· ▄ ·',
    '█▄▄▄▐█ ▀█ • █▌ ▐█ ▀█ █  ',
    '█ ▪ ▄█▀▀█   █. ▄█▀▀█ █  ',
    '▐▌ .▐█ ▪▐▌ ▪▐▌·▐█ ▪▐▌▐▌ ',
    '▀    ▀  ▀ ·  ▀  ▀  ▀ .▀▀',
)
31
32
33def _process_tags(tags: Mapping[str, str]) -> str | None:
34    """Outputs snapshot tags as a multi-line string."""
35    if not tags:
36        return None
37
38    output: list[str] = ['Tags:']
39    for key, value in tags.items():
40        output.append(f'  {key}: {value}')
41
42    return '\n'.join(output)
43
44
def process_snapshot(
    serialized_snapshot: bytes, tokenizer_db: pw_tokenizer.Detokenizer | None
) -> str:
    """Processes snapshot metadata and tags, producing a multi-line string.

    Args:
      serialized_snapshot: Serialized bytes, parsed as a SnapshotBasicInfo
        message.
      tokenizer_db: Detokenizer handed to MetadataProcessor; may be None.

    Returns:
      The metadata summary (only when the `metadata` field is set) followed
      by the tag listing (only when there are tags). Each section that is
      present is followed by an empty entry, so sections are separated by a
      blank line and the text ends with a newline. An empty string when
      neither section is present.
    """
    snapshot = snapshot_metadata_pb2.SnapshotBasicInfo()
    snapshot.ParseFromString(serialized_snapshot)

    output: list[str] = []

    if snapshot.HasField('metadata'):
        processor = MetadataProcessor(snapshot.metadata, tokenizer_db)
        # The empty entry provides the spacing after the section.
        output.extend((str(processor), ''))

    # _process_tags() returns None for an empty tag map, so a single check
    # covers both "no tags" and "nothing to print".
    tags = _process_tags(snapshot.tags)
    if tags is not None:
        # Trailing empty entry for spacing.
        output.extend((tags, ''))

    return '\n'.join(output)
70
71
class MetadataProcessor:
    """Simplifies dumping the contents of a snapshot Metadata message.

    Wraps a `snapshot_metadata_pb2.Metadata` message and exposes its fields
    as printable values. `str(processor)` produces a multi-line summary.
    """

    def __init__(
        self,
        metadata: snapshot_metadata_pb2.Metadata,
        tokenizer_db: pw_tokenizer.Detokenizer | None = None,
    ):
        """Wraps `metadata` and detokenizes it with `tokenizer_db`.

        Args:
          metadata: The Metadata message to present. It is handed to
            `proto_detokenizer.detokenize_fields`; presumably that call
            rewrites tokenized fields in place (confirm against
            pw_tokenizer.proto).
          tokenizer_db: Detokenizer to use. When None, a
            `pw_tokenizer.Detokenizer(None)` is created instead.
        """
        self._metadata = metadata
        self._tokenizer_db = (
            tokenizer_db
            if tokenizer_db is not None
            else pw_tokenizer.Detokenizer(None)
        )
        # Read the token from `reason` before detokenize_fields() is applied
        # to the message below.
        self._reason_token = self._tokenizer_db.detokenize(
            metadata.reason
        ).token
        self._format_width = _PRETTY_FORMAT_DEFAULT_WIDTH
        proto_detokenizer.detokenize_fields(self._tokenizer_db, self._metadata)

    def is_fatal(self) -> bool:
        """Returns the `fatal` field of the metadata."""
        return self._metadata.fatal

    def reason(self) -> str:
        """Returns the snapshot reason as text.

        Returns 'UNKNOWN (field missing)' when the `reason` field is empty.
        Otherwise the decoded reason is parsed with
        `pw_log_tokenized.FormatStringWithMetadata`; the result is
        '<file>: <message>' when a file is present, else just the message.
        """
        if not self._metadata.reason:
            return 'UNKNOWN (field missing)'

        log = pw_log_tokenized.FormatStringWithMetadata(
            self._metadata.reason.decode()
        )

        return f'{log.file}: {log.message}' if log.file else log.message

    def reason_token(self) -> int | None:
        """If the snapshot `reason` is tokenized, the value of the token."""
        return self._reason_token

    def project_name(self) -> str:
        """Returns the `project_name` field decoded to str."""
        return self._metadata.project_name.decode()

    def device_name(self) -> str:
        """Returns the `device_name` field decoded to str."""
        return self._metadata.device_name.decode()

    def device_fw_version(self) -> str:
        """Returns the `software_version` field."""
        return self._metadata.software_version

    def snapshot_uuid(self) -> str:
        """Returns the `snapshot_uuid` field as a hex string."""
        return self._metadata.snapshot_uuid.hex()

    def cpu_arch(self) -> str:
        """Returns the name of the `cpu_arch` enum value.

        A number that the CpuArchitecture.Enum descriptor does not list is
        reported as 'UNKNOWN (<number>)' instead of raising KeyError, so a
        snapshot carrying an unrecognized value can still be printed.
        """
        descriptor = (
            snapshot_metadata_pb2.CpuArchitecture.DESCRIPTOR.enum_types_by_name[
                'Enum'
            ]
        )
        arch = self._metadata.cpu_arch
        try:
            return descriptor.values_by_number[arch].name
        except KeyError:
            return f'UNKNOWN ({arch})'

    def fw_build_uuid(self) -> str:
        """Returns the `software_build_uuid` field as a hex string."""
        return self._metadata.software_build_uuid.hex()

    def set_pretty_format_width(self, width: int) -> None:
        """Sets the centered width of the FATAL text for a formatted output."""
        self._format_width = width

    def __str__(self) -> str:
        """Outputs a pw.snapshot.Metadata proto as a multi-line string.

        The text starts with the FATAL banner and 'Device crash cause:' for
        fatal snapshots, or 'Snapshot capture reason:' otherwise, followed by
        the indented reason and a blank line. One labelled line is then added
        for each remaining field that is set.
        """
        output: list[str] = []
        if self._metadata.fatal:
            # Center each banner row, then drop the padding on the right.
            output.extend(
                row.center(self._format_width).rstrip() for row in _FATAL
            )
            output.extend(('', 'Device crash cause:'))
        else:
            output.append('Snapshot capture reason:')

        output.extend(('    ' + self.reason(), ''))

        # Compare against None: 0 is a valid token value and must be shown.
        token = self.reason_token()
        if token is not None:
            output.append(f'Reason token:      0x{token:x}')

        if self._metadata.project_name:
            output.append(f'Project name:      {self.project_name()}')

        if self._metadata.device_name:
            output.append(f'Device:            {self.device_name()}')

        if self._metadata.cpu_arch:
            output.append(f'CPU Arch:          {self.cpu_arch()}')

        if self._metadata.software_version:
            output.append(f'Device FW version: {self.device_fw_version()}')

        if self._metadata.software_build_uuid:
            output.append(f'FW build UUID:     {self.fw_build_uuid()}')

        if self._metadata.snapshot_uuid:
            output.append(f'Snapshot UUID:     {self.snapshot_uuid()}')

        return '\n'.join(output)
178