1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Wrapers for pyserial classes to log read and write data.""" 15 16import collections 17import logging 18from dataclasses import dataclass, field 19import time 20 21_LOG = logging.getLogger('pw_console') 22 23 24@dataclass 25class EventCountHistory: 26 """Track counts of events over time. 27 28 Example usage: :: 29 30 events = EventCountHistory( 31 base_count_units='Bytes', 32 display_unit_title='KiB/s', 33 display_unit_factor=0.001, 34 interval=1.0, 35 show_sparkline=True) 36 37 # Log 1 event now. 38 events.log(1) 39 time.sleep(1) 40 41 # Log 100 events at this time. 42 events.log(100) 43 time.sleep(1) 44 45 events.log(200) 46 time.sleep(1) 47 events.log(400) 48 print(events) 49 ▂▄█ 0.400 [KiB/s] 50 51 """ 52 53 base_count_units: str = 'Bytes' 54 display_unit_title: str = 'KiB/s' 55 display_unit_factor: float = 0.001 56 interval: float = 1.0 # Number of seconds per sum of events. 57 history_limit: int = 20 58 scale_characters = ' ▁▂▃▄▅▆▇█' 59 history: collections.deque = field(default_factory=collections.deque) 60 show_sparkline: bool = False 61 _this_count: int = 0 62 _last_count: int = 0 63 _last_update_time: float = field(default_factory=time.time) 64 65 def log(self, count: int) -> None: 66 self._this_count += count 67 68 this_time = time.time() 69 if this_time - self._last_update_time >= self.interval: 70 self._last_update_time = this_time 71 self._last_count = self._this_count 72 self._this_count = 0 73 self.history.append(self._last_count) 74 75 if len(self.history) > self.history_limit: 76 self.history.popleft() 77 78 def last_count(self) -> float: 79 return self._last_count * self.display_unit_factor 80 81 def last_count_raw(self) -> int: 82 return self._last_count 83 84 def last_count_with_units(self) -> str: 85 return '{:.3f} [{}]'.format( 86 self._last_count * self.display_unit_factor, self.display_unit_title 87 ) 88 89 def __repr__(self) -> str: 90 sparkline = '' 91 if self.show_sparkline: 92 sparkline = self.sparkline() 93 return ' '.join([sparkline, self.last_count_with_units()]) 94 95 def __pt_formatted_text__(self): 96 return [('', self.__repr__())] 97 98 def sparkline( 99 self, min_value: int = 0, max_value: int | None = None 100 ) -> str: 101 msg = ''.rjust(self.history_limit) 102 if len(self.history) == 0: 103 return msg 104 105 minimum = min_value 106 maximum = max_value if max_value else max(self.history) 107 max_minus_min = maximum - min_value 108 if max_minus_min == 0: 109 return msg 110 111 msg = '' 112 for i in self.history: 113 # (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min 114 index = int( 115 (((1.0 * i) - minimum) / max_minus_min) 116 * len(self.scale_characters) 117 ) 118 if index >= len(self.scale_characters): 119 index = len(self.scale_characters) - 1 120 msg += self.scale_characters[index] 121 return msg.rjust(self.history_limit) 122