# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""LogScreen tracks lines to display on screen with a set of ScreenLines."""

from __future__ import annotations
import collections
import dataclasses
import logging
from typing import Callable, TYPE_CHECKING

from prompt_toolkit.formatted_text import (
    to_formatted_text,
    StyleAndTextTuples,
)

from pw_console.log_filter import LogFilter
from pw_console.text_formatting import (
    fill_character_width,
    insert_linebreaks,
    split_lines,
)

if TYPE_CHECKING:
    from pw_console.log_line import LogLine
    from pw_console.log_pane import LogPane

_LOG = logging.getLogger(__package__)


@dataclasses.dataclass
class ScreenLine:
    """A single line of text for displaying on screen.

    Instances of ScreenLine are stored in a LogScreen's line_buffer deque. When
    a new log message is added it may be converted into multiple ScreenLine
    instances if the text is wrapped across multiple lines.

    For example: say our screen is 80 characters wide and a log message 240
    characters long needs to be displayed. With line wrapping on we will need
    240/80 = 3 lines to show the full message. Say also that this single log
    message is at index #5 in the LogStore classes deque, this is the log_index
    value. This single log message will then be split into 3 separate
    ScreenLine instances:

    ::
        ScreenLine(fragments=[('', 'Log message text line one')],
                   log_index=5, subline=0, height=3)
        ScreenLine(fragments=[('', 'Log message text line two')],
                   log_index=5, subline=1, height=3)
        ScreenLine(fragments=[('', 'Log message text line three')],
                   log_index=5, subline=2, height=3)

    Each `fragments` attribute will store the formatted text intended to be
    directly drawn to the screen. Since these three lines are all displaying
    the same log message their `log_index` reference will be the same. The
    `subline` attribute is the zero-indexed number for this log's wrapped line
    count and `height` is the total ScreenLines needed to show this log
    message.

    Continuing with this example say the next two log messages to display both
    fit on screen with no wrapping. They will both be represented with one
    ScreenLine each:

    ::
        ScreenLine(fragments=[('', 'Another log message')],
                   log_index=6, subline=0, height=1)
        ScreenLine(fragments=[('', 'Yet another log message')],
                   log_index=7, subline=0, height=1)

    The `log_index` is different for each since these are both separate
    logs. The subline is 0 since each line is the first one for this log. Both
    have a height of 1 since no line wrapping was performed.
    """

    # The StyleAndTextTuples for this line ending with a '\n'. These are the raw
    # prompt_toolkit formatted text tuples to display on screen. The colors and
    # spacing can change depending on the formatters used in the
    # LogScreen._get_fragments_per_line() function.
    fragments: StyleAndTextTuples

    # Log index reference for this screen line. This is the index to where the
    # log message resides in the parent LogStore.logs deque. It is set to None
    # if this is an empty ScreenLine. If a log message requires line wrapping
    # then each resulting ScreenLine instance will have the same log_index
    # value.
    #
    # This log_index may also be the integer index into a LogView.filtered_logs
    # deque depending on if log messages are being filtered by the user. The
    # LogScreen class below doesn't need to do anything different in either
    # case. It's the responsibility of LogScreen.get_log_source() to return the
    # correct source.
    #
    # Note this is NOT an index into LogScreen.line_buffer.
    log_index: int | None = None

    # Keep track of the total height and subline number for this log message.
    # For example this line could be subline (0, 1, or 2) of a log message with
    # a total height 3.

    # Subline index.
    subline: int = 0
    # Total height in lines of text (also ScreenLine count) that the log message
    # referred to by log_index requires. When a log message is split across
    # multiple lines height will be set to the same value for each ScreenLine
    # instance.
    height: int = 1

    # Empty lines will have no log_index
    def empty(self) -> bool:
        """Return True if this line is not backed by a log message."""
        return self.log_index is None


@dataclasses.dataclass
class LogScreen:
    """LogScreen maintains the state of visible logs on screen.

    It is responsible for moving the cursor_position, prepending and appending
    log lines as the user moves the cursor."""

    # Callable functions to retrieve logs and display formatting.
    get_log_source: Callable[[], tuple[int, collections.deque[LogLine]]]
    get_line_wrapping: Callable[[], bool]
    get_log_formatter: Callable[
        [], Callable[[LogLine], StyleAndTextTuples] | None
    ]
    get_search_filter: Callable[[], LogFilter | None]
    get_search_highlight: Callable[[], bool]

    # Window row of the current cursor position
    cursor_position: int = 0
    # Screen width and height in number of characters.
    width: int = 0
    height: int = 0
    # Buffer of literal text lines to be displayed on screen. Each visual line
    # is represented by a ScreenLine instance and will have a max width equal
    # to the screen's width. If any given whole log message requires line
    # wrapping to be displayed it will be represented by multiple ScreenLine
    # instances in this deque.
    line_buffer: collections.deque[ScreenLine] = dataclasses.field(
        default_factory=collections.deque
    )

    def __post_init__(self) -> None:
        # Empty screen flag. Will be true if the screen contains only newlines.
        self._empty: bool = True
        # Save the last log index when appending. Useful for tracking how many
        # new lines need appending in follow mode.
        self.last_appended_log_index: int = 0

    def _fill_top_with_empty_lines(self) -> None:
        """Add empty lines to fill the remaining empty screen space."""
        for _ in range(self.height - len(self.line_buffer)):
            self.line_buffer.appendleft(ScreenLine([('', '')]))

    def clear_screen(self) -> None:
        """Erase all lines and fill with empty lines."""
        self.line_buffer.clear()
        self._fill_top_with_empty_lines()
        self._empty = True

    def empty(self) -> bool:
        """Return True if the screen has no lines with content."""
        return self._empty

    def reset_logs(
        self,
        log_index: int = 0,
    ) -> None:
        """Erase the screen and append logs starting from log_index.

        Args:
          log_index: Index of the log message that should end up as the last
              one appended to the screen.
        """
        self.clear_screen()

        start_log_index, log_source = self.get_log_source()
        if not log_source:
            return

        # Append at most the window height number worth of logs. If the
        # number of available logs is less, use that amount.
        max_log_messages_to_fetch = min(self.height, len(log_source))

        # Including the target log_index, fetch the desired logs.
        # For example if we are rendering log_index 10 and the window height is
        # 6 the range below will be:
        # >>> list(i for i in range((10 - 6) + 1, 10 + 1))
        # [5, 6, 7, 8, 9, 10]
        for i in range(
            (log_index - max_log_messages_to_fetch) + 1, log_index + 1
        ):
            # If i is before the first index reported by get_log_source() it's
            # an invalid log, skip to the next line. The next index could be
            # valid since we are traversing in increasing order.
            if i < start_log_index:
                continue
            self.append_log(i)
        # Make sure the bottom line is highlighted.
        self.move_cursor_to_bottom()

    def resize(self, width: int, height: int) -> None:
        """Update screen width and height.

        Following a resize the caller should run reset_logs()."""
        self.width = width
        self.height = height

    def _highlight_line(
        self, line: ScreenLine, style: str
    ) -> StyleAndTextTuples:
        """Return line's fragments padded to the screen width with a style.

        Padding with empty characters ensures the background is highlighted to
        the edge of the screen.
        """
        # NOTE(review): the second argument is the number of fragment tuples
        # minus one, not a character count; confirm this is what
        # fill_character_width() expects.
        padded_fragments = fill_character_width(
            line.fragments,
            len(line.fragments) - 1,  # -1 for the ending line break
            self.width,
        )
        return to_formatted_text(padded_fragments, style=style)

    def get_lines(
        self,
        marked_logs_start: int | None = None,
        marked_logs_end: int | None = None,
    ) -> list[StyleAndTextTuples]:
        """Return lines for final display.

        Styling is added for the line under the cursor and for any lines whose
        log_index falls inside the inclusive marked range.

        Args:
          marked_logs_start: First log_index of the marked range, or None if
              there is no lower bound.
          marked_logs_end: Last log_index of the marked range, or None if no
              logs are marked.

        Returns:
          One list of formatted text tuples per ScreenLine in the line_buffer.
        """
        # Compare against None explicitly: 0 is a valid log_index and must not
        # be treated as "no selection".
        if marked_logs_start is None:
            marked_logs_start = -1
        if marked_logs_end is None:
            marked_logs_end = -1

        all_lines: list[StyleAndTextTuples] = []
        # Loop through a copy of the line_buffer in case it is mutated before
        # this function is complete.
        for i, line in enumerate(list(self.line_buffer)):
            # Is this line the cursor_position? Apply line highlighting.
            # The check uses the snapshot element instead of re-indexing the
            # live line_buffer.
            if i == self.cursor_position and not line.empty():
                all_lines.append(
                    self._highlight_line(line, 'class:selected-log-line')
                )
            elif line.log_index is not None and (
                marked_logs_start <= line.log_index <= marked_logs_end
            ):
                all_lines.append(
                    self._highlight_line(line, 'class:marked-log-line')
                )
            else:
                all_lines.append(line.fragments)

        return all_lines

    def _prepend_line(self, line: ScreenLine) -> None:
        """Add a line to the top of the screen."""
        self.line_buffer.appendleft(line)
        self._empty = False

    def _append_line(self, line: ScreenLine) -> None:
        """Add a line to the bottom of the screen."""
        self.line_buffer.append(line)
        self._empty = False

    def _trim_top_lines(self) -> None:
        """Remove lines from the top if larger than the screen height."""
        overflow_amount = len(self.line_buffer) - self.height
        for _ in range(overflow_amount):
            self.line_buffer.popleft()

    def _trim_bottom_lines(self) -> None:
        """Remove lines from the bottom if larger than the screen height."""
        overflow_amount = len(self.line_buffer) - self.height
        for _ in range(overflow_amount):
            self.line_buffer.pop()

    def move_cursor_up(self, line_count: int) -> int:
        """Move the cursor up as far as it can go without fetching new lines.

        Args:
          line_count: A negative number of lines to move the cursor by.

        Returns:
          int: The remaining line count that was not moved. This is the number
          of new lines that need to be fetched and prepended to the screen
          line buffer."""
        remaining_lines = line_count

        # Loop from a negative line_count value to zero.
        # For example if line_count is -5 the loop will traverse:
        # >>> list(i for i in range(-5, 0, 1))
        # [-5, -4, -3, -2, -1]
        for _ in range(line_count, 0, 1):
            new_index = self.cursor_position - 1
            if new_index < 0:
                break
            if (
                new_index < len(self.line_buffer)
                and self.line_buffer[new_index].empty()
            ):
                # The next line is empty and has no content.
                break
            self.cursor_position -= 1
            remaining_lines += 1
        return remaining_lines

    def move_cursor_down(self, line_count: int) -> int:
        """Move the cursor down as far as it can go without fetching new lines.

        Args:
          line_count: A positive number of lines to move the cursor down by.

        Returns:
          int: The remaining line count that was not moved. This is the number
          of new lines that need to be fetched and appended to the screen line
          buffer."""
        remaining_lines = line_count
        for _ in range(line_count):
            new_index = self.cursor_position + 1
            if new_index >= self.height:
                break
            if (
                new_index < len(self.line_buffer)
                and self.line_buffer[new_index].empty()
            ):
                # The next line is empty and has no content.
                break
            self.cursor_position += 1
            remaining_lines -= 1
        return remaining_lines

    def move_cursor_to_bottom(self) -> None:
        """Move the cursor to the bottom of the screen.

        Only use this for movement not initiated by users. For example if new
        logs were just added to the bottom of the screen in follow
        mode. The LogScreen class does not allow scrolling beyond the bottom of
        the content so the cursor will fall on a log message as long as there
        are some log messages. If there are no log messages the line is not
        highlighted by get_lines()."""
        self.cursor_position = self.height - 1

    def move_cursor_to_position(self, window_row: int) -> None:
        """Move the cursor to a line if there is a log message there."""
        if window_row >= len(self.line_buffer):
            return
        if 0 <= window_row < self.height:
            current_line = self.line_buffer[window_row]
            if current_line.log_index is not None:
                self.cursor_position = window_row

    def _move_selection_to_log(self, log_index: int, subline: int) -> None:
        """Move the cursor to the location of log_index.

        The cursor is left unchanged if no ScreenLine in the line_buffer
        matches both log_index and subline."""
        for i, line in enumerate(self.line_buffer):
            if line.log_index == log_index and line.subline == subline:
                self.cursor_position = i
                return

    def shift_selected_log_to_top(self) -> None:
        """Shift the selected line to the top.

        This moves the lines on screen and keeps the originally selected line
        highlighted. Example use case: when jumping to a search match the
        matched line will be shown at the top of the screen."""
        if not 0 <= self.cursor_position < len(self.line_buffer):
            return

        current_line = self.line_buffer[self.cursor_position]
        amount = max(self.cursor_position, current_line.height)
        amount -= current_line.subline
        remaining_lines = self.scroll_subline(amount)
        if remaining_lines != 0 and current_line.log_index is not None:
            # Restore original selected line.
            self._move_selection_to_log(
                current_line.log_index, current_line.subline
            )
            return
        # Lines scrolled as expected, set cursor_position to top.
        self.cursor_position = 0

    def shift_selected_log_to_center(self) -> None:
        """Shift the selected line to the center.

        This moves the lines on screen and keeps the originally selected line
        highlighted. Example use case: when jumping to a search match the
        matched line will be shown at the center of the screen."""
        if not 0 <= self.cursor_position < len(self.line_buffer):
            return

        half_height = int(self.height / 2)
        current_line = self.line_buffer[self.cursor_position]

        amount = max(self.cursor_position - half_height, current_line.height)
        amount -= current_line.subline

        remaining_lines = self.scroll_subline(amount)
        if remaining_lines != 0 and current_line.log_index is not None:
            # Restore original selected line.
            self._move_selection_to_log(
                current_line.log_index, current_line.subline
            )
            return

        # Lines scrolled as expected, set cursor_position to center.
        self.cursor_position -= amount
        self.cursor_position -= current_line.height - 1

    def scroll_subline(self, line_count: int = 1) -> int:
        """Move the cursor down or up by positive or negative lines.

        Args:
          line_count: A positive or negative number of lines the cursor should
              move. Positive for down, negative for up.

        Returns:
          int: The remaining line count that was not moved. This is the number
          of new lines that could not be fetched in the case that the top or
          bottom of available log message lines was reached."""
        # Move self.cursor_position as far as it can go on screen without
        # fetching new log message lines.
        if line_count > 0:
            remaining_lines = self.move_cursor_down(line_count)
        else:
            remaining_lines = self.move_cursor_up(line_count)

        if remaining_lines == 0:
            # No more lines needed, return
            return remaining_lines

        # Top or bottom of the screen was reached, fetch and add new log lines.
        if remaining_lines < 0:
            return self.fetch_subline_up(remaining_lines)
        return self.fetch_subline_down(remaining_lines)

    def fetch_subline_up(self, line_count: int = -1) -> int:
        """Fetch new lines from the top in order of decreasing log_indexes.

        Args:
          line_count: A negative number of lines that should be fetched and
              added to the top of the screen.

        Returns:
          int: The number of lines that were not fetched. Returns 0 if the
          desired number of lines were fetched successfully."""
        start_log_index, _log_source = self.get_log_source()
        remaining_lines = line_count
        for _ in range(line_count, 0, 1):
            current_line = self.get_line_at_cursor_position()
            if current_line.log_index is None:
                return remaining_lines + 1

            target_log_index: int
            target_subline: int

            # If the current subline is at the start of this log, fetch the
            # previous log message's last subline.
            if current_line.subline == 0:
                target_log_index = current_line.log_index - 1
                # Set -1 to signal fetching the previous log's last subline
                target_subline = -1
            else:
                # Get previous sub line of current log
                target_log_index = current_line.log_index
                target_subline = current_line.subline - 1

            if target_log_index < start_log_index:
                # Invalid log_index, don't scroll further
                return remaining_lines + 1

            self.prepend_log(target_log_index, subline=target_subline)
            remaining_lines += 1

        return remaining_lines

    def get_line_at_cursor_position(self) -> ScreenLine:
        """Returns the ScreenLine under the cursor.

        An empty ScreenLine is returned if the cursor_position is outside of
        the line_buffer."""
        if (
            self.cursor_position >= len(self.line_buffer)
            or self.cursor_position < 0
        ):
            return ScreenLine([('', '')])
        return self.line_buffer[self.cursor_position]

    def fetch_subline_down(self, line_count: int = 1) -> int:
        """Fetch new lines from the bottom in order of increasing log_indexes.

        Args:
          line_count: A positive number of lines that should be fetched and
              added to the bottom of the screen.

        Returns:
          int: The number of lines that were not fetched. Returns 0 if the
          desired number of lines were fetched successfully."""
        _start_log_index, log_source = self.get_log_source()
        remaining_lines = line_count
        for _ in range(line_count):
            # Skip this line if not at the bottom
            if self.cursor_position < self.height - 1:
                self.cursor_position += 1
                continue

            current_line = self.get_line_at_cursor_position()
            if current_line.log_index is None:
                return remaining_lines - 1

            target_log_index: int
            target_subline: int

            # If the current subline is at the height of this log, fetch the
            # next log message.
            if current_line.subline == current_line.height - 1:
                # Get next log's first subline
                target_log_index = current_line.log_index + 1
                target_subline = 0
            else:
                # Get next sub line of current log
                target_log_index = current_line.log_index
                target_subline = current_line.subline + 1

            if target_log_index >= len(log_source):
                # Invalid log_index, don't scroll further
                return remaining_lines - 1

            self.append_log(target_log_index, subline=target_subline)
            remaining_lines -= 1

        return remaining_lines

    def first_rendered_log_index(self) -> int | None:
        """Scan the screen for the first valid log_index and return it.

        Only the first `height` entries of the line_buffer are considered.
        Returns None if none of them refer to a log message."""
        for i, line in enumerate(self.line_buffer):
            if i >= self.height:
                break
            if line.log_index is not None:
                return line.log_index
        return None

    def last_rendered_log_index(self) -> int | None:
        """Return the last log_index shown on screen.

        Returns None if the line_buffer is empty or its last line is empty."""
        if not self.line_buffer:
            return None
        return self.line_buffer[-1].log_index

    def _get_fragments_per_line(
        self, log_index: int
    ) -> list[StyleAndTextTuples]:
        """Return a list of lines wrapped to the screen width for a log.

        Before fetching the log message this function updates the log_source and
        formatting options.

        Returns:
          One list of formatted text tuples per on-screen line, or an empty
          list if log_index is past the end of the log source."""
        _start_log_index, log_source = self.get_log_source()
        if log_index >= len(log_source):
            return []
        log = log_source[log_index]
        table_formatter = self.get_log_formatter()
        truncate_lines = not self.get_line_wrapping()
        search_filter = self.get_search_filter()
        search_highlight = self.get_search_highlight()

        # Select the log display formatter; table or standard.
        fragments: StyleAndTextTuples = []
        if table_formatter:
            fragments = table_formatter(log)
        else:
            fragments = log.get_fragments()

        # Apply search term highlighting.
        if search_filter and search_highlight and search_filter.matches(log):
            fragments = search_filter.highlight_search_matches(fragments)

        # Word wrap the log message or truncate to screen width
        line_fragments, _log_line_height = insert_linebreaks(
            fragments,
            max_line_width=self.width,
            truncate_long_lines=truncate_lines,
        )
        # Convert the existing flattened fragments to a list of lines.
        fragments_per_line = split_lines(line_fragments)

        return fragments_per_line

    def prepend_log(
        self,
        log_index: int,
        subline: int | None = None,
    ) -> None:
        """Add a log message or a single line to the top of the screen.

        Args:
          log_index: The index of the log message to fetch.
          subline: The desired subline of the log message. When displayed on
              screen the log message may take up more than one line. If
              subline is 0 or higher that line will be added. If subline is -1
              the last subline will be prepended regardless of the total log
              message height. If subline is None every line of the log
              message is prepended.
        """
        fragments_per_line = self._get_fragments_per_line(log_index)

        # Target the last subline if the subline arg is set to -1.
        fetch_last_subline = subline == -1

        # NOTE(review): when subline is None each line is prepended in
        # increasing subline order, so the last subline ends up topmost;
        # confirm whether any caller relies on the subline=None path.
        for line_index, line in enumerate(fragments_per_line):
            # If we are looking for a specific subline and this isn't it, skip.
            if subline is not None:
                # If subline is set to -1 we need to prepend the last subline
                # of this log message. Skip this line if it isn't the last one.
                if fetch_last_subline and (
                    line_index != len(fragments_per_line) - 1
                ):
                    continue
                # If subline is not -1 (0 or higher) and this isn't the desired
                # line, skip to the next one.
                if not fetch_last_subline and line_index != subline:
                    continue

            self._prepend_line(
                ScreenLine(
                    fragments=line,
                    log_index=log_index,
                    subline=line_index,
                    height=len(fragments_per_line),
                )
            )

        # Remove lines from the bottom if over the screen height.
        if len(self.line_buffer) > self.height:
            self._trim_bottom_lines()

    def append_log(
        self,
        log_index: int,
        subline: int | None = None,
    ) -> None:
        """Add a log message or a single line to the bottom of the screen.

        Args:
          log_index: The index of the log message to fetch.
          subline: The desired subline of the log message. If None every line
              of the log message is appended, otherwise only the line with
              this zero-based index is appended.
        """
        # Save this log_index
        self.last_appended_log_index = log_index
        fragments_per_line = self._get_fragments_per_line(log_index)

        for line_index, line in enumerate(fragments_per_line):
            # If we are looking for a specific subline and this isn't it, skip.
            if subline is not None and line_index != subline:
                continue

            self._append_line(
                ScreenLine(
                    fragments=line,
                    log_index=log_index,
                    subline=line_index,
                    height=len(fragments_per_line),
                )
            )

        # Remove lines from the top if over the screen height.
        if len(self.line_buffer) > self.height:
            self._trim_top_lines()