1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""LogView maintains a log pane's scrolling and searching state.""" 15 16from __future__ import annotations 17import asyncio 18import collections 19import copy 20from enum import Enum 21import itertools 22import json 23import logging 24import operator 25from pathlib import Path 26import re 27from threading import Thread 28from typing import Callable, TYPE_CHECKING 29 30from prompt_toolkit.data_structures import Point 31from prompt_toolkit.formatted_text import StyleAndTextTuples 32import websockets 33 34from pw_console.log_filter import ( 35 DEFAULT_SEARCH_MATCHER, 36 LogFilter, 37 RegexValidator, 38 SearchMatcher, 39 preprocess_search_regex, 40) 41from pw_console.log_screen import ScreenLine, LogScreen 42from pw_console.log_store import LogStore 43from pw_console.python_logging import log_record_to_json 44from pw_console.text_formatting import remove_formatting 45 46if TYPE_CHECKING: 47 from pw_console.console_app import ConsoleApp 48 from pw_console.log_line import LogLine 49 from pw_console.log_pane import LogPane 50 51_LOG = logging.getLogger(__package__) 52 53 54class FollowEvent(Enum): 55 """Follow mode scroll event types.""" 56 57 SEARCH_MATCH = 'scroll_to_bottom' 58 STICKY_FOLLOW = 'scroll_to_bottom_with_sticky_follow' 59 60 61class LogView: 62 """Viewing window into a LogStore.""" 63 64 # pylint: disable=too-many-instance-attributes,too-many-public-methods 65 66 def __init__( 67 self, 68 log_pane: LogPane, 69 application: ConsoleApp, 70 log_store: LogStore | None = None, 71 ): 72 # Parent LogPane reference. Updated by calling `set_log_pane()`. 73 self.log_pane = log_pane 74 self.log_store = ( 75 log_store if log_store else LogStore(prefs=application.prefs) 76 ) 77 self.log_store.set_prefs(application.prefs) 78 self.log_store.register_viewer(self) 79 80 self.marked_logs_start: int | None = None 81 self.marked_logs_end: int | None = None 82 83 # Search variables 84 self.search_text: str | None = None 85 self.search_filter: LogFilter | None = None 86 self.search_highlight: bool = False 87 self.search_matcher = DEFAULT_SEARCH_MATCHER 88 self.search_validator = RegexValidator() 89 90 # Container for each log_index matched by active searches. 91 self.search_matched_lines: dict[int, int] = {} 92 # Background task to find historical matched lines. 93 self.search_match_count_task: asyncio.Task | None = None 94 95 # Flag for automatically jumping to each new search match as they 96 # appear. 97 self.follow_search_match: bool = False 98 self.last_search_matched_log: int | None = None 99 100 # Follow event flag. This is set to by the new_logs_arrived() function 101 # as a signal that the log screen should be scrolled to the bottom. 102 # This is read by render_content() whenever the screen is drawn. 103 self.follow_event: FollowEvent | None = None 104 105 self.log_screen = LogScreen( 106 get_log_source=self._get_log_lines, 107 get_line_wrapping=self.wrap_lines_enabled, 108 get_log_formatter=self._get_table_formatter, 109 get_search_filter=lambda: self.search_filter, 110 get_search_highlight=lambda: self.search_highlight, 111 ) 112 113 # Filter 114 self.filtering_on: bool = False 115 self.filters: collections.OrderedDict[ 116 str, LogFilter 117 ] = collections.OrderedDict() 118 self.filtered_logs: collections.deque = collections.deque() 119 self.filter_existing_logs_task: asyncio.Task | None = None 120 121 # Current log line index state variables: 122 self._last_log_index = -1 123 self._log_index = 0 124 self._filtered_log_index = 0 125 self._last_start_index = 0 126 self._last_end_index = 0 127 self._current_start_index = 0 128 self._current_end_index = 0 129 self._scrollback_start_index = 0 130 131 # LogPane prompt_toolkit container render size. 132 self._window_height = 20 133 self._window_width = 80 134 self._reset_log_screen_on_next_render: bool = True 135 self._user_scroll_event: bool = False 136 137 self._last_log_store_index = 0 138 self._new_logs_since_last_render = True 139 self._new_logs_since_last_websocket_serve = True 140 self._last_served_websocket_index = -1 141 142 # Should new log lines be tailed? 143 self.follow: bool = True 144 145 self.visual_select_mode: bool = False 146 147 # Cache of formatted text tuples used in the last UI render. 148 self._line_fragment_cache: list[StyleAndTextTuples] = [] 149 150 # websocket server variables 151 self.websocket_running: bool = False 152 self.websocket_server = None 153 self.websocket_port = None 154 self.websocket_loop = asyncio.new_event_loop() 155 156 # Check if any logs are already in the log_store and update the view. 157 self.new_logs_arrived() 158 159 def _websocket_thread_entry(self): 160 """Entry point for the user code thread.""" 161 asyncio.set_event_loop(self.websocket_loop) 162 self.websocket_server = websockets.serve( # type: ignore # pylint: disable=no-member 163 self._send_logs_over_websockets, '127.0.0.1' 164 ) 165 self.websocket_loop.run_until_complete(self.websocket_server) 166 self.websocket_port = self.websocket_server.ws_server.sockets[ 167 0 168 ].getsockname()[1] 169 self.websocket_running = True 170 self.websocket_loop.run_forever() 171 172 def start_websocket_thread(self): 173 """Create a thread for running user code so the UI isn't blocked.""" 174 thread = Thread( 175 target=self._websocket_thread_entry, args=(), daemon=True 176 ) 177 thread.start() 178 179 def stop_websocket_thread(self): 180 """Stop websocket server.""" 181 if self.websocket_running: 182 self.websocket_loop.call_soon_threadsafe(self.websocket_loop.stop) 183 self.websocket_server = None 184 self.websocket_port = None 185 self.websocket_running = False 186 if self.filtering_on: 187 self._restart_filtering() 188 189 async def _send_logs_over_websockets(self, websocket, _path) -> None: 190 def formatter(log: LogLine) -> str: 191 return log_record_to_json(log.record) 192 193 theme_colors = json.dumps( 194 self.log_pane.application.prefs.pw_console_color_config() 195 ) 196 # Send colors 197 await websocket.send(theme_colors) 198 199 while True: 200 # Wait for new logs 201 if not self._new_logs_since_last_websocket_serve: 202 await asyncio.sleep(0.5) 203 204 _start_log_index, log_source = self._get_log_lines() 205 log_index_range = range( 206 self._last_served_websocket_index + 1, self.get_total_count() 207 ) 208 209 for i in log_index_range: 210 log_text = formatter(log_source[i]) 211 await websocket.send(log_text) 212 self._last_served_websocket_index = i 213 214 # Flag that all logs have been served. 215 self._new_logs_since_last_websocket_serve = False 216 217 def view_mode_changed(self) -> None: 218 self._reset_log_screen_on_next_render = True 219 220 @property 221 def log_index(self): 222 if self.filtering_on: 223 return self._filtered_log_index 224 return self._log_index 225 226 @log_index.setter 227 def log_index(self, new_log_index): 228 # Save the old log_index 229 self._last_log_index = self.log_index 230 if self.filtering_on: 231 self._filtered_log_index = new_log_index 232 else: 233 self._log_index = new_log_index 234 235 def _reset_log_index_changed(self) -> None: 236 self._last_log_index = self.log_index 237 238 def log_index_changed_since_last_render(self) -> bool: 239 return self._last_log_index != self.log_index 240 241 def _set_match_position(self, position: int): 242 self.follow = False 243 self.log_index = position 244 self.save_search_matched_line(position) 245 self.log_screen.reset_logs(log_index=self.log_index) 246 self.log_screen.shift_selected_log_to_center() 247 self._user_scroll_event = True 248 self.log_pane.application.redraw_ui() 249 250 def select_next_search_matcher(self): 251 matchers = list(SearchMatcher) 252 index = matchers.index(self.search_matcher) 253 new_index = (index + 1) % len(matchers) 254 self.search_matcher = matchers[new_index] 255 256 def search_forwards(self): 257 if not self.search_filter: 258 return 259 self.search_highlight = True 260 261 log_beginning_index = self.hidden_line_count() 262 263 starting_index = self.log_index + 1 264 if starting_index > self.get_last_log_index(): 265 starting_index = log_beginning_index 266 267 _, logs = self._get_log_lines() 268 269 # From current position +1 and down 270 for i in range(starting_index, self.get_last_log_index() + 1): 271 if self.search_filter.matches(logs[i]): 272 self._set_match_position(i) 273 return 274 275 # From the beginning to the original start 276 for i in range(log_beginning_index, starting_index): 277 if self.search_filter.matches(logs[i]): 278 self._set_match_position(i) 279 return 280 281 def search_backwards(self): 282 if not self.search_filter: 283 return 284 self.search_highlight = True 285 286 log_beginning_index = self.hidden_line_count() 287 288 starting_index = self.log_index - 1 289 if starting_index < 0: 290 starting_index = self.get_last_log_index() 291 292 _, logs = self._get_log_lines() 293 294 # From current position - 1 and up 295 for i in range(starting_index, log_beginning_index - 1, -1): 296 if self.search_filter.matches(logs[i]): 297 self._set_match_position(i) 298 return 299 300 # From the end to the original start 301 for i in range(self.get_last_log_index(), starting_index, -1): 302 if self.search_filter.matches(logs[i]): 303 self._set_match_position(i) 304 return 305 306 def set_search_regex( 307 self, text, invert, field, matcher: SearchMatcher | None = None 308 ) -> bool: 309 search_matcher = matcher if matcher else self.search_matcher 310 _LOG.debug(search_matcher) 311 312 regex_text, regex_flags = preprocess_search_regex( 313 text, matcher=search_matcher 314 ) 315 316 try: 317 compiled_regex = re.compile(regex_text, regex_flags) 318 self.search_filter = LogFilter( 319 regex=compiled_regex, 320 input_text=text, 321 invert=invert, 322 field=field, 323 ) 324 _LOG.debug(self.search_filter) 325 except re.error as error: 326 _LOG.debug(error) 327 return False 328 329 self.search_highlight = True 330 self.search_text = regex_text 331 return True 332 333 def new_search( 334 self, 335 text, 336 invert=False, 337 field: str | None = None, 338 search_matcher: str | None = None, 339 interactive: bool = True, 340 ) -> bool: 341 """Start a new search for the given text.""" 342 valid_matchers = list(s.name for s in SearchMatcher) 343 selected_matcher: SearchMatcher | None = None 344 if ( 345 search_matcher is not None 346 and search_matcher.upper() in valid_matchers 347 ): 348 selected_matcher = SearchMatcher(search_matcher.upper()) 349 350 if not self.set_search_regex(text, invert, field, selected_matcher): 351 return False 352 353 # Clear matched lines 354 self.search_matched_lines = {} 355 356 if interactive: 357 # Start count historical search matches task. 358 self.search_match_count_task = asyncio.create_task( 359 self.count_search_matches() 360 ) 361 362 # Default search direction when hitting enter in the search bar. 363 if interactive: 364 self.search_forwards() 365 return True 366 367 def save_search_matched_line(self, log_index: int) -> None: 368 """Save the log_index at position as a matched line.""" 369 self.search_matched_lines[log_index] = 0 370 # Keep matched lines sorted by position 371 self.search_matched_lines = { 372 # Save this log_index and its match number. 373 log_index: match_number 374 for match_number, log_index in enumerate( 375 sorted(self.search_matched_lines.keys()) 376 ) 377 } 378 379 def disable_search_highlighting(self): 380 self.log_pane.log_view.search_highlight = False 381 382 def _restart_filtering(self): 383 # Turn on follow 384 if not self.follow: 385 self.toggle_follow() 386 387 # Reset filtered logs. 388 self.filtered_logs.clear() 389 # Reset scrollback start 390 self._scrollback_start_index = 0 391 392 # Start filtering existing log lines. 393 self.filter_existing_logs_task = asyncio.create_task( 394 self.filter_past_logs() 395 ) 396 397 # Reset existing search 398 self.clear_search() 399 400 # Trigger a main menu update to set log window menu titles. 401 self.log_pane.application.update_menu_items() 402 # Redraw the UI 403 self.log_pane.application.redraw_ui() 404 405 def install_new_filter(self): 406 """Set a filter using the current search_regex.""" 407 if not self.search_filter: 408 return 409 410 self.filtering_on = True 411 self.filters[self.search_text] = copy.deepcopy(self.search_filter) 412 413 self.clear_search() 414 415 def apply_filter(self): 416 """Set new filter and schedule historical log filter asyncio task.""" 417 if self.websocket_running: 418 return 419 self.install_new_filter() 420 self._restart_filtering() 421 422 def clear_search_highlighting(self): 423 self.search_highlight = False 424 self._reset_log_screen_on_next_render = True 425 426 def clear_search(self): 427 self.search_matched_lines = {} 428 self.search_text = None 429 self.search_filter = None 430 self.search_highlight = False 431 self._reset_log_screen_on_next_render = True 432 433 def _get_log_lines(self) -> tuple[int, collections.deque[LogLine]]: 434 logs = self.log_store.logs 435 if self.filtering_on: 436 logs = self.filtered_logs 437 return self._scrollback_start_index, logs 438 439 def _get_visible_log_lines(self): 440 _, logs = self._get_log_lines() 441 if self._scrollback_start_index > 0: 442 return collections.deque( 443 itertools.islice(logs, self.hidden_line_count(), len(logs)) 444 ) 445 return logs 446 447 def _get_table_formatter(self) -> Callable | None: 448 table_formatter = None 449 if self.log_pane.table_view: 450 table_formatter = self.log_store.table.formatted_row 451 return table_formatter 452 453 def delete_filter(self, filter_text): 454 if filter_text not in self.filters: 455 return 456 457 # Delete this filter 458 del self.filters[filter_text] 459 460 # If no filters left, stop filtering. 461 if len(self.filters) == 0: 462 self.clear_filters() 463 else: 464 # Erase existing filtered lines. 465 self._restart_filtering() 466 467 def clear_filters(self): 468 if not self.filtering_on: 469 return 470 self.clear_search() 471 self.filtering_on = False 472 self.filters: collections.OrderedDict[ 473 str, re.Pattern 474 ] = collections.OrderedDict() 475 self.filtered_logs.clear() 476 # Reset scrollback start 477 self._scrollback_start_index = 0 478 if not self.follow: 479 self.toggle_follow() 480 481 async def count_search_matches(self): 482 """Count search matches and save their locations.""" 483 # Wait for any filter_existing_logs_task to finish. 484 if self.filtering_on and self.filter_existing_logs_task: 485 await self.filter_existing_logs_task 486 487 starting_index = self.get_last_log_index() 488 ending_index, logs = self._get_log_lines() 489 490 # From the end of the log store to the beginning. 491 for i in range(starting_index, ending_index - 1, -1): 492 # Is this log a match? 493 if self.search_filter.matches(logs[i]): 494 self.save_search_matched_line(i) 495 # Pause every 100 lines or so 496 if i % 100 == 0: 497 await asyncio.sleep(0.1) 498 499 async def filter_past_logs(self): 500 """Filter past log lines.""" 501 starting_index = self.log_store.get_last_log_index() 502 ending_index = -1 503 504 # From the end of the log store to the beginning. 505 for i in range(starting_index, ending_index, -1): 506 # Is this log a match? 507 if self.filter_scan(self.log_store.logs[i]): 508 # Add to the beginning of the deque. 509 self.filtered_logs.appendleft(self.log_store.logs[i]) 510 # TODO(tonymd): Tune these values. 511 # Pause every 100 lines or so 512 if i % 100 == 0: 513 await asyncio.sleep(0.1) 514 515 def set_log_pane(self, log_pane: LogPane): 516 """Set the parent LogPane instance.""" 517 self.log_pane = log_pane 518 519 def _update_log_index(self) -> ScreenLine: 520 line_at_cursor = self.log_screen.get_line_at_cursor_position() 521 if line_at_cursor.log_index is not None: 522 self.log_index = line_at_cursor.log_index 523 return line_at_cursor 524 525 def get_current_line(self) -> int: 526 """Return the currently selected log event index.""" 527 return self.log_index 528 529 def get_total_count(self): 530 """Total size of the logs store.""" 531 return ( 532 len(self.filtered_logs) 533 if self.filtering_on 534 else self.log_store.get_total_count() 535 ) 536 537 def get_last_log_index(self): 538 total = self.get_total_count() 539 return 0 if total < 0 else total - 1 540 541 def clear_scrollback(self): 542 """Hide log lines before the max length of the stored logs.""" 543 # Enable follow and scroll to the bottom, then clear. 544 if not self.follow: 545 self.toggle_follow() 546 self._scrollback_start_index = self.log_index 547 self._reset_log_screen_on_next_render = True 548 549 def hidden_line_count(self): 550 """Return the number of hidden lines.""" 551 if self._scrollback_start_index > 0: 552 return self._scrollback_start_index + 1 553 return 0 554 555 def undo_clear_scrollback(self): 556 """Reset the current scrollback start index.""" 557 self._scrollback_start_index = 0 558 559 def wrap_lines_enabled(self): 560 """Get the parent log pane wrap lines setting.""" 561 if not self.log_pane: 562 return False 563 return self.log_pane.wrap_lines 564 565 def toggle_follow(self): 566 """Toggle auto line following.""" 567 self.follow = not self.follow 568 if self.follow: 569 # Disable search match follow mode. 570 self.follow_search_match = False 571 self.scroll_to_bottom() 572 573 def filter_scan(self, log: LogLine): 574 filter_match_count = 0 575 for _filter_text, log_filter in self.filters.items(): 576 if log_filter.matches(log): 577 filter_match_count += 1 578 else: 579 break 580 581 if filter_match_count == len(self.filters): 582 return True 583 return False 584 585 def new_logs_arrived(self): 586 """Check newly arrived log messages. 587 588 Depending on where log statements occur ``new_logs_arrived`` may be in a 589 separate thread since it is triggerd by the Python log handler 590 ``emit()`` function. In this case the log handler is the LogStore 591 instance ``self.log_store``. This function should not redraw the screen 592 or scroll. 593 """ 594 latest_total = self.log_store.get_total_count() 595 596 if self.filtering_on: 597 # Scan newly arived log lines 598 for i in range(self._last_log_store_index, latest_total): 599 if self.filter_scan(self.log_store.logs[i]): 600 self.filtered_logs.append(self.log_store.logs[i]) 601 602 if self.search_filter: 603 last_matched_log: int | None = None 604 # Scan newly arived log lines 605 for i in range(self._last_log_store_index, latest_total): 606 if self.search_filter.matches(self.log_store.logs[i]): 607 self.save_search_matched_line(i) 608 last_matched_log = i 609 if last_matched_log and self.follow_search_match: 610 # Set the follow event flag for the next render_content call. 611 self.follow_event = FollowEvent.SEARCH_MATCH 612 self.last_search_matched_log = last_matched_log 613 614 self._last_log_store_index = latest_total 615 self._new_logs_since_last_render = True 616 self._new_logs_since_last_websocket_serve = True 617 618 if self.follow: 619 # Set the follow event flag for the next render_content call. 620 self.follow_event = FollowEvent.STICKY_FOLLOW 621 622 if self.websocket_running: 623 # No terminal screen redraws are required. 624 return 625 626 # Trigger a UI update if the log window is visible. 627 if self.log_pane.show_pane: 628 self.log_pane.application.logs_redraw() 629 630 def get_cursor_position(self) -> Point: 631 """Return the position of the cursor.""" 632 return Point(0, self.log_screen.cursor_position) 633 634 def scroll_to_top(self): 635 """Move selected index to the beginning.""" 636 # Stop following so cursor doesn't jump back down to the bottom. 637 self.follow = False 638 # First possible log index that should be displayed 639 log_beginning_index = self.hidden_line_count() 640 self.log_index = log_beginning_index 641 self.log_screen.reset_logs(log_index=self.log_index) 642 self.log_screen.shift_selected_log_to_top() 643 self._user_scroll_event = True 644 645 def move_selected_line_to_top(self): 646 self.follow = False 647 648 # Update selected line 649 self._update_log_index() 650 651 self.log_screen.reset_logs(log_index=self.log_index) 652 self.log_screen.shift_selected_log_to_top() 653 self._user_scroll_event = True 654 655 def center_log_line(self): 656 self.follow = False 657 658 # Update selected line 659 self._update_log_index() 660 661 self.log_screen.reset_logs(log_index=self.log_index) 662 self.log_screen.shift_selected_log_to_center() 663 self._user_scroll_event = True 664 665 def scroll_to_bottom(self, with_sticky_follow: bool = True): 666 """Move selected index to the end.""" 667 # Don't change following state like scroll_to_top. 668 self.log_index = max(0, self.get_last_log_index()) 669 self.log_screen.reset_logs(log_index=self.log_index) 670 671 # Sticky follow mode 672 if with_sticky_follow: 673 self.follow = True 674 self._user_scroll_event = True 675 676 def scroll(self, lines) -> None: 677 """Scroll up or down by plus or minus lines. 678 679 This method is only called by user keybindings. 680 """ 681 # If the user starts scrolling, stop auto following. 682 self.follow = False 683 684 self.log_screen.scroll_subline(lines) 685 self._user_scroll_event = True 686 687 # Update the current log 688 current_line = self._update_log_index() 689 690 # Don't check for sticky follow mode if selecting lines. 691 if self.visual_select_mode: 692 return 693 # Is the last log line selected? 694 if self.log_index == self.get_last_log_index(): 695 # Is the last line of the current log selected? 696 if current_line.subline + 1 == current_line.height: 697 # Sticky follow mode 698 self.follow = True 699 700 def visual_selected_log_count(self) -> int: 701 if self.marked_logs_start is None or self.marked_logs_end is None: 702 return 0 703 return (self.marked_logs_end - self.marked_logs_start) + 1 704 705 def clear_visual_selection(self) -> None: 706 self.marked_logs_start = None 707 self.marked_logs_end = None 708 self.visual_select_mode = False 709 self._user_scroll_event = True 710 self.log_pane.application.redraw_ui() 711 712 def visual_select_all(self) -> None: 713 self.marked_logs_start = self._scrollback_start_index 714 self.marked_logs_end = self.get_total_count() - 1 715 716 self.visual_select_mode = True 717 self._user_scroll_event = True 718 self.log_pane.application.redraw_ui() 719 720 def visual_select_up(self) -> None: 721 # Select the current line 722 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 723 # Move the cursor by 1 724 self.scroll_up(1) 725 # Select the new line 726 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 727 728 def visual_select_down(self) -> None: 729 # Select the current line 730 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 731 # Move the cursor by 1 732 self.scroll_down(1) 733 # Select the new line 734 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 735 736 def visual_select_line( 737 self, mouse_position: Point, autoscroll: bool = True 738 ) -> None: 739 """Mark the log under mouse_position as visually selected.""" 740 # Check mouse_position is valid 741 if not 0 <= mouse_position.y < len(self.log_screen.line_buffer): 742 return 743 # Update mode flags 744 self.visual_select_mode = True 745 self.follow = False 746 # Get the ScreenLine for the cursor position 747 screen_line = self.log_screen.line_buffer[mouse_position.y] 748 if screen_line.log_index is None: 749 return 750 751 if self.marked_logs_start is None: 752 self.marked_logs_start = screen_line.log_index 753 if self.marked_logs_end is None: 754 self.marked_logs_end = screen_line.log_index 755 756 if screen_line.log_index < self.marked_logs_start: 757 self.marked_logs_start = screen_line.log_index 758 elif screen_line.log_index > self.marked_logs_end: 759 self.marked_logs_end = screen_line.log_index 760 761 # Update cursor position 762 self.log_screen.move_cursor_to_position(mouse_position.y) 763 764 # Autoscroll when mouse dragging on the top or bottom of the window. 765 if autoscroll: 766 if mouse_position.y == 0: 767 self.scroll_up(1) 768 elif mouse_position.y == self._window_height - 1: 769 self.scroll_down(1) 770 771 # Trigger a rerender. 772 self._user_scroll_event = True 773 self.log_pane.application.redraw_ui() 774 775 def scroll_to_position(self, mouse_position: Point): 776 """Set the selected log line to the mouse_position.""" 777 # Disable follow mode when the user clicks or mouse drags on a log line. 778 self.follow = False 779 780 self.log_screen.move_cursor_to_position(mouse_position.y) 781 self._update_log_index() 782 783 self._user_scroll_event = True 784 785 def scroll_up_one_page(self): 786 """Move the selected log index up by one window height.""" 787 lines = 1 788 if self._window_height > 0: 789 lines = self._window_height 790 self.scroll(-1 * lines) 791 792 def scroll_down_one_page(self): 793 """Move the selected log index down by one window height.""" 794 lines = 1 795 if self._window_height > 0: 796 lines = self._window_height 797 self.scroll(lines) 798 799 def scroll_down(self, lines=1): 800 """Move the selected log index down by one or more lines.""" 801 self.scroll(lines) 802 803 def scroll_up(self, lines=1): 804 """Move the selected log index up by one or more lines.""" 805 self.scroll(-1 * lines) 806 807 def log_start_end_indexes_changed(self) -> bool: 808 return ( 809 self._last_start_index != self._current_start_index 810 or self._last_end_index != self._current_end_index 811 ) 812 813 def render_table_header(self): 814 """Get pre-formatted table header.""" 815 return self.log_store.render_table_header() 816 817 def get_web_socket_url(self): 818 return f'http://127.0.0.1:3000/#ws={self.websocket_port}' 819 820 def render_content(self) -> list: 821 """Return logs to display on screen as a list of FormattedText tuples. 822 823 This function determines when the log screen requires re-rendeing based 824 on user scroll events, follow mode being on, or log pane being 825 empty. The FormattedText tuples passed to prompt_toolkit are cached if 826 no updates are required. 827 """ 828 screen_update_needed = False 829 830 # Disable rendering if user is viewing logs on web 831 if self.websocket_running: 832 return [] 833 834 # Check window size 835 if self.log_pane.pane_resized(): 836 self._window_width = self.log_pane.current_log_pane_width 837 self._window_height = self.log_pane.current_log_pane_height 838 self.log_screen.resize(self._window_width, self._window_height) 839 self._reset_log_screen_on_next_render = True 840 841 if self.follow_event is not None: 842 if ( 843 self.follow_event == FollowEvent.SEARCH_MATCH 844 and self.last_search_matched_log 845 ): 846 self.log_index = self.last_search_matched_log 847 self.last_search_matched_log = None 848 self._reset_log_screen_on_next_render = True 849 850 elif self.follow_event == FollowEvent.STICKY_FOLLOW: 851 # Jump to the last log message 852 self.log_index = max(0, self.get_last_log_index()) 853 854 self.follow_event = None 855 screen_update_needed = True 856 857 if self._reset_log_screen_on_next_render or self.log_screen.empty(): 858 # Clear the reset flag. 859 self._reset_log_screen_on_next_render = False 860 self.log_screen.reset_logs(log_index=self.log_index) 861 screen_update_needed = True 862 863 elif self.follow and self._new_logs_since_last_render: 864 # Follow mode is on so add new logs to the screen 865 self._new_logs_since_last_render = False 866 867 current_log_index = self.log_index 868 last_rendered_log_index = self.log_screen.last_appended_log_index 869 # If so many logs have arrived than can fit on the screen, redraw 870 # the whole screen from the new position. 871 if ( 872 current_log_index - last_rendered_log_index 873 ) > self.log_screen.height: 874 self.log_screen.reset_logs(log_index=self.log_index) 875 # A small amount of logs have arrived, append them one at a time 876 # without redrawing the whole screen. 877 else: 878 for i in range( 879 last_rendered_log_index + 1, current_log_index + 1 880 ): 881 self.log_screen.append_log(i) 882 883 screen_update_needed = True 884 885 if self.follow: 886 # Select the last line for follow mode. 887 self.log_screen.move_cursor_to_bottom() 888 screen_update_needed = True 889 890 if self._user_scroll_event: 891 self._user_scroll_event = False 892 screen_update_needed = True 893 894 if screen_update_needed: 895 self._line_fragment_cache = self.log_screen.get_lines( 896 marked_logs_start=self.marked_logs_start, 897 marked_logs_end=self.marked_logs_end, 898 ) 899 return self._line_fragment_cache 900 901 def _logs_to_text( 902 self, 903 use_table_formatting: bool = True, 904 selected_lines_only: bool = False, 905 ) -> str: 906 """Convert all or selected log messages to plaintext.""" 907 908 def get_table_string(log: LogLine) -> str: 909 return remove_formatting(self.log_store.table.formatted_row(log)) 910 911 formatter: Callable[[LogLine], str] = operator.attrgetter( 912 'ansi_stripped_log' 913 ) 914 if use_table_formatting: 915 formatter = get_table_string 916 917 _start_log_index, log_source = self._get_log_lines() 918 919 log_index_range = range( 920 self._scrollback_start_index, self.get_total_count() 921 ) 922 if ( 923 selected_lines_only 924 and self.marked_logs_start is not None 925 and self.marked_logs_end is not None 926 ): 927 log_index_range = range( 928 self.marked_logs_start, self.marked_logs_end + 1 929 ) 930 931 text_output = '' 932 for i in log_index_range: 933 log_text = formatter(log_source[i]) 934 text_output += log_text 935 if not log_text.endswith('\n'): 936 text_output += '\n' 937 938 return text_output 939 940 def export_logs( 941 self, 942 use_table_formatting: bool = True, 943 selected_lines_only: bool = False, 944 file_name: str | None = None, 945 to_clipboard: bool = False, 946 add_markdown_fence: bool = False, 947 ) -> bool: 948 """Export log lines to file or clipboard.""" 949 text_output = self._logs_to_text( 950 use_table_formatting, selected_lines_only 951 ) 952 953 if file_name: 954 target_path = Path(file_name).expanduser() 955 with target_path.open('w') as output_file: 956 output_file.write(text_output) 957 _LOG.debug('Saved to file: %s', file_name) 958 959 elif to_clipboard: 960 if add_markdown_fence: 961 text_output = '```\n' + text_output + '```\n' 962 self.log_pane.application.set_system_clipboard(text_output) 963 _LOG.debug('Copied logs to clipboard.') 964 965 return True 966