xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/log_view.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""LogView maintains a log pane's scrolling and searching state."""
15
16from __future__ import annotations
17import asyncio
18import collections
19import copy
20from enum import Enum
21import itertools
22import json
23import logging
24import operator
25from pathlib import Path
26import re
27from threading import Thread
28from typing import Callable, TYPE_CHECKING
29
30from prompt_toolkit.data_structures import Point
31from prompt_toolkit.formatted_text import StyleAndTextTuples
32import websockets
33
34from pw_console.log_filter import (
35    DEFAULT_SEARCH_MATCHER,
36    LogFilter,
37    RegexValidator,
38    SearchMatcher,
39    preprocess_search_regex,
40)
41from pw_console.log_screen import ScreenLine, LogScreen
42from pw_console.log_store import LogStore
43from pw_console.python_logging import log_record_to_json
44from pw_console.text_formatting import remove_formatting
45
46if TYPE_CHECKING:
47    from pw_console.console_app import ConsoleApp
48    from pw_console.log_line import LogLine
49    from pw_console.log_pane import LogPane
50
51_LOG = logging.getLogger(__package__)
52
53
class FollowEvent(Enum):
    """Follow mode scroll event types.

    LogView.new_logs_arrived() stores one of these in ``follow_event`` and
    LogView.render_content() reads it on the next draw.
    """

    # Set when a newly arrived log matched the active search while
    # follow_search_match is enabled.
    SEARCH_MATCH = 'scroll_to_bottom'
    # Set when new logs arrive while follow mode is enabled.
    STICKY_FOLLOW = 'scroll_to_bottom_with_sticky_follow'
59
60
61class LogView:
62    """Viewing window into a LogStore."""
63
64    # pylint: disable=too-many-instance-attributes,too-many-public-methods
65
66    def __init__(
67        self,
68        log_pane: LogPane,
69        application: ConsoleApp,
70        log_store: LogStore | None = None,
71    ):
72        # Parent LogPane reference. Updated by calling `set_log_pane()`.
73        self.log_pane = log_pane
74        self.log_store = (
75            log_store if log_store else LogStore(prefs=application.prefs)
76        )
77        self.log_store.set_prefs(application.prefs)
78        self.log_store.register_viewer(self)
79
80        self.marked_logs_start: int | None = None
81        self.marked_logs_end: int | None = None
82
83        # Search variables
84        self.search_text: str | None = None
85        self.search_filter: LogFilter | None = None
86        self.search_highlight: bool = False
87        self.search_matcher = DEFAULT_SEARCH_MATCHER
88        self.search_validator = RegexValidator()
89
90        # Container for each log_index matched by active searches.
91        self.search_matched_lines: dict[int, int] = {}
92        # Background task to find historical matched lines.
93        self.search_match_count_task: asyncio.Task | None = None
94
95        # Flag for automatically jumping to each new search match as they
96        # appear.
97        self.follow_search_match: bool = False
98        self.last_search_matched_log: int | None = None
99
100        # Follow event flag. This is set to by the new_logs_arrived() function
101        # as a signal that the log screen should be scrolled to the bottom.
102        # This is read by render_content() whenever the screen is drawn.
103        self.follow_event: FollowEvent | None = None
104
105        self.log_screen = LogScreen(
106            get_log_source=self._get_log_lines,
107            get_line_wrapping=self.wrap_lines_enabled,
108            get_log_formatter=self._get_table_formatter,
109            get_search_filter=lambda: self.search_filter,
110            get_search_highlight=lambda: self.search_highlight,
111        )
112
113        # Filter
114        self.filtering_on: bool = False
115        self.filters: collections.OrderedDict[
116            str, LogFilter
117        ] = collections.OrderedDict()
118        self.filtered_logs: collections.deque = collections.deque()
119        self.filter_existing_logs_task: asyncio.Task | None = None
120
121        # Current log line index state variables:
122        self._last_log_index = -1
123        self._log_index = 0
124        self._filtered_log_index = 0
125        self._last_start_index = 0
126        self._last_end_index = 0
127        self._current_start_index = 0
128        self._current_end_index = 0
129        self._scrollback_start_index = 0
130
131        # LogPane prompt_toolkit container render size.
132        self._window_height = 20
133        self._window_width = 80
134        self._reset_log_screen_on_next_render: bool = True
135        self._user_scroll_event: bool = False
136
137        self._last_log_store_index = 0
138        self._new_logs_since_last_render = True
139        self._new_logs_since_last_websocket_serve = True
140        self._last_served_websocket_index = -1
141
142        # Should new log lines be tailed?
143        self.follow: bool = True
144
145        self.visual_select_mode: bool = False
146
147        # Cache of formatted text tuples used in the last UI render.
148        self._line_fragment_cache: list[StyleAndTextTuples] = []
149
150        # websocket server variables
151        self.websocket_running: bool = False
152        self.websocket_server = None
153        self.websocket_port = None
154        self.websocket_loop = asyncio.new_event_loop()
155
156        # Check if any logs are already in the log_store and update the view.
157        self.new_logs_arrived()
158
159    def _websocket_thread_entry(self):
160        """Entry point for the user code thread."""
161        asyncio.set_event_loop(self.websocket_loop)
162        self.websocket_server = websockets.serve(  # type: ignore # pylint: disable=no-member
163            self._send_logs_over_websockets, '127.0.0.1'
164        )
165        self.websocket_loop.run_until_complete(self.websocket_server)
166        self.websocket_port = self.websocket_server.ws_server.sockets[
167            0
168        ].getsockname()[1]
169        self.websocket_running = True
170        self.websocket_loop.run_forever()
171
    def start_websocket_thread(self):
        """Start the websocket server in a daemon thread.

        The thread runs ``_websocket_thread_entry`` so the caller isn't
        blocked by the server's event loop.
        """
        thread = Thread(
            target=self._websocket_thread_entry, args=(), daemon=True
        )
        thread.start()
178
179    def stop_websocket_thread(self):
180        """Stop websocket server."""
181        if self.websocket_running:
182            self.websocket_loop.call_soon_threadsafe(self.websocket_loop.stop)
183            self.websocket_server = None
184            self.websocket_port = None
185            self.websocket_running = False
186            if self.filtering_on:
187                self._restart_filtering()
188
189    async def _send_logs_over_websockets(self, websocket, _path) -> None:
190        def formatter(log: LogLine) -> str:
191            return log_record_to_json(log.record)
192
193        theme_colors = json.dumps(
194            self.log_pane.application.prefs.pw_console_color_config()
195        )
196        # Send colors
197        await websocket.send(theme_colors)
198
199        while True:
200            # Wait for new logs
201            if not self._new_logs_since_last_websocket_serve:
202                await asyncio.sleep(0.5)
203
204            _start_log_index, log_source = self._get_log_lines()
205            log_index_range = range(
206                self._last_served_websocket_index + 1, self.get_total_count()
207            )
208
209            for i in log_index_range:
210                log_text = formatter(log_source[i])
211                await websocket.send(log_text)
212                self._last_served_websocket_index = i
213
214            # Flag that all logs have been served.
215            self._new_logs_since_last_websocket_serve = False
216
    def view_mode_changed(self) -> None:
        """Request a full log screen reset on the next render."""
        self._reset_log_screen_on_next_render = True
219
220    @property
221    def log_index(self):
222        if self.filtering_on:
223            return self._filtered_log_index
224        return self._log_index
225
226    @log_index.setter
227    def log_index(self, new_log_index):
228        # Save the old log_index
229        self._last_log_index = self.log_index
230        if self.filtering_on:
231            self._filtered_log_index = new_log_index
232        else:
233            self._log_index = new_log_index
234
    def _reset_log_index_changed(self) -> None:
        """Record the current log_index as the last seen one.

        Right after this call log_index_changed_since_last_render() returns
        False.
        """
        self._last_log_index = self.log_index
237
    def log_index_changed_since_last_render(self) -> bool:
        """Return True if log_index differs from the last saved index."""
        return self._last_log_index != self.log_index
240
    def _set_match_position(self, position: int):
        """Jump to the search match at ``position`` and center it on screen.

        Args:
          position: Log index of the matching line.
        """
        # Stop following so the view stays on the match.
        self.follow = False
        self.log_index = position
        self.save_search_matched_line(position)
        # Rebuild the screen around the match, then center it.
        self.log_screen.reset_logs(log_index=self.log_index)
        self.log_screen.shift_selected_log_to_center()
        self._user_scroll_event = True
        self.log_pane.application.redraw_ui()
249
250    def select_next_search_matcher(self):
251        matchers = list(SearchMatcher)
252        index = matchers.index(self.search_matcher)
253        new_index = (index + 1) % len(matchers)
254        self.search_matcher = matchers[new_index]
255
256    def search_forwards(self):
257        if not self.search_filter:
258            return
259        self.search_highlight = True
260
261        log_beginning_index = self.hidden_line_count()
262
263        starting_index = self.log_index + 1
264        if starting_index > self.get_last_log_index():
265            starting_index = log_beginning_index
266
267        _, logs = self._get_log_lines()
268
269        # From current position +1 and down
270        for i in range(starting_index, self.get_last_log_index() + 1):
271            if self.search_filter.matches(logs[i]):
272                self._set_match_position(i)
273                return
274
275        # From the beginning to the original start
276        for i in range(log_beginning_index, starting_index):
277            if self.search_filter.matches(logs[i]):
278                self._set_match_position(i)
279                return
280
281    def search_backwards(self):
282        if not self.search_filter:
283            return
284        self.search_highlight = True
285
286        log_beginning_index = self.hidden_line_count()
287
288        starting_index = self.log_index - 1
289        if starting_index < 0:
290            starting_index = self.get_last_log_index()
291
292        _, logs = self._get_log_lines()
293
294        # From current position - 1 and up
295        for i in range(starting_index, log_beginning_index - 1, -1):
296            if self.search_filter.matches(logs[i]):
297                self._set_match_position(i)
298                return
299
300        # From the end to the original start
301        for i in range(self.get_last_log_index(), starting_index, -1):
302            if self.search_filter.matches(logs[i]):
303                self._set_match_position(i)
304                return
305
306    def set_search_regex(
307        self, text, invert, field, matcher: SearchMatcher | None = None
308    ) -> bool:
309        search_matcher = matcher if matcher else self.search_matcher
310        _LOG.debug(search_matcher)
311
312        regex_text, regex_flags = preprocess_search_regex(
313            text, matcher=search_matcher
314        )
315
316        try:
317            compiled_regex = re.compile(regex_text, regex_flags)
318            self.search_filter = LogFilter(
319                regex=compiled_regex,
320                input_text=text,
321                invert=invert,
322                field=field,
323            )
324            _LOG.debug(self.search_filter)
325        except re.error as error:
326            _LOG.debug(error)
327            return False
328
329        self.search_highlight = True
330        self.search_text = regex_text
331        return True
332
333    def new_search(
334        self,
335        text,
336        invert=False,
337        field: str | None = None,
338        search_matcher: str | None = None,
339        interactive: bool = True,
340    ) -> bool:
341        """Start a new search for the given text."""
342        valid_matchers = list(s.name for s in SearchMatcher)
343        selected_matcher: SearchMatcher | None = None
344        if (
345            search_matcher is not None
346            and search_matcher.upper() in valid_matchers
347        ):
348            selected_matcher = SearchMatcher(search_matcher.upper())
349
350        if not self.set_search_regex(text, invert, field, selected_matcher):
351            return False
352
353        # Clear matched lines
354        self.search_matched_lines = {}
355
356        if interactive:
357            # Start count historical search matches task.
358            self.search_match_count_task = asyncio.create_task(
359                self.count_search_matches()
360            )
361
362        # Default search direction when hitting enter in the search bar.
363        if interactive:
364            self.search_forwards()
365        return True
366
367    def save_search_matched_line(self, log_index: int) -> None:
368        """Save the log_index at position as a matched line."""
369        self.search_matched_lines[log_index] = 0
370        # Keep matched lines sorted by position
371        self.search_matched_lines = {
372            # Save this log_index and its match number.
373            log_index: match_number
374            for match_number, log_index in enumerate(
375                sorted(self.search_matched_lines.keys())
376            )
377        }
378
    def disable_search_highlighting(self):
        """Turn off search highlighting on the parent pane's log view."""
        # NOTE(review): this writes through self.log_pane.log_view rather
        # than self; presumably both are the same object — confirm.
        self.log_pane.log_view.search_highlight = False
381
382    def _restart_filtering(self):
383        # Turn on follow
384        if not self.follow:
385            self.toggle_follow()
386
387        # Reset filtered logs.
388        self.filtered_logs.clear()
389        # Reset scrollback start
390        self._scrollback_start_index = 0
391
392        # Start filtering existing log lines.
393        self.filter_existing_logs_task = asyncio.create_task(
394            self.filter_past_logs()
395        )
396
397        # Reset existing search
398        self.clear_search()
399
400        # Trigger a main menu update to set log window menu titles.
401        self.log_pane.application.update_menu_items()
402        # Redraw the UI
403        self.log_pane.application.redraw_ui()
404
    def install_new_filter(self):
        """Set a filter using the current search_regex.

        Does nothing without an active search filter. Otherwise filtering is
        switched on, a deep copy of the search filter is stored under the
        current ``search_text`` and the search is cleared.
        """
        if not self.search_filter:
            return

        self.filtering_on = True
        # Copy so the stored filter is independent of the search filter
        # object.
        self.filters[self.search_text] = copy.deepcopy(self.search_filter)

        self.clear_search()
414
    def apply_filter(self):
        """Set new filter and schedule historical log filter asyncio task.

        Does nothing while the websocket server is running.
        """
        if self.websocket_running:
            return
        self.install_new_filter()
        # Runs even when install_new_filter() returned early (no active
        # search filter).
        self._restart_filtering()
421
    def clear_search_highlighting(self):
        """Turn off search highlighting and reset the screen on next render.

        The search filter itself is left in place.
        """
        self.search_highlight = False
        self._reset_log_screen_on_next_render = True
425
    def clear_search(self):
        """Drop the active search: matches, text, filter and highlighting.

        Also requests a log screen reset on the next render.
        """
        self.search_matched_lines = {}
        self.search_text = None
        self.search_filter = None
        self.search_highlight = False
        self._reset_log_screen_on_next_render = True
432
433    def _get_log_lines(self) -> tuple[int, collections.deque[LogLine]]:
434        logs = self.log_store.logs
435        if self.filtering_on:
436            logs = self.filtered_logs
437        return self._scrollback_start_index, logs
438
439    def _get_visible_log_lines(self):
440        _, logs = self._get_log_lines()
441        if self._scrollback_start_index > 0:
442            return collections.deque(
443                itertools.islice(logs, self.hidden_line_count(), len(logs))
444            )
445        return logs
446
447    def _get_table_formatter(self) -> Callable | None:
448        table_formatter = None
449        if self.log_pane.table_view:
450            table_formatter = self.log_store.table.formatted_row
451        return table_formatter
452
453    def delete_filter(self, filter_text):
454        if filter_text not in self.filters:
455            return
456
457        # Delete this filter
458        del self.filters[filter_text]
459
460        # If no filters left, stop filtering.
461        if len(self.filters) == 0:
462            self.clear_filters()
463        else:
464            # Erase existing filtered lines.
465            self._restart_filtering()
466
467    def clear_filters(self):
468        if not self.filtering_on:
469            return
470        self.clear_search()
471        self.filtering_on = False
472        self.filters: collections.OrderedDict[
473            str, re.Pattern
474        ] = collections.OrderedDict()
475        self.filtered_logs.clear()
476        # Reset scrollback start
477        self._scrollback_start_index = 0
478        if not self.follow:
479            self.toggle_follow()
480
481    async def count_search_matches(self):
482        """Count search matches and save their locations."""
483        # Wait for any filter_existing_logs_task to finish.
484        if self.filtering_on and self.filter_existing_logs_task:
485            await self.filter_existing_logs_task
486
487        starting_index = self.get_last_log_index()
488        ending_index, logs = self._get_log_lines()
489
490        # From the end of the log store to the beginning.
491        for i in range(starting_index, ending_index - 1, -1):
492            # Is this log a match?
493            if self.search_filter.matches(logs[i]):
494                self.save_search_matched_line(i)
495            # Pause every 100 lines or so
496            if i % 100 == 0:
497                await asyncio.sleep(0.1)
498
499    async def filter_past_logs(self):
500        """Filter past log lines."""
501        starting_index = self.log_store.get_last_log_index()
502        ending_index = -1
503
504        # From the end of the log store to the beginning.
505        for i in range(starting_index, ending_index, -1):
506            # Is this log a match?
507            if self.filter_scan(self.log_store.logs[i]):
508                # Add to the beginning of the deque.
509                self.filtered_logs.appendleft(self.log_store.logs[i])
510            # TODO(tonymd): Tune these values.
511            # Pause every 100 lines or so
512            if i % 100 == 0:
513                await asyncio.sleep(0.1)
514
    def set_log_pane(self, log_pane: LogPane):
        """Set the parent LogPane instance.

        Args:
          log_pane: The LogPane this view should use from now on.
        """
        self.log_pane = log_pane
518
519    def _update_log_index(self) -> ScreenLine:
520        line_at_cursor = self.log_screen.get_line_at_cursor_position()
521        if line_at_cursor.log_index is not None:
522            self.log_index = line_at_cursor.log_index
523        return line_at_cursor
524
    def get_current_line(self) -> int:
        """Return the currently selected log event index.

        This is ``log_index``, so it refers to the filtered list while
        filtering is on.
        """
        return self.log_index
528
529    def get_total_count(self):
530        """Total size of the logs store."""
531        return (
532            len(self.filtered_logs)
533            if self.filtering_on
534            else self.log_store.get_total_count()
535        )
536
537    def get_last_log_index(self):
538        total = self.get_total_count()
539        return 0 if total < 0 else total - 1
540
    def clear_scrollback(self):
        """Hide the logs up to the current position from this view.

        Follow mode is switched on first (which scrolls to the bottom), then
        the resulting log_index is stored as the scrollback start and a
        screen reset is requested for the next render.
        """
        # Enable follow and scroll to the bottom, then clear.
        if not self.follow:
            self.toggle_follow()
        self._scrollback_start_index = self.log_index
        self._reset_log_screen_on_next_render = True

548
549    def hidden_line_count(self):
550        """Return the number of hidden lines."""
551        if self._scrollback_start_index > 0:
552            return self._scrollback_start_index + 1
553        return 0
554
    def undo_clear_scrollback(self):
        """Reset the current scrollback start index.

        With the start index back at 0, hidden_line_count() returns 0 again.
        """
        self._scrollback_start_index = 0
558
559    def wrap_lines_enabled(self):
560        """Get the parent log pane wrap lines setting."""
561        if not self.log_pane:
562            return False
563        return self.log_pane.wrap_lines
564
565    def toggle_follow(self):
566        """Toggle auto line following."""
567        self.follow = not self.follow
568        if self.follow:
569            # Disable search match follow mode.
570            self.follow_search_match = False
571            self.scroll_to_bottom()
572
573    def filter_scan(self, log: LogLine):
574        filter_match_count = 0
575        for _filter_text, log_filter in self.filters.items():
576            if log_filter.matches(log):
577                filter_match_count += 1
578            else:
579                break
580
581        if filter_match_count == len(self.filters):
582            return True
583        return False
584
585    def new_logs_arrived(self):
586        """Check newly arrived log messages.
587
588        Depending on where log statements occur ``new_logs_arrived`` may be in a
589        separate thread since it is triggerd by the Python log handler
590        ``emit()`` function. In this case the log handler is the LogStore
591        instance ``self.log_store``. This function should not redraw the screen
592        or scroll.
593        """
594        latest_total = self.log_store.get_total_count()
595
596        if self.filtering_on:
597            # Scan newly arived log lines
598            for i in range(self._last_log_store_index, latest_total):
599                if self.filter_scan(self.log_store.logs[i]):
600                    self.filtered_logs.append(self.log_store.logs[i])
601
602        if self.search_filter:
603            last_matched_log: int | None = None
604            # Scan newly arived log lines
605            for i in range(self._last_log_store_index, latest_total):
606                if self.search_filter.matches(self.log_store.logs[i]):
607                    self.save_search_matched_line(i)
608                    last_matched_log = i
609            if last_matched_log and self.follow_search_match:
610                # Set the follow event flag for the next render_content call.
611                self.follow_event = FollowEvent.SEARCH_MATCH
612                self.last_search_matched_log = last_matched_log
613
614        self._last_log_store_index = latest_total
615        self._new_logs_since_last_render = True
616        self._new_logs_since_last_websocket_serve = True
617
618        if self.follow:
619            # Set the follow event flag for the next render_content call.
620            self.follow_event = FollowEvent.STICKY_FOLLOW
621
622        if self.websocket_running:
623            # No terminal screen redraws are required.
624            return
625
626        # Trigger a UI update if the log window is visible.
627        if self.log_pane.show_pane:
628            self.log_pane.application.logs_redraw()
629
    def get_cursor_position(self) -> Point:
        """Return the position of the cursor.

        The first Point argument is always 0; the second is the log screen's
        ``cursor_position``.
        """
        return Point(0, self.log_screen.cursor_position)
633
634    def scroll_to_top(self):
635        """Move selected index to the beginning."""
636        # Stop following so cursor doesn't jump back down to the bottom.
637        self.follow = False
638        # First possible log index that should be displayed
639        log_beginning_index = self.hidden_line_count()
640        self.log_index = log_beginning_index
641        self.log_screen.reset_logs(log_index=self.log_index)
642        self.log_screen.shift_selected_log_to_top()
643        self._user_scroll_event = True
644
    def move_selected_line_to_top(self):
        """Redraw the screen with the selected log moved to the top.

        Follow mode is switched off.
        """
        self.follow = False

        # Update selected line
        self._update_log_index()

        self.log_screen.reset_logs(log_index=self.log_index)
        self.log_screen.shift_selected_log_to_top()
        self._user_scroll_event = True
654
    def center_log_line(self):
        """Redraw the screen with the selected log moved to the center.

        Follow mode is switched off.
        """
        self.follow = False

        # Update selected line
        self._update_log_index()

        self.log_screen.reset_logs(log_index=self.log_index)
        self.log_screen.shift_selected_log_to_center()
        self._user_scroll_event = True
664
665    def scroll_to_bottom(self, with_sticky_follow: bool = True):
666        """Move selected index to the end."""
667        # Don't change following state like scroll_to_top.
668        self.log_index = max(0, self.get_last_log_index())
669        self.log_screen.reset_logs(log_index=self.log_index)
670
671        # Sticky follow mode
672        if with_sticky_follow:
673            self.follow = True
674        self._user_scroll_event = True
675
676    def scroll(self, lines) -> None:
677        """Scroll up or down by plus or minus lines.
678
679        This method is only called by user keybindings.
680        """
681        # If the user starts scrolling, stop auto following.
682        self.follow = False
683
684        self.log_screen.scroll_subline(lines)
685        self._user_scroll_event = True
686
687        # Update the current log
688        current_line = self._update_log_index()
689
690        # Don't check for sticky follow mode if selecting lines.
691        if self.visual_select_mode:
692            return
693        # Is the last log line selected?
694        if self.log_index == self.get_last_log_index():
695            # Is the last line of the current log selected?
696            if current_line.subline + 1 == current_line.height:
697                # Sticky follow mode
698                self.follow = True
699
700    def visual_selected_log_count(self) -> int:
701        if self.marked_logs_start is None or self.marked_logs_end is None:
702            return 0
703        return (self.marked_logs_end - self.marked_logs_start) + 1
704
    def clear_visual_selection(self) -> None:
        """Drop the visual selection, leave visual select mode and redraw."""
        self.marked_logs_start = None
        self.marked_logs_end = None
        self.visual_select_mode = False
        self._user_scroll_event = True
        self.log_pane.application.redraw_ui()
711
712    def visual_select_all(self) -> None:
713        self.marked_logs_start = self._scrollback_start_index
714        self.marked_logs_end = self.get_total_count() - 1
715
716        self.visual_select_mode = True
717        self._user_scroll_event = True
718        self.log_pane.application.redraw_ui()
719
    def visual_select_up(self) -> None:
        """Extend the visual selection one line up from the cursor."""
        # Select the current line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
        # Move the cursor by 1
        self.scroll_up(1)
        # Select the new line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
727
    def visual_select_down(self) -> None:
        """Extend the visual selection one line down from the cursor."""
        # Select the current line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
        # Move the cursor by 1
        self.scroll_down(1)
        # Select the new line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
735
736    def visual_select_line(
737        self, mouse_position: Point, autoscroll: bool = True
738    ) -> None:
739        """Mark the log under mouse_position as visually selected."""
740        # Check mouse_position is valid
741        if not 0 <= mouse_position.y < len(self.log_screen.line_buffer):
742            return
743        # Update mode flags
744        self.visual_select_mode = True
745        self.follow = False
746        # Get the ScreenLine for the cursor position
747        screen_line = self.log_screen.line_buffer[mouse_position.y]
748        if screen_line.log_index is None:
749            return
750
751        if self.marked_logs_start is None:
752            self.marked_logs_start = screen_line.log_index
753        if self.marked_logs_end is None:
754            self.marked_logs_end = screen_line.log_index
755
756        if screen_line.log_index < self.marked_logs_start:
757            self.marked_logs_start = screen_line.log_index
758        elif screen_line.log_index > self.marked_logs_end:
759            self.marked_logs_end = screen_line.log_index
760
761        # Update cursor position
762        self.log_screen.move_cursor_to_position(mouse_position.y)
763
764        # Autoscroll when mouse dragging on the top or bottom of the window.
765        if autoscroll:
766            if mouse_position.y == 0:
767                self.scroll_up(1)
768            elif mouse_position.y == self._window_height - 1:
769                self.scroll_down(1)
770
771        # Trigger a rerender.
772        self._user_scroll_event = True
773        self.log_pane.application.redraw_ui()
774
    def scroll_to_position(self, mouse_position: Point):
        """Set the selected log line to the mouse_position.

        Only ``mouse_position.y`` is used. Follow mode is switched off.
        """
        # Disable follow mode when the user clicks or mouse drags on a log line.
        self.follow = False

        self.log_screen.move_cursor_to_position(mouse_position.y)
        self._update_log_index()

        self._user_scroll_event = True
784
785    def scroll_up_one_page(self):
786        """Move the selected log index up by one window height."""
787        lines = 1
788        if self._window_height > 0:
789            lines = self._window_height
790        self.scroll(-1 * lines)
791
792    def scroll_down_one_page(self):
793        """Move the selected log index down by one window height."""
794        lines = 1
795        if self._window_height > 0:
796            lines = self._window_height
797        self.scroll(lines)
798
    def scroll_down(self, lines=1):
        """Move the selected log index down by one or more lines.

        Args:
          lines: Number of lines to scroll; passed to ``scroll()`` unchanged.
        """
        self.scroll(lines)
802
    def scroll_up(self, lines=1):
        """Move the selected log index up by one or more lines.

        Args:
          lines: Number of lines to scroll; negated before being passed to
            ``scroll()``.
        """
        self.scroll(-1 * lines)
806
807    def log_start_end_indexes_changed(self) -> bool:
808        return (
809            self._last_start_index != self._current_start_index
810            or self._last_end_index != self._current_end_index
811        )
812
    def render_table_header(self):
        """Get pre-formatted table header.

        The header is produced by the log store's ``render_table_header()``.
        """
        return self.log_store.render_table_header()
816
    def get_web_socket_url(self):
        """Return a local URL that carries this view's websocket port.

        The port is placed in the URL fragment as ``ws=<port>``.
        """
        # NOTE(review): host and port 3000 are hard-coded; presumably this is
        # where the web log viewer page is served — confirm.
        return f'http://127.0.0.1:3000/#ws={self.websocket_port}'
819
820    def render_content(self) -> list:
821        """Return logs to display on screen as a list of FormattedText tuples.
822
823        This function determines when the log screen requires re-rendeing based
824        on user scroll events, follow mode being on, or log pane being
825        empty. The FormattedText tuples passed to prompt_toolkit are cached if
826        no updates are required.
827        """
828        screen_update_needed = False
829
830        # Disable rendering if user is viewing logs on web
831        if self.websocket_running:
832            return []
833
834        # Check window size
835        if self.log_pane.pane_resized():
836            self._window_width = self.log_pane.current_log_pane_width
837            self._window_height = self.log_pane.current_log_pane_height
838            self.log_screen.resize(self._window_width, self._window_height)
839            self._reset_log_screen_on_next_render = True
840
841        if self.follow_event is not None:
842            if (
843                self.follow_event == FollowEvent.SEARCH_MATCH
844                and self.last_search_matched_log
845            ):
846                self.log_index = self.last_search_matched_log
847                self.last_search_matched_log = None
848                self._reset_log_screen_on_next_render = True
849
850            elif self.follow_event == FollowEvent.STICKY_FOLLOW:
851                # Jump to the last log message
852                self.log_index = max(0, self.get_last_log_index())
853
854            self.follow_event = None
855            screen_update_needed = True
856
857        if self._reset_log_screen_on_next_render or self.log_screen.empty():
858            # Clear the reset flag.
859            self._reset_log_screen_on_next_render = False
860            self.log_screen.reset_logs(log_index=self.log_index)
861            screen_update_needed = True
862
863        elif self.follow and self._new_logs_since_last_render:
864            # Follow mode is on so add new logs to the screen
865            self._new_logs_since_last_render = False
866
867            current_log_index = self.log_index
868            last_rendered_log_index = self.log_screen.last_appended_log_index
869            # If so many logs have arrived than can fit on the screen, redraw
870            # the whole screen from the new position.
871            if (
872                current_log_index - last_rendered_log_index
873            ) > self.log_screen.height:
874                self.log_screen.reset_logs(log_index=self.log_index)
875            # A small amount of logs have arrived, append them one at a time
876            # without redrawing the whole screen.
877            else:
878                for i in range(
879                    last_rendered_log_index + 1, current_log_index + 1
880                ):
881                    self.log_screen.append_log(i)
882
883            screen_update_needed = True
884
885        if self.follow:
886            # Select the last line for follow mode.
887            self.log_screen.move_cursor_to_bottom()
888            screen_update_needed = True
889
890        if self._user_scroll_event:
891            self._user_scroll_event = False
892            screen_update_needed = True
893
894        if screen_update_needed:
895            self._line_fragment_cache = self.log_screen.get_lines(
896                marked_logs_start=self.marked_logs_start,
897                marked_logs_end=self.marked_logs_end,
898            )
899        return self._line_fragment_cache
900
901    def _logs_to_text(
902        self,
903        use_table_formatting: bool = True,
904        selected_lines_only: bool = False,
905    ) -> str:
906        """Convert all or selected log messages to plaintext."""
907
908        def get_table_string(log: LogLine) -> str:
909            return remove_formatting(self.log_store.table.formatted_row(log))
910
911        formatter: Callable[[LogLine], str] = operator.attrgetter(
912            'ansi_stripped_log'
913        )
914        if use_table_formatting:
915            formatter = get_table_string
916
917        _start_log_index, log_source = self._get_log_lines()
918
919        log_index_range = range(
920            self._scrollback_start_index, self.get_total_count()
921        )
922        if (
923            selected_lines_only
924            and self.marked_logs_start is not None
925            and self.marked_logs_end is not None
926        ):
927            log_index_range = range(
928                self.marked_logs_start, self.marked_logs_end + 1
929            )
930
931        text_output = ''
932        for i in log_index_range:
933            log_text = formatter(log_source[i])
934            text_output += log_text
935            if not log_text.endswith('\n'):
936                text_output += '\n'
937
938        return text_output
939
940    def export_logs(
941        self,
942        use_table_formatting: bool = True,
943        selected_lines_only: bool = False,
944        file_name: str | None = None,
945        to_clipboard: bool = False,
946        add_markdown_fence: bool = False,
947    ) -> bool:
948        """Export log lines to file or clipboard."""
949        text_output = self._logs_to_text(
950            use_table_formatting, selected_lines_only
951        )
952
953        if file_name:
954            target_path = Path(file_name).expanduser()
955            with target_path.open('w') as output_file:
956                output_file.write(text_output)
957            _LOG.debug('Saved to file: %s', file_name)
958
959        elif to_clipboard:
960            if add_markdown_fence:
961                text_output = '```\n' + text_output + '```\n'
962            self.log_pane.application.set_system_clipboard(text_output)
963            _LOG.debug('Copied logs to clipboard.')
964
965        return True
966