1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Bandwidth Monitor Toolbar and Tracker.""" 15from __future__ import annotations 16 17from contextvars import ContextVar 18import logging 19import textwrap 20from typing import TYPE_CHECKING 21 22from prompt_toolkit.layout import WindowAlign 23 24from pw_console.plugin_mixin import PluginMixin 25from pw_console.widgets import ToolbarButton, WindowPaneToolbar 26from pw_console.widgets.event_count_history import EventCountHistory 27 28if TYPE_CHECKING: 29 from _typeshed import ReadableBuffer 30 31_LOG = logging.getLogger('pw_console.serial_debug_logger') 32 33 34def _log_hex_strings(data: bytes, prefix=''): 35 """Create alinged hex number and character view log messages.""" 36 # Make a list of 2 character hex number strings. 37 hex_numbers = textwrap.wrap(data.hex(), 2) 38 39 hex_chars = [ 40 ('<' + str(b.to_bytes(1, byteorder='big')) + '>') 41 .replace("<b'\\x", '', 1) # Remove b'\x from the beginning 42 .replace("<b'", '', 1) # Remove b' from the beginning 43 .replace("'>", '', 1) # Remove ' from the end 44 .rjust(2) 45 for b in data 46 ] 47 48 # Replace non-printable bytes with dots. 49 for i, num in enumerate(hex_numbers): 50 if num == hex_chars[i]: 51 hex_chars[i] = '..' 52 53 hex_numbers_msg = ' '.join(hex_numbers) 54 hex_chars_msg = ' '.join(hex_chars) 55 56 _LOG.debug( 57 '%s%s', 58 prefix, 59 hex_numbers_msg, 60 extra=dict( 61 extra_metadata_fields={ 62 'msg': hex_numbers_msg, 63 'view': 'hex', 64 } 65 ), 66 ) 67 _LOG.debug( 68 '%s%s', 69 prefix, 70 hex_chars_msg, 71 extra=dict( 72 extra_metadata_fields={ 73 'msg': hex_chars_msg, 74 'view': 'chars', 75 } 76 ), 77 ) 78 79 80BANDWIDTH_HISTORY_CONTEXTVAR = ContextVar( 81 'pw_console_bandwidth_history', 82 default={ 83 'total': EventCountHistory(interval=3), 84 'read': EventCountHistory(interval=3), 85 'write': EventCountHistory(interval=3), 86 }, 87) 88 89 90class SerialBandwidthTracker: 91 """Tracks and logs the data read and written by a serial tranport.""" 92 93 def __init__(self): 94 self.pw_bps_history = BANDWIDTH_HISTORY_CONTEXTVAR.get() 95 96 def track_read_data(self, data: bytes) -> None: 97 """Tracks and logs data read.""" 98 self.pw_bps_history['read'].log(len(data)) 99 self.pw_bps_history['total'].log(len(data)) 100 101 if len(data) > 0: 102 prefix = 'Read %2d B: ' % len(data) 103 _LOG.debug( 104 '%s%s', 105 prefix, 106 data, 107 extra=dict( 108 extra_metadata_fields={ 109 'mode': 'Read', 110 'bytes': len(data), 111 'view': 'bytes', 112 'msg': str(data), 113 } 114 ), 115 ) 116 _log_hex_strings(data, prefix=prefix) 117 118 # Print individual lines 119 for line in data.decode( 120 encoding='utf-8', errors='ignore' 121 ).splitlines(): 122 _LOG.debug( 123 '%s', 124 line, 125 extra=dict( 126 extra_metadata_fields={ 127 'msg': line, 128 'view': 'lines', 129 } 130 ), 131 ) 132 133 def track_write_data(self, data: ReadableBuffer) -> None: 134 """Tracks and logs data to be written.""" 135 if isinstance(data, bytes) and len(data) > 0: 136 self.pw_bps_history['write'].log(len(data)) 137 self.pw_bps_history['total'].log(len(data)) 138 139 prefix = 'Write %2d B: ' % len(data) 140 _LOG.debug( 141 '%s%s', 142 prefix, 143 data, 144 extra=dict( 145 extra_metadata_fields={ 146 'mode': 'Write', 147 'bytes': len(data), 148 'view': 'bytes', 149 'msg': str(data), 150 } 151 ), 152 ) 153 _log_hex_strings(data, prefix=prefix) 154 155 # Print individual lines 156 for line in data.decode( 157 encoding='utf-8', errors='ignore' 158 ).splitlines(): 159 _LOG.debug( 160 '%s', 161 line, 162 extra=dict( 163 extra_metadata_fields={ 164 'msg': line, 165 'view': 'lines', 166 } 167 ), 168 ) 169 170 171class BandwidthToolbar(WindowPaneToolbar, PluginMixin): 172 """Toolbar for displaying bandwidth history.""" 173 174 TOOLBAR_HEIGHT = 1 175 176 def _update_toolbar_text(self): 177 """Format toolbar text. 178 179 This queries pyserial_wrapper's EventCountHistory context var to 180 retrieve the byte count history for read, write and totals.""" 181 tokens = [] 182 self.plugin_logger.debug('BandwidthToolbar _update_toolbar_text') 183 184 for count_name, events in self.history.items(): 185 tokens.extend( 186 [ 187 ('', ' '), 188 ( 189 'class:theme-bg-active class:theme-fg-active', 190 ' {}: '.format(count_name.title()), 191 ), 192 ( 193 'class:theme-bg-active class:theme-fg-cyan', 194 '{:.3f} '.format(events.last_count()), 195 ), 196 ( 197 'class:theme-bg-active class:theme-fg-orange', 198 '{} '.format(events.display_unit_title), 199 ), 200 ] 201 ) 202 if count_name == 'total': 203 tokens.append( 204 ('class:theme-fg-cyan', '{}'.format(events.sparkline())) 205 ) 206 207 self.formatted_text = tokens 208 209 def get_left_text_tokens(self): 210 """Formatted text to display on the far left side.""" 211 return self.formatted_text 212 213 def get_right_text_tokens(self): 214 """Formatted text to display on the far right side.""" 215 return [('class:theme-fg-blue', 'Serial Bandwidth Usage ')] 216 217 def __init__(self, *args, **kwargs): 218 super().__init__( 219 *args, center_section_align=WindowAlign.RIGHT, **kwargs 220 ) 221 222 self.history = BANDWIDTH_HISTORY_CONTEXTVAR.get() 223 self.show_toolbar = True 224 self.formatted_text = [] 225 226 # Buttons for display in the center 227 self.add_button( 228 ToolbarButton( 229 description='Refresh', mouse_handler=self._update_toolbar_text 230 ) 231 ) 232 233 # Set plugin options 234 self.background_task_update_count: int = 0 235 self.plugin_init( 236 plugin_callback=self._background_task, 237 plugin_callback_frequency=1.0, 238 plugin_logger_name='pw_console_bandwidth_toolbar', 239 ) 240 241 def _background_task(self) -> bool: 242 self.background_task_update_count += 1 243 self._update_toolbar_text() 244 self.plugin_logger.debug( 245 'BandwidthToolbar Scheduled Update: #%s', 246 self.background_task_update_count, 247 ) 248 return True 249