1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""LogStore saves logs and acts as a Python logging handler.""" 15 16from __future__ import annotations 17 18import collections 19import logging 20from datetime import datetime 21from typing import TYPE_CHECKING 22 23from pw_cli.color import colors as pw_cli_colors 24 25from pw_console.console_prefs import ConsolePrefs 26from pw_console.log_line import LogLine 27from pw_console.text_formatting import strip_ansi 28from pw_console.widgets.table import TableView 29 30if TYPE_CHECKING: 31 from pw_console.log_view import LogView 32 33 34class LogStore(logging.Handler): 35 """Pigweed Console logging handler. 36 37 This is a `Python logging.Handler 38 <https://docs.python.org/3/library/logging.html#handler-objects>`_ class 39 used to store logs for display in the pw_console user interface. 40 41 You may optionally add this as a handler to an existing logger 42 instances. This will be required if logs need to be captured for display in 43 the pw_console UI before the user interface is running. 44 45 Example usage: 46 47 .. code-block:: python 48 49 import logging 50 51 from pw_console import PwConsoleEmbed, LogStore 52 53 _DEVICE_LOG = logging.getLogger('usb_gadget') 54 55 # Create a log store and add as a handler. 56 device_log_store = LogStore() 57 _DEVICE_LOG.addHander(device_log_store) 58 59 # Start communication with your device here, before embedding 60 # pw_console. 61 62 # Create the pw_console embed instance 63 console = PwConsoleEmbed( 64 global_vars=globals(), 65 local_vars=locals(), 66 loggers={ 67 'Host Logs': [ 68 logging.getLogger(__package__), 69 logging.getLogger(__name__), 70 ], 71 # Set the LogStore as the value of this logger window. 72 'Device Logs': device_log_store, 73 }, 74 app_title='My Awesome Console', 75 ) 76 77 console.setup_python_logging() 78 console.embed() 79 """ 80 81 def __init__(self, prefs: ConsolePrefs | None = None): 82 """Initializes the LogStore instance.""" 83 84 # ConsolePrefs may not be passed on init. For example, if the user is 85 # creating a LogStore to capture log messages before console startup. 86 if not prefs: 87 prefs = ConsolePrefs( 88 project_file=False, project_user_file=False, user_file=False 89 ) 90 self.prefs = prefs 91 # Log storage deque for fast addition and deletion from the beginning 92 # and end of the iterable. 93 self.logs: collections.deque = collections.deque() 94 95 # Only allow this many log lines in memory. 96 self.max_history_size: int = 1000000 97 98 # Counts of logs per python logger name 99 self.channel_counts: dict[str, int] = {} 100 # Widths of each logger prefix string. For example: the character length 101 # of the timestamp string. 102 self.channel_formatted_prefix_widths: dict[str, int] = {} 103 # Longest of the above prefix widths. 104 self.longest_channel_prefix_width = 0 105 106 self.table: TableView = TableView(prefs=self.prefs) 107 108 # Erase existing logs. 109 self.clear_logs() 110 111 # List of viewers that should be notified on new log line arrival. 112 self.registered_viewers: list[LogView] = [] 113 114 super().__init__() 115 116 # Set formatting after logging.Handler init. 117 self.set_formatting() 118 119 def set_prefs(self, prefs: ConsolePrefs) -> None: 120 """Set the ConsolePrefs for this LogStore.""" 121 self.prefs = prefs 122 self.table.set_prefs(prefs) 123 124 def register_viewer(self, viewer: LogView) -> None: 125 """Register this LogStore with a LogView.""" 126 self.registered_viewers.append(viewer) 127 128 def set_formatting(self) -> None: 129 """Setup log formatting.""" 130 # Copy of pw_cli log formatter 131 colors = pw_cli_colors(True) 132 timestamp_prefix = colors.black_on_white('%(asctime)s') + ' ' 133 timestamp_format = '%Y%m%d %H:%M:%S' 134 format_string = timestamp_prefix + '%(levelname)s %(message)s' 135 formatter = logging.Formatter(format_string, timestamp_format) 136 137 self.setLevel(logging.DEBUG) 138 self.setFormatter(formatter) 139 140 # Update log time character width. 141 example_time_string = datetime.now().strftime(timestamp_format) 142 self.table.column_widths['time'] = len(example_time_string) 143 144 def clear_logs(self): 145 """Erase all stored pane lines.""" 146 self.logs = collections.deque() 147 self.channel_counts = {} 148 self.channel_formatted_prefix_widths = {} 149 self.line_index = 0 150 151 def get_channel_names(self) -> list[str]: 152 return list(sorted(self.channel_counts.keys())) 153 154 def get_channel_counts(self): 155 """Return the seen channel log counts for this conatiner.""" 156 return ', '.join( 157 [f'{name}: {count}' for name, count in self.channel_counts.items()] 158 ) 159 160 def get_total_count(self): 161 """Total size of the logs store.""" 162 return len(self.logs) 163 164 def get_last_log_index(self): 165 """Last valid index of the logs.""" 166 # Subtract 1 since self.logs is zero indexed. 167 total = self.get_total_count() 168 return 0 if total < 0 else total - 1 169 170 def _update_log_prefix_width(self, record: logging.LogRecord): 171 """Save the formatted prefix width if this is a new logger channel 172 name.""" 173 if self.formatter and ( 174 record.name not in self.channel_formatted_prefix_widths.keys() 175 ): 176 # Find the width of the formatted timestamp and level 177 format_string = ( 178 self.formatter._fmt # pylint: disable=protected-access 179 ) 180 181 # There may not be a _fmt defined. 182 if not format_string: 183 return 184 185 format_without_message = format_string.replace('%(message)s', '') 186 # If any other style parameters are left, get the width of them. 187 if ( 188 format_without_message 189 and 'asctime' in format_without_message 190 and 'levelname' in format_without_message 191 ): 192 formatted_time_and_level = format_without_message % dict( 193 asctime=record.asctime, levelname=record.levelname 194 ) 195 196 # Delete ANSI escape sequences. 197 ansi_stripped_time_and_level = strip_ansi( 198 formatted_time_and_level 199 ) 200 201 self.channel_formatted_prefix_widths[record.name] = len( 202 ansi_stripped_time_and_level 203 ) 204 else: 205 self.channel_formatted_prefix_widths[record.name] = 0 206 207 # Set the max width of all known formats so far. 208 self.longest_channel_prefix_width = max( 209 self.channel_formatted_prefix_widths.values() 210 ) 211 212 def _append_log(self, record: logging.LogRecord): 213 """Add a new log event.""" 214 # Format incoming log line. 215 formatted_log = self.format(record) 216 ansi_stripped_log = strip_ansi(formatted_log) 217 # Save this log. 218 self.logs.append( 219 LogLine( 220 record=record, 221 formatted_log=formatted_log, 222 ansi_stripped_log=ansi_stripped_log, 223 ) 224 ) 225 # Increment this logger count 226 self.channel_counts[record.name] = ( 227 self.channel_counts.get(record.name, 0) + 1 228 ) 229 230 # TODO: b/235271486 - Revisit calculating prefix widths automatically 231 # when line wrapping indentation is supported. 232 # Set the prefix width to 0 233 self.channel_formatted_prefix_widths[record.name] = 0 234 235 # Parse metadata fields 236 self.logs[-1].update_metadata() 237 238 # Check for bigger column widths. 239 self.table.update_metadata_column_widths(self.logs[-1]) 240 241 def emit(self, record) -> None: 242 """Process a new log record. 243 244 This defines the logging.Handler emit() fuction which is called by 245 logging.Handler.handle() We don't implement handle() as it is done in 246 the parent class with thread safety and filters applied. 247 """ 248 self._append_log(record) 249 # Notify viewers of new logs 250 for viewer in self.registered_viewers: 251 viewer.new_logs_arrived() 252 253 def render_table_header(self): 254 """Get pre-formatted table header.""" 255 return self.table.formatted_header() 256