xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/plugins/bandwidth_toolbar.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Bandwidth Monitor Toolbar and Tracker."""
15from __future__ import annotations
16
17from contextvars import ContextVar
18import logging
19import textwrap
20from typing import TYPE_CHECKING
21
22from prompt_toolkit.layout import WindowAlign
23
24from pw_console.plugin_mixin import PluginMixin
25from pw_console.widgets import ToolbarButton, WindowPaneToolbar
26from pw_console.widgets.event_count_history import EventCountHistory
27
28if TYPE_CHECKING:
29    from _typeshed import ReadableBuffer
30
31_LOG = logging.getLogger('pw_console.serial_debug_logger')
32
33
34def _log_hex_strings(data: bytes, prefix=''):
35    """Create alinged hex number and character view log messages."""
36    # Make a list of 2 character hex number strings.
37    hex_numbers = textwrap.wrap(data.hex(), 2)
38
39    hex_chars = [
40        ('<' + str(b.to_bytes(1, byteorder='big')) + '>')
41        .replace("<b'\\x", '', 1)  # Remove b'\x from the beginning
42        .replace("<b'", '', 1)  # Remove b' from the beginning
43        .replace("'>", '', 1)  # Remove ' from the end
44        .rjust(2)
45        for b in data
46    ]
47
48    # Replace non-printable bytes with dots.
49    for i, num in enumerate(hex_numbers):
50        if num == hex_chars[i]:
51            hex_chars[i] = '..'
52
53    hex_numbers_msg = ' '.join(hex_numbers)
54    hex_chars_msg = ' '.join(hex_chars)
55
56    _LOG.debug(
57        '%s%s',
58        prefix,
59        hex_numbers_msg,
60        extra=dict(
61            extra_metadata_fields={
62                'msg': hex_numbers_msg,
63                'view': 'hex',
64            }
65        ),
66    )
67    _LOG.debug(
68        '%s%s',
69        prefix,
70        hex_chars_msg,
71        extra=dict(
72            extra_metadata_fields={
73                'msg': hex_chars_msg,
74                'view': 'chars',
75            }
76        ),
77    )
78
79
80BANDWIDTH_HISTORY_CONTEXTVAR = ContextVar(
81    'pw_console_bandwidth_history',
82    default={
83        'total': EventCountHistory(interval=3),
84        'read': EventCountHistory(interval=3),
85        'write': EventCountHistory(interval=3),
86    },
87)
88
89
90class SerialBandwidthTracker:
91    """Tracks and logs the data read and written by a serial tranport."""
92
93    def __init__(self):
94        self.pw_bps_history = BANDWIDTH_HISTORY_CONTEXTVAR.get()
95
96    def track_read_data(self, data: bytes) -> None:
97        """Tracks and logs data read."""
98        self.pw_bps_history['read'].log(len(data))
99        self.pw_bps_history['total'].log(len(data))
100
101        if len(data) > 0:
102            prefix = 'Read %2d B: ' % len(data)
103            _LOG.debug(
104                '%s%s',
105                prefix,
106                data,
107                extra=dict(
108                    extra_metadata_fields={
109                        'mode': 'Read',
110                        'bytes': len(data),
111                        'view': 'bytes',
112                        'msg': str(data),
113                    }
114                ),
115            )
116            _log_hex_strings(data, prefix=prefix)
117
118            # Print individual lines
119            for line in data.decode(
120                encoding='utf-8', errors='ignore'
121            ).splitlines():
122                _LOG.debug(
123                    '%s',
124                    line,
125                    extra=dict(
126                        extra_metadata_fields={
127                            'msg': line,
128                            'view': 'lines',
129                        }
130                    ),
131                )
132
133    def track_write_data(self, data: ReadableBuffer) -> None:
134        """Tracks and logs data to be written."""
135        if isinstance(data, bytes) and len(data) > 0:
136            self.pw_bps_history['write'].log(len(data))
137            self.pw_bps_history['total'].log(len(data))
138
139            prefix = 'Write %2d B: ' % len(data)
140            _LOG.debug(
141                '%s%s',
142                prefix,
143                data,
144                extra=dict(
145                    extra_metadata_fields={
146                        'mode': 'Write',
147                        'bytes': len(data),
148                        'view': 'bytes',
149                        'msg': str(data),
150                    }
151                ),
152            )
153            _log_hex_strings(data, prefix=prefix)
154
155            # Print individual lines
156            for line in data.decode(
157                encoding='utf-8', errors='ignore'
158            ).splitlines():
159                _LOG.debug(
160                    '%s',
161                    line,
162                    extra=dict(
163                        extra_metadata_fields={
164                            'msg': line,
165                            'view': 'lines',
166                        }
167                    ),
168                )
169
170
171class BandwidthToolbar(WindowPaneToolbar, PluginMixin):
172    """Toolbar for displaying bandwidth history."""
173
174    TOOLBAR_HEIGHT = 1
175
176    def _update_toolbar_text(self):
177        """Format toolbar text.
178
179        This queries pyserial_wrapper's EventCountHistory context var to
180        retrieve the byte count history for read, write and totals."""
181        tokens = []
182        self.plugin_logger.debug('BandwidthToolbar _update_toolbar_text')
183
184        for count_name, events in self.history.items():
185            tokens.extend(
186                [
187                    ('', '  '),
188                    (
189                        'class:theme-bg-active class:theme-fg-active',
190                        ' {}: '.format(count_name.title()),
191                    ),
192                    (
193                        'class:theme-bg-active class:theme-fg-cyan',
194                        '{:.3f} '.format(events.last_count()),
195                    ),
196                    (
197                        'class:theme-bg-active class:theme-fg-orange',
198                        '{} '.format(events.display_unit_title),
199                    ),
200                ]
201            )
202            if count_name == 'total':
203                tokens.append(
204                    ('class:theme-fg-cyan', '{}'.format(events.sparkline()))
205                )
206
207        self.formatted_text = tokens
208
209    def get_left_text_tokens(self):
210        """Formatted text to display on the far left side."""
211        return self.formatted_text
212
213    def get_right_text_tokens(self):
214        """Formatted text to display on the far right side."""
215        return [('class:theme-fg-blue', 'Serial Bandwidth Usage ')]
216
217    def __init__(self, *args, **kwargs):
218        super().__init__(
219            *args, center_section_align=WindowAlign.RIGHT, **kwargs
220        )
221
222        self.history = BANDWIDTH_HISTORY_CONTEXTVAR.get()
223        self.show_toolbar = True
224        self.formatted_text = []
225
226        # Buttons for display in the center
227        self.add_button(
228            ToolbarButton(
229                description='Refresh', mouse_handler=self._update_toolbar_text
230            )
231        )
232
233        # Set plugin options
234        self.background_task_update_count: int = 0
235        self.plugin_init(
236            plugin_callback=self._background_task,
237            plugin_callback_frequency=1.0,
238            plugin_logger_name='pw_console_bandwidth_toolbar',
239        )
240
241    def _background_task(self) -> bool:
242        self.background_task_update_count += 1
243        self._update_toolbar_text()
244        self.plugin_logger.debug(
245            'BandwidthToolbar Scheduled Update: #%s',
246            self.background_task_update_count,
247        )
248        return True
249