xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/log_screen.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""LogScreen tracks lines to display on screen with a set of ScreenLines."""
15
16from __future__ import annotations
17import collections
18import dataclasses
19import logging
20from typing import Callable, TYPE_CHECKING
21
22from prompt_toolkit.formatted_text import (
23    to_formatted_text,
24    StyleAndTextTuples,
25)
26
27from pw_console.log_filter import LogFilter
28from pw_console.text_formatting import (
29    fill_character_width,
30    insert_linebreaks,
31    split_lines,
32)
33
34if TYPE_CHECKING:
35    from pw_console.log_line import LogLine
36    from pw_console.log_pane import LogPane
37
38_LOG = logging.getLogger(__package__)
39
40
41@dataclasses.dataclass
42class ScreenLine:
43    """A single line of text for displaying on screen.
44
45    Instances of ScreenLine are stored in a LogScreen's line_buffer deque. When
46    a new log message is added it may be converted into multiple ScreenLine
47    instances if the text is wrapped across multiple lines.
48
49    For example: say our screen is 80 characters wide and a log message 240
50    characters long needs to be displayed. With line wrapping on we will need
51    240/80 = 3 lines to show the full message. Say also that this single log
52    message is at index #5 in the LogStore classes deque, this is the log_index
53    value. This single log message will then be split into 3 separate
54    ScreenLine instances:
55
56    ::
57        ScreenLine(fragments=[('', 'Log message text line one')],
58                   log_index=5, subline=0, height=3)
59        ScreenLine(fragments=[('', 'Log message text line two')],
60                   log_index=5, subline=1, height=3)
61        ScreenLine(fragments=[('', 'Log message text line three')],
62                   log_index=5, subline=2, height=3)
63
64    Each `fragments` attribute will store the formatted text indended to be
65    direcly drawn to the screen. Since these three lines are all displaying the
66    same log message their `log_index` reference will be the same. The `subline`
67    attribute is the zero-indexed number for this log's wrapped line count and
68    `height` is the total ScreenLines needed to show this log message.
69
70    Continuing with this example say the next two log messages to display both
71    fit on screen with no wrapping. They will both be represented with one
72    ScreenLine each:
73
74    ::
75        ScreenLine(fragments=[('', 'Another log message')],
76                   log_index=6, subline=0, height=1)
77        ScreenLine(fragments=[('', 'Yet another log message')],
78                   log_index=7, subline=0, height=1)
79
80    The `log_index` is different for each since these are both separate
81    logs. The subline is 0 since each line is the first one for this log. Both
82    have a height of 1 since no line wrapping was performed.
83    """
84
85    # The StyleAndTextTuples for this line ending with a '\n'. These are the raw
86    # prompt_toolkit formatted text tuples to display on screen. The colors and
87    # spacing can change depending on the formatters used in the
88    # LogScreen._get_fragments_per_line() function.
89    fragments: StyleAndTextTuples
90
91    # Log index reference for this screen line. This is the index to where the
92    # log message resides in the parent LogStore.logs deque. It is set to None
93    # if this is an empty ScreenLine. If a log message requires line wrapping
94    # then each resulting ScreenLine instance will have the same log_index
95    # value.
96    #
97    # This log_index may also be the integer index into a LogView.filtered_logs
98    # deque depending on if log messages are being filtered by the user. The
99    # LogScreen class below doesn't need to do anything different in either
100    # case. It's the responsibility of LogScreen.get_log_source() to return the
101    # correct source.
102    #
103    # Note this is NOT an index into LogScreen.line_buffer.
104    log_index: int | None = None
105
106    # Keep track the total height and subline number for this log message.
107    # For example this line could be subline (0, 1, or 2) of a log message with
108    # a total height 3.
109
110    # Subline index.
111    subline: int = 0
112    # Total height in lines of text (also ScreenLine count) that the log message
113    # referred to by log_index requires. When a log message is split across
114    # multiple lines height will be set to the same value for each ScreenLine
115    # instance.
116    height: int = 1
117
118    # Empty lines will have no log_index
119    def empty(self) -> bool:
120        return self.log_index is None
121
122
123@dataclasses.dataclass
124class LogScreen:
125    """LogScreen maintains the state of visible logs on screen.
126
127    It is responsible for moving the cursor_position, prepending and appending
128    log lines as the user moves the cursor."""
129
130    # Callable functions to retrieve logs and display formatting.
131    get_log_source: Callable[[], tuple[int, collections.deque[LogLine]]]
132    get_line_wrapping: Callable[[], bool]
133    get_log_formatter: Callable[
134        [], Callable[[LogLine], StyleAndTextTuples] | None
135    ]
136    get_search_filter: Callable[[], LogFilter | None]
137    get_search_highlight: Callable[[], bool]
138
139    # Window row of the current cursor position
140    cursor_position: int = 0
141    # Screen width and height in number of characters.
142    width: int = 0
143    height: int = 0
144    # Buffer of literal text lines to be displayed on screen. Each visual line
145    # is represented by a ScreenLine instance and will have a max width equal
146    # to the screen's width. If any given whole log message requires line
147    # wrapping to be displayed it will be represented by multiple ScreenLine
148    # instances in this deque.
149    line_buffer: collections.deque[ScreenLine] = dataclasses.field(
150        default_factory=collections.deque
151    )
152
153    def __post_init__(self) -> None:
154        # Empty screen flag. Will be true if the screen contains only newlines.
155        self._empty: bool = True
156        # Save the last log index when appending. Useful for tracking how many
157        # new lines need appending in follow mode.
158        self.last_appended_log_index: int = 0
159
160    def _fill_top_with_empty_lines(self) -> None:
161        """Add empty lines to fill the remaining empty screen space."""
162        for _ in range(self.height - len(self.line_buffer)):
163            self.line_buffer.appendleft(ScreenLine([('', '')]))
164
165    def clear_screen(self) -> None:
166        """Erase all lines and fill with empty lines."""
167        self.line_buffer.clear()
168        self._fill_top_with_empty_lines()
169        self._empty = True
170
171    def empty(self) -> bool:
172        """Return True if the screen has no lines with content."""
173        return self._empty
174
175    def reset_logs(
176        self,
177        log_index: int = 0,
178    ) -> None:
179        """Erase the screen and append logs starting from log_index."""
180        self.clear_screen()
181
182        start_log_index, log_source = self.get_log_source()
183        if len(log_source) == 0:
184            return
185
186        # Append at most at most the window height number worth of logs. If the
187        # number of available logs is less, use that amount.
188        max_log_messages_to_fetch = min(self.height, len(log_source))
189
190        # Including the target log_index, fetch the desired logs.
191        # For example if we are rendering log_index 10 and the window height is
192        # 6 the range below will be:
193        # >>> list(i for i in range((10 - 6) + 1, 10 + 1))
194        # [5, 6, 7, 8, 9, 10]
195        for i in range(
196            (log_index - max_log_messages_to_fetch) + 1, log_index + 1
197        ):
198            # If i is < 0 it's an invalid log, skip to the next line. The next
199            # index could be 0 or higher since we are traversing in increasing
200            # order.
201            if i < start_log_index:
202                continue
203            self.append_log(i)
204        # Make sure the bottom line is highlighted.
205        self.move_cursor_to_bottom()
206
207    def resize(self, width, height) -> None:
208        """Update screen width and height.
209
210        Following a resize the caller should run reset_logs()."""
211        self.width = width
212        self.height = height
213
214    def get_lines(
215        self,
216        marked_logs_start: int | None = None,
217        marked_logs_end: int | None = None,
218    ) -> list[StyleAndTextTuples]:
219        """Return lines for final display.
220
221        Styling is added for the line under the cursor."""
222        if not marked_logs_start:
223            marked_logs_start = -1
224        if not marked_logs_end:
225            marked_logs_end = -1
226
227        all_lines: list[StyleAndTextTuples] = []
228        # Loop through a copy of the line_buffer in case it is mutated before
229        # this function is complete.
230        for i, line in enumerate(list(self.line_buffer)):
231            # Is this line the cursor_position? Apply line highlighting
232            if (
233                i == self.cursor_position
234                and (self.cursor_position < len(self.line_buffer))
235                and not self.line_buffer[self.cursor_position].empty()
236            ):
237                # Fill in empty charaters to the width of the screen. This
238                # ensures the backgound is highlighted to the edge of the
239                # screen.
240                new_fragments = fill_character_width(
241                    line.fragments,
242                    len(line.fragments) - 1,  # -1 for the ending line break
243                    self.width,
244                )
245
246                # Apply a style to highlight this line.
247                all_lines.append(
248                    to_formatted_text(
249                        new_fragments, style='class:selected-log-line'
250                    )
251                )
252            elif line.log_index is not None and (
253                marked_logs_start <= line.log_index <= marked_logs_end
254            ):
255                new_fragments = fill_character_width(
256                    line.fragments,
257                    len(line.fragments) - 1,  # -1 for the ending line break
258                    self.width,
259                )
260
261                # Apply a style to highlight this line.
262                all_lines.append(
263                    to_formatted_text(
264                        new_fragments, style='class:marked-log-line'
265                    )
266                )
267
268            else:
269                all_lines.append(line.fragments)
270
271        return all_lines
272
273    def _prepend_line(self, line: ScreenLine) -> None:
274        """Add a line to the top of the screen."""
275        self.line_buffer.appendleft(line)
276        self._empty = False
277
278    def _append_line(self, line: ScreenLine) -> None:
279        """Add a line to the bottom of the screen."""
280        self.line_buffer.append(line)
281        self._empty = False
282
283    def _trim_top_lines(self) -> None:
284        """Remove lines from the top if larger than the screen height."""
285        overflow_amount = len(self.line_buffer) - self.height
286        for _ in range(overflow_amount):
287            self.line_buffer.popleft()
288
289    def _trim_bottom_lines(self) -> None:
290        """Remove lines from the bottom if larger than the screen height."""
291        overflow_amount = len(self.line_buffer) - self.height
292        for _ in range(overflow_amount):
293            self.line_buffer.pop()
294
295    def move_cursor_up(self, line_count: int) -> int:
296        """Move the cursor up as far as it can go without fetching new lines.
297
298        Args:
299            line_count: A negative number of lines to move the cursor by.
300
301        Returns:
302            int: The remaining line count that was not moved. This is the number
303            of new lines that need to be fetched and prepended to the screen
304            line buffer."""
305        remaining_lines = line_count
306
307        # Loop from a negative line_count value to zero.
308        # For example if line_count is -5 the loop will traverse:
309        # >>> list(i for i in range(-5, 0, 1))
310        # [-5, -4, -3, -2, -1]
311        for _ in range(line_count, 0, 1):
312            new_index = self.cursor_position - 1
313            if new_index < 0:
314                break
315            if (
316                new_index < len(self.line_buffer)
317                and self.line_buffer[new_index].empty()
318            ):
319                # The next line is empty and has no content.
320                break
321            self.cursor_position -= 1
322            remaining_lines += 1
323        return remaining_lines
324
325    def move_cursor_down(self, line_count: int) -> int:
326        """Move the cursor down as far as it can go without fetching new lines.
327
328        Args:
329            line_count: A positive number of lines to move the cursor down by.
330
331        Returns:
332            int: The remaining line count that was not moved. This is the number
333            of new lines that need to be fetched and appended to the screen line
334            buffer."""
335        remaining_lines = line_count
336        for _ in range(line_count):
337            new_index = self.cursor_position + 1
338            if new_index >= self.height:
339                break
340            if (
341                new_index < len(self.line_buffer)
342                and self.line_buffer[new_index].empty()
343            ):
344                # The next line is empty and has no content.
345                break
346            self.cursor_position += 1
347            remaining_lines -= 1
348        return remaining_lines
349
350    def move_cursor_to_bottom(self) -> None:
351        """Move the cursor to the bottom of the screen.
352
353        Only use this for movement not initiated by users. For example if new
354        logs were just added to the bottom of the screen in follow
355        mode. The LogScreen class does not allow scrolling beyond the bottom of
356        the content so the cursor will fall on a log message as long as there
357        are some log messages. If there are no log messages the line is not
358        highlighted by get_lines()."""
359        self.cursor_position = self.height - 1
360
361    def move_cursor_to_position(self, window_row: int) -> None:
362        """Move the cursor to a line if there is a log message there."""
363        if window_row >= len(self.line_buffer):
364            return
365        if 0 <= window_row < self.height:
366            current_line = self.line_buffer[window_row]
367            if current_line.log_index is not None:
368                self.cursor_position = window_row
369
370    def _move_selection_to_log(self, log_index: int, subline: int) -> None:
371        """Move the cursor to the location of log_index."""
372        for i, line in enumerate(self.line_buffer):
373            if line.log_index == log_index and line.subline == subline:
374                self.cursor_position = i
375                return
376
377    def shift_selected_log_to_top(self) -> None:
378        """Shift the selected line to the top.
379
380        This moves the lines on screen and keeps the originally selected line
381        highlighted. Example use case: when jumping to a search match the
382        matched line will be shown at the top of the screen."""
383        if not 0 <= self.cursor_position < len(self.line_buffer):
384            return
385
386        current_line = self.line_buffer[self.cursor_position]
387        amount = max(self.cursor_position, current_line.height)
388        amount -= current_line.subline
389        remaining_lines = self.scroll_subline(amount)
390        if remaining_lines != 0 and current_line.log_index is not None:
391            # Restore original selected line.
392            self._move_selection_to_log(
393                current_line.log_index, current_line.subline
394            )
395            return
396        # Lines scrolled as expected, set cursor_position to top.
397        self.cursor_position = 0
398
399    def shift_selected_log_to_center(self) -> None:
400        """Shift the selected line to the center.
401
402        This moves the lines on screen and keeps the originally selected line
403        highlighted. Example use case: when jumping to a search match the
404        matched line will be shown at the center of the screen."""
405        if not 0 <= self.cursor_position < len(self.line_buffer):
406            return
407
408        half_height = int(self.height / 2)
409        current_line = self.line_buffer[self.cursor_position]
410
411        amount = max(self.cursor_position - half_height, current_line.height)
412        amount -= current_line.subline
413
414        remaining_lines = self.scroll_subline(amount)
415        if remaining_lines != 0 and current_line.log_index is not None:
416            # Restore original selected line.
417            self._move_selection_to_log(
418                current_line.log_index, current_line.subline
419            )
420            return
421
422        # Lines scrolled as expected, set cursor_position to center.
423        self.cursor_position -= amount
424        self.cursor_position -= current_line.height - 1
425
426    def scroll_subline(self, line_count: int = 1) -> int:
427        """Move the cursor down or up by positive or negative lines.
428
429        Args:
430            line_count: A positive or negative number of lines the cursor should
431                move. Positive for down, negative for up.
432
433        Returns:
434            int: The remaining line count that was not moved. This is the number
435            of new lines that could not be fetched in the case that the top or
436            bottom of available log message lines was reached."""
437        # Move self.cursor_position as far as it can go on screen without
438        # fetching new log message lines.
439        if line_count > 0:
440            remaining_lines = self.move_cursor_down(line_count)
441        else:
442            remaining_lines = self.move_cursor_up(line_count)
443
444        if remaining_lines == 0:
445            # No more lines needed, return
446            return remaining_lines
447
448        # Top or bottom of the screen was reached, fetch and add new log lines.
449        if remaining_lines < 0:
450            return self.fetch_subline_up(remaining_lines)
451        return self.fetch_subline_down(remaining_lines)
452
453    def fetch_subline_up(self, line_count: int = -1) -> int:
454        """Fetch new lines from the top in order of decreasing log_indexes.
455
456        Args:
457            line_count: A negative number of lines that should be fetched and
458                added to the top of the screen.
459
460        Returns:
461            int: The number of lines that were not fetched. Returns 0 if the
462                desired number of lines were fetched successfully."""
463        start_log_index, _log_source = self.get_log_source()
464        remaining_lines = line_count
465        for _ in range(line_count, 0, 1):
466            current_line = self.get_line_at_cursor_position()
467            if current_line.log_index is None:
468                return remaining_lines + 1
469
470            target_log_index: int
471            target_subline: int
472
473            # If the current subline is at the start of this log, fetch the
474            # previous log message's last subline.
475            if current_line.subline == 0:
476                target_log_index = current_line.log_index - 1
477                # Set -1 to signal fetching the previous log's last subline
478                target_subline = -1
479            else:
480                # Get previous sub line of current log
481                target_log_index = current_line.log_index
482                target_subline = current_line.subline - 1
483
484            if target_log_index < start_log_index:
485                # Invalid log_index, don't scroll further
486                return remaining_lines + 1
487
488            self.prepend_log(target_log_index, subline=target_subline)
489            remaining_lines += 1
490
491        return remaining_lines
492
493    def get_line_at_cursor_position(self) -> ScreenLine:
494        """Returns the ScreenLine under the cursor."""
495        if (
496            self.cursor_position >= len(self.line_buffer)
497            or self.cursor_position < 0
498        ):
499            return ScreenLine([('', '')])
500        return self.line_buffer[self.cursor_position]
501
502    def fetch_subline_down(self, line_count: int = 1) -> int:
503        """Fetch new lines from the bottom in order of increasing log_indexes.
504
505        Args:
506            line_count: A positive number of lines that should be fetched and
507                added to the bottom of the screen.
508
509        Returns:
510            int: The number of lines that were not fetched. Returns 0 if the
511                desired number of lines were fetched successfully."""
512        _start_log_index, log_source = self.get_log_source()
513        remaining_lines = line_count
514        for _ in range(line_count):
515            # Skip this line if not at the bottom
516            if self.cursor_position < self.height - 1:
517                self.cursor_position += 1
518                continue
519
520            current_line = self.get_line_at_cursor_position()
521            if current_line.log_index is None:
522                return remaining_lines - 1
523
524            target_log_index: int
525            target_subline: int
526
527            # If the current subline is at the height of this log, fetch the
528            # next log message.
529            if current_line.subline == current_line.height - 1:
530                # Get next log's first subline
531                target_log_index = current_line.log_index + 1
532                target_subline = 0
533            else:
534                # Get next sub line of current log
535                target_log_index = current_line.log_index
536                target_subline = current_line.subline + 1
537
538            if target_log_index >= len(log_source):
539                # Invalid log_index, don't scroll further
540                return remaining_lines - 1
541
542            self.append_log(target_log_index, subline=target_subline)
543            remaining_lines -= 1
544
545        return remaining_lines
546
547    def first_rendered_log_index(self) -> int | None:
548        """Scan the screen for the first valid log_index and return it."""
549        log_index = None
550        for i in range(self.height):
551            if i >= len(self.line_buffer):
552                break
553            if self.line_buffer[i].log_index is not None:
554                log_index = self.line_buffer[i].log_index
555                break
556        return log_index
557
558    def last_rendered_log_index(self) -> int | None:
559        """Return the last log_index shown on screen."""
560        log_index = None
561        if len(self.line_buffer) == 0:
562            return None
563        if self.line_buffer[-1].log_index is not None:
564            log_index = self.line_buffer[-1].log_index
565        return log_index
566
567    def _get_fragments_per_line(
568        self, log_index: int
569    ) -> list[StyleAndTextTuples]:
570        """Return a list of lines wrapped to the screen width for a log.
571
572        Before fetching the log message this function updates the log_source and
573        formatting options."""
574        _start_log_index, log_source = self.get_log_source()
575        if log_index >= len(log_source):
576            return []
577        log = log_source[log_index]
578        table_formatter = self.get_log_formatter()
579        truncate_lines = not self.get_line_wrapping()
580        search_filter = self.get_search_filter()
581        search_highlight = self.get_search_highlight()
582
583        # Select the log display formatter; table or standard.
584        fragments: StyleAndTextTuples = []
585        if table_formatter:
586            fragments = table_formatter(log)
587        else:
588            fragments = log.get_fragments()
589
590        # Apply search term highlighting.
591        if search_filter and search_highlight and search_filter.matches(log):
592            fragments = search_filter.highlight_search_matches(fragments)
593
594        # Word wrap the log message or truncate to screen width
595        line_fragments, _log_line_height = insert_linebreaks(
596            fragments,
597            max_line_width=self.width,
598            truncate_long_lines=truncate_lines,
599        )
600        # Convert the existing flattened fragments to a list of lines.
601        fragments_per_line = split_lines(line_fragments)
602
603        return fragments_per_line
604
605    def prepend_log(
606        self,
607        log_index: int,
608        subline: int | None = None,
609    ) -> None:
610        """Add a log message or a single line to the top of the screen.
611
612        Args:
613            log_index: The index of the log message to fetch.
614            subline: The desired subline of the log message. When displayed on
615                screen the log message may take up more than one line. If
616                subline is 0 or higher that line will be added. If subline is -1
617                the last subline will be prepended regardless of the total log
618                message height.
619        """
620        fragments_per_line = self._get_fragments_per_line(log_index)
621
622        # Target the last subline if the subline arg is set to -1.
623        fetch_last_subline = subline == -1
624
625        for line_index, line in enumerate(fragments_per_line):
626            # If we are looking for a specific subline and this isn't it, skip.
627            if subline is not None:
628                # If subline is set to -1 we need to append the last subline of
629                # this log message. Skip this line if it isn't the last one.
630                if fetch_last_subline and (
631                    line_index != len(fragments_per_line) - 1
632                ):
633                    continue
634                # If subline is not -1 (0 or higher) and this isn't the desired
635                # line, skip to the next one.
636                if not fetch_last_subline and line_index != subline:
637                    continue
638
639            self._prepend_line(
640                ScreenLine(
641                    fragments=line,
642                    log_index=log_index,
643                    subline=line_index,
644                    height=len(fragments_per_line),
645                )
646            )
647
648        # Remove lines from the bottom if over the screen height.
649        if len(self.line_buffer) > self.height:
650            self._trim_bottom_lines()
651
652    def append_log(
653        self,
654        log_index: int,
655        subline: int | None = None,
656    ) -> None:
657        """Add a log message or a single line to the bottom of the screen."""
658        # Save this log_index
659        self.last_appended_log_index = log_index
660        fragments_per_line = self._get_fragments_per_line(log_index)
661
662        for line_index, line in enumerate(fragments_per_line):
663            # If we are looking for a specific subline and this isn't it, skip.
664            if subline is not None and line_index != subline:
665                continue
666
667            self._append_line(
668                ScreenLine(
669                    fragments=line,
670                    log_index=log_index,
671                    subline=line_index,
672                    height=len(fragments_per_line),
673                )
674            )
675
676        # Remove lines from the top if over the screen height.
677        if len(self.line_buffer) > self.height:
678            self._trim_top_lines()
679