xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/log_store.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""LogStore saves logs and acts as a Python logging handler."""
15
16from __future__ import annotations
17
18import collections
19import logging
20from datetime import datetime
21from typing import TYPE_CHECKING
22
23from pw_cli.color import colors as pw_cli_colors
24
25from pw_console.console_prefs import ConsolePrefs
26from pw_console.log_line import LogLine
27from pw_console.text_formatting import strip_ansi
28from pw_console.widgets.table import TableView
29
30if TYPE_CHECKING:
31    from pw_console.log_view import LogView
32
33
class LogStore(logging.Handler):
    """Pigweed Console logging handler.

    This is a `Python logging.Handler
    <https://docs.python.org/3/library/logging.html#handler-objects>`_ class
    used to store logs for display in the pw_console user interface.

    You may optionally add this as a handler to existing logger
    instances. This will be required if logs need to be captured for display in
    the pw_console UI before the user interface is running.

    Example usage:

    .. code-block:: python

        import logging

        from pw_console import PwConsoleEmbed, LogStore

        _DEVICE_LOG = logging.getLogger('usb_gadget')

        # Create a log store and add as a handler.
        device_log_store = LogStore()
        _DEVICE_LOG.addHandler(device_log_store)

        # Start communication with your device here, before embedding
        # pw_console.

        # Create the pw_console embed instance
        console = PwConsoleEmbed(
            global_vars=globals(),
            local_vars=locals(),
            loggers={
                'Host Logs': [
                    logging.getLogger(__package__),
                    logging.getLogger(__name__),
                ],
                # Set the LogStore as the value of this logger window.
                'Device Logs': device_log_store,
            },
            app_title='My Awesome Console',
        )

        console.setup_python_logging()
        console.embed()
    """

    def __init__(self, prefs: ConsolePrefs | None = None) -> None:
        """Initializes the LogStore instance.

        Args:
            prefs: Console preferences to use. When omitted, a ConsolePrefs
                instance is created with project_file, project_user_file and
                user_file all set to False.
        """

        # ConsolePrefs may not be passed on init. For example, if the user is
        # creating a LogStore to capture log messages before console startup.
        if prefs is None:
            prefs = ConsolePrefs(
                project_file=False, project_user_file=False, user_file=False
            )
        self.prefs = prefs
        # Log storage deque for fast addition and deletion from the beginning
        # and end of the iterable.
        self.logs: collections.deque = collections.deque()

        # Only allow this many log lines in memory.
        # NOTE(review): nothing inside this class trims self.logs to this
        # size; confirm whether the limit is enforced elsewhere.
        self.max_history_size: int = 1000000

        # Counts of logs per python logger name
        self.channel_counts: dict[str, int] = {}
        # Widths of each logger prefix string. For example: the character length
        # of the timestamp string.
        self.channel_formatted_prefix_widths: dict[str, int] = {}
        # Longest of the above prefix widths.
        self.longest_channel_prefix_width = 0

        self.table: TableView = TableView(prefs=self.prefs)

        # Erase existing logs. This also initializes self.line_index.
        self.clear_logs()

        # List of viewers that should be notified on new log line arrival.
        self.registered_viewers: list[LogView] = []

        super().__init__()

        # Set formatting after logging.Handler init.
        self.set_formatting()

    def set_prefs(self, prefs: ConsolePrefs) -> None:
        """Set the ConsolePrefs for this LogStore and its table."""
        self.prefs = prefs
        self.table.set_prefs(prefs)

    def register_viewer(self, viewer: LogView) -> None:
        """Add a LogView to the list notified when new logs arrive."""
        self.registered_viewers.append(viewer)

    def set_formatting(self) -> None:
        """Setup log formatting.

        Installs a formatter of the form '<timestamp> <level> <message>', sets
        this handler's level to DEBUG and records the timestamp width in the
        table's 'time' column.
        """
        # Copy of pw_cli log formatter
        colors = pw_cli_colors(True)
        timestamp_prefix = colors.black_on_white('%(asctime)s') + ' '
        timestamp_format = '%Y%m%d %H:%M:%S'
        format_string = timestamp_prefix + '%(levelname)s %(message)s'
        formatter = logging.Formatter(format_string, timestamp_format)

        self.setLevel(logging.DEBUG)
        self.setFormatter(formatter)

        # Update log time character width.
        example_time_string = datetime.now().strftime(timestamp_format)
        self.table.column_widths['time'] = len(example_time_string)

    def clear_logs(self) -> None:
        """Erase all stored log lines and per-channel bookkeeping."""
        self.logs = collections.deque()
        self.channel_counts = {}
        self.channel_formatted_prefix_widths = {}
        # No channels are known anymore, so no prefix width applies either.
        self.longest_channel_prefix_width = 0
        self.line_index = 0

    def get_channel_names(self) -> list[str]:
        """Return the seen logger (channel) names in sorted order."""
        return sorted(self.channel_counts)

    def get_channel_counts(self) -> str:
        """Return the seen channel log counts for this container.

        Returns:
            A string of comma separated 'name: count' pairs.
        """
        return ', '.join(
            f'{name}: {count}' for name, count in self.channel_counts.items()
        )

    def get_total_count(self) -> int:
        """Total size of the logs store."""
        return len(self.logs)

    def get_last_log_index(self) -> int:
        """Last valid index of the logs.

        Returns:
            The index of the most recently stored log, or 0 if the store is
            empty.
        """
        # Subtract 1 since self.logs is zero indexed; clamp so an empty store
        # yields 0 rather than -1.
        return max(0, self.get_total_count() - 1)

    def _update_log_prefix_width(self, record: logging.LogRecord) -> None:
        """Save the formatted prefix width if this is a new logger channel
        name.

        Args:
            record: The log record whose logger name is checked. Nothing is
                done if that name already has a stored width, or if no
                formatter (or format string) is set on this handler.
        """
        formatter = self.formatter
        if (
            formatter is None
            or record.name in self.channel_formatted_prefix_widths
        ):
            return

        # Find the width of the formatted timestamp and level
        format_string = formatter._fmt  # pylint: disable=protected-access

        # There may not be a _fmt defined.
        if not format_string:
            return

        format_without_message = format_string.replace('%(message)s', '')
        # If the time and level style parameters are left, get the width of
        # them.
        if (
            'asctime' in format_without_message
            and 'levelname' in format_without_message
        ):
            # record.asctime is only present once a formatter has processed
            # the record; compute it here if that has not happened yet.
            asctime = getattr(record, 'asctime', None)
            if asctime is None:
                asctime = formatter.formatTime(record, formatter.datefmt)

            formatted_time_and_level = format_without_message % dict(
                asctime=asctime, levelname=record.levelname
            )

            # Delete ANSI escape sequences before measuring.
            prefix_width = len(strip_ansi(formatted_time_and_level))
        else:
            prefix_width = 0

        self.channel_formatted_prefix_widths[record.name] = prefix_width

        # Set the max width of all known formats so far.
        self.longest_channel_prefix_width = max(
            self.channel_formatted_prefix_widths.values()
        )

    def _append_log(self, record: logging.LogRecord) -> None:
        """Add a new log event.

        Formats the record, stores it as a LogLine, bumps the per-channel
        count and lets the table update its metadata column widths.
        """
        # Format incoming log line.
        formatted_log = self.format(record)
        log_line = LogLine(
            record=record,
            formatted_log=formatted_log,
            ansi_stripped_log=strip_ansi(formatted_log),
        )
        # Save this log.
        self.logs.append(log_line)

        # Increment this logger count
        self.channel_counts[record.name] = (
            self.channel_counts.get(record.name, 0) + 1
        )

        # TODO: b/235271486 - Revisit calculating prefix widths automatically
        # when line wrapping indentation is supported.
        # Set the prefix width to 0
        self.channel_formatted_prefix_widths[record.name] = 0

        # Parse metadata fields
        log_line.update_metadata()

        # Check for bigger column widths.
        self.table.update_metadata_column_widths(log_line)

    def emit(self, record: logging.LogRecord) -> None:
        """Process a new log record.

        This defines the logging.Handler emit() function which is called by
        logging.Handler.handle(). We don't implement handle() as it is done in
        the parent class with thread safety and filters applied.
        """
        self._append_log(record)
        # Notify viewers of new logs
        for viewer in self.registered_viewers:
            viewer.new_logs_arrived()

    def render_table_header(self):
        """Get pre-formatted table header from the table view."""
        return self.table.formatted_header()
256