# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for pw_console.log_view"""

import logging
import time
import sys
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
from parameterized import parameterized  # type: ignore
from prompt_toolkit.data_structures import Point

from pw_console.console_prefs import ConsolePrefs
from pw_console.log_view import LogView
from pw_console.log_screen import ScreenLine
from pw_console.text_formatting import (
    flatten_formatted_text_tuples,
    join_adjacent_style_tuples,
)

# True when the interpreter is new enough to provide
# unittest.IsolatedAsyncioTestCase, which the filtering tests below need.
_PYTHON_3_8 = sys.version_info >= (
    3,
    8,
)


def _create_log_view():
    """Build a LogView wired to mocked pane and application objects.

    The mocked pane reports an 80 column by 10 row window. The application
    mock carries a ConsolePrefs created with project_file, project_user_file
    and user_file all set to False and then reset via reset_config().

    Returns:
      A (log_view, log_pane) tuple.
    """
    log_pane = MagicMock()
    log_pane.pane_resized = MagicMock(return_value=True)
    log_pane.current_log_pane_width = 80
    log_pane.current_log_pane_height = 10

    application = MagicMock()
    application.prefs = ConsolePrefs(
        project_file=False, project_user_file=False, user_file=False
    )
    application.prefs.reset_config()
    log_view = LogView(log_pane, application)
    return log_view, log_pane


def _visible_messages(log_view):
    """Return the record messages from log_view._get_visible_log_lines()."""
    # pylint: disable=protected-access
    return [log.record.message for log in log_view._get_visible_log_lines()]


class TestLogView(unittest.TestCase):
    """Tests for LogView."""

    maxDiff = None

    def _create_log_view_with_logs(self, log_count=100):
        """Create a LogView and feed it log_count DEBUG messages.

        Messages are 'Test log 0' .. 'Test log <log_count - 1>' emitted on
        the 'log_view.test' logger. Nothing is logged when log_count is 0.

        Returns:
          A (log_view, log_pane) tuple.
        """
        log_view, log_pane = _create_log_view()

        if log_count > 0:
            test_log = logging.getLogger('log_view.test')
            # NOTE(review): the store is (re)attached inside every assertLogs
            # block in this file - presumably because assertLogs swaps the
            # logger's handlers while active; confirm before hoisting it.
            with self.assertLogs(test_log, level='DEBUG') as _log_context:
                test_log.addHandler(log_view.log_store)
                for i in range(log_count):
                    test_log.debug('Test log %s', i)

        return log_view, log_pane

    def test_follow_toggle(self) -> None:
        """Follow starts enabled and toggle_follow() turns it off."""
        log_view, _pane = _create_log_view()
        self.assertTrue(log_view.follow)
        log_view.toggle_follow()
        self.assertFalse(log_view.follow)

    def test_follow_scrolls_to_bottom(self) -> None:
        """The current line only tracks new logs while follow is on."""
        log_view, _pane = _create_log_view()
        log_view.toggle_follow()
        _fragments = log_view.render_content()
        self.assertFalse(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 0)

        test_log = logging.getLogger('log_view.test')

        # Log 5 messages, current_line should stay at 0
        with self.assertLogs(test_log, level='DEBUG') as _log_context:
            test_log.addHandler(log_view.log_store)
            for i in range(5):
                test_log.debug('Test log %s', i)
        _fragments = log_view.render_content()

        self.assertEqual(log_view.get_total_count(), 5)
        self.assertEqual(log_view.get_current_line(), 0)
        # Turn follow on
        log_view.toggle_follow()
        self.assertTrue(log_view.follow)

        # Log another message, current_line should move to the last.
        with self.assertLogs(test_log, level='DEBUG') as _log_context:
            test_log.addHandler(log_view.log_store)
            test_log.debug('Test log')
        _fragments = log_view.render_content()

        self.assertEqual(log_view.get_total_count(), 6)
        self.assertEqual(log_view.get_current_line(), 5)

    def test_scrolling(self) -> None:
        """Test all scrolling methods."""
        log_view, log_pane = self._create_log_view_with_logs(log_count=100)

        # Page scrolling needs to know the current window height.
        log_pane.pane_resized = MagicMock(return_value=True)
        log_pane.current_log_pane_width = 80
        log_pane.current_log_pane_height = 10

        log_view.render_content()

        # Follow is on by default, current line should be at the end.
        self.assertEqual(log_view.get_current_line(), 99)

        # Move to the beginning.
        log_view.scroll_to_top()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 0)

        # Should not be able to scroll before the beginning.
        log_view.scroll_up()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 0)
        log_view.scroll_up_one_page()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 0)

        # Single and multi line movement.
        log_view.scroll_down()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 1)
        log_view.scroll(5)
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 6)
        log_view.scroll_up()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 5)

        # Page down and up.
        log_view.scroll_down_one_page()
        self.assertEqual(log_view.get_current_line(), 15)

        log_view.scroll_up_one_page()
        self.assertEqual(log_view.get_current_line(), 5)

        # Move to the end.
        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 99)

        # Should not be able to scroll beyond the end.
        log_view.scroll_down()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 99)
        log_view.scroll_down_one_page()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 99)

        # Move up a bit to turn off follow
        self.assertEqual(log_view.log_screen.cursor_position, 9)
        log_view.scroll(-1)
        self.assertEqual(log_view.log_screen.cursor_position, 8)
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 98)

        # Simulate a mouse click to scroll.
        # Click 1 line from the top of the window.
        log_view.scroll_to_position(Point(0, 1))
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 90)

        # A mouse click on a line should disable follow mode.
        log_view.toggle_follow()
        log_view.render_content()
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 99)
        log_view.scroll_to_position(Point(0, 5))
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 95)
        self.assertFalse(log_view.follow)

    def test_render_content_and_cursor_position(self) -> None:
        """Test render_content results and get_cursor_position

        get_cursor_position() should return the correct position depending on
        what line is selected."""

        # Mock time to always return the same value.
        mock_time = MagicMock(  # type: ignore
            return_value=time.mktime(datetime(2021, 7, 13, 0, 0, 0).timetuple())
        )
        with patch('time.time', new=mock_time):
            log_view, log_pane = self._create_log_view_with_logs(log_count=4)

        # Mock needed LogPane functions that pull info from prompt_toolkit.
        log_pane.get_horizontal_scroll_amount = MagicMock(return_value=0)
        log_pane.current_log_pane_width = 80
        log_pane.current_log_pane_height = 10

        log_view.render_content()
        log_view.scroll_to_top()
        log_view.render_content()
        # Scroll to top keeps the cursor on the bottom of the window.
        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))

        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))

        # NOTE(review): runs of spaces inside these expected strings may have
        # been collapsed in transit (the last tuple is wrapped as if its text
        # were much longer) - confirm against the real rendered output.
        expected_formatted_text = [
            ('', ''),
            ('class:log-time', '20210713 00:00:00'),
            ('', ' '),
            ('class:log-level-10', 'DEBUG'),
            ('', ' Test log 0'),
            ('class:log-time', '20210713 00:00:00'),
            ('', ' '),
            ('class:log-level-10', 'DEBUG'),
            ('', ' Test log 1'),
            ('class:log-time', '20210713 00:00:00'),
            ('', ' '),
            ('class:log-level-10', 'DEBUG'),
            ('', ' Test log 2'),
            ('class:selected-log-line class:log-time', '20210713 00:00:00'),
            ('class:selected-log-line ', ' '),
            ('class:selected-log-line class:log-level-10', 'DEBUG'),
            (
                'class:selected-log-line ',
                ' Test log 3 ',
            ),
        ]

        # pylint: disable=protected-access
        result_text = join_adjacent_style_tuples(
            flatten_formatted_text_tuples(log_view._line_fragment_cache)
        )
        # pylint: enable=protected-access

        self.assertEqual(result_text, expected_formatted_text)

    def test_clear_scrollback(self) -> None:
        """Test various functions with clearing log scrollback history."""
        # Create log_view with 4 logs
        starting_log_count = 4
        log_view, _pane = self._create_log_view_with_logs(
            log_count=starting_log_count
        )
        log_view.render_content()

        # Check setup is correct
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 3)
        self.assertEqual(log_view.get_total_count(), 4)
        self.assertEqual(
            _visible_messages(log_view),
            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
        )

        # Clear scrollback
        log_view.clear_scrollback()
        log_view.render_content()
        # Follow is still on
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.hidden_line_count(), 4)
        # Current line index should stay the same
        self.assertEqual(log_view.get_current_line(), 3)
        # Total count should stay the same
        self.assertEqual(log_view.get_total_count(), 4)
        # No lines returned
        self.assertEqual(_visible_messages(log_view), [])

        # Log 4 more lines
        test_log = logging.getLogger('log_view.test')
        with self.assertLogs(test_log, level='DEBUG') as _log_context:
            test_log.addHandler(log_view.log_store)
            for i in range(4):
                test_log.debug('Test log %s', i + starting_log_count)
        log_view.render_content()

        # Current line
        self.assertEqual(log_view.hidden_line_count(), 4)
        self.assertEqual(log_view.get_last_log_index(), 7)
        self.assertEqual(log_view.get_current_line(), 7)
        self.assertEqual(log_view.get_total_count(), 8)
        # Only the last 4 logs should appear
        self.assertEqual(
            _visible_messages(log_view),
            ['Test log 4', 'Test log 5', 'Test log 6', 'Test log 7'],
        )

        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 7)
        # Turn follow back on
        log_view.toggle_follow()

        log_view.undo_clear_scrollback()
        # Current line and total are the same
        self.assertEqual(log_view.get_current_line(), 7)
        self.assertEqual(log_view.get_total_count(), 8)
        # All logs should appear
        self.assertEqual(
            _visible_messages(log_view),
            [
                'Test log 0',
                'Test log 1',
                'Test log 2',
                'Test log 3',
                'Test log 4',
                'Test log 5',
                'Test log 6',
                'Test log 7',
            ],
        )

        log_view.scroll_to_bottom()
        log_view.render_content()
        self.assertEqual(log_view.get_current_line(), 7)

    def test_get_line_at_cursor_position(self) -> None:
        """Tests functions that rely on getting a log_index for the current
        cursor position.

        Including:
        - LogScreen.fetch_subline_up
        - LogScreen.fetch_subline_down
        - LogView._update_log_index
        """
        # Create log_view with 4 logs
        starting_log_count = 4
        log_view, _pane = self._create_log_view_with_logs(
            log_count=starting_log_count
        )
        log_view.render_content()

        # Check setup is correct
        self.assertTrue(log_view.follow)
        self.assertEqual(log_view.get_current_line(), 3)
        self.assertEqual(log_view.get_total_count(), 4)
        self.assertEqual(
            _visible_messages(log_view),
            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
        )

        self.assertEqual(log_view.log_screen.cursor_position, 9)
        # Force the cursor_position to be larger than the log_screen
        # line_buffer.
        log_view.log_screen.cursor_position = 10
        # Attempt to get the current line, no exception should be raised
        result = log_view.log_screen.get_line_at_cursor_position()
        # Log index should be None
        self.assertIsNone(result.log_index)

        # Force the cursor_position to be < 0. This won't produce an error but
        # would wrap around to the beginning.
        log_view.log_screen.cursor_position = -1
        # Attempt to get the current line, no exception should be raised
        result = log_view.log_screen.get_line_at_cursor_position()
        # Result should be a blank line
        self.assertEqual(result, ScreenLine([('', '')]))
        # Log index should be None
        self.assertIsNone(result.log_index)

    def test_visual_select(self) -> None:
        """Test log line selection."""
        log_view, log_pane = self._create_log_view_with_logs(log_count=100)
        self.assertEqual(100, log_view.get_total_count())

        # Page scrolling needs to know the current window height.
        log_pane.pane_resized = MagicMock(return_value=True)
        log_pane.current_log_pane_width = 80
        log_pane.current_log_pane_height = 10

        # Wrap (not replace) the screen methods so calls can be counted while
        # the real implementations still run.
        log_view.log_screen.reset_logs = MagicMock(
            wraps=log_view.log_screen.reset_logs
        )
        log_view.log_screen.get_lines = MagicMock(
            wraps=log_view.log_screen.get_lines
        )

        log_view.render_content()
        log_view.log_screen.reset_logs.assert_called_once()
        log_view.log_screen.get_lines.assert_called_once_with(
            marked_logs_start=None, marked_logs_end=None
        )
        log_view.log_screen.get_lines.reset_mock()
        log_view.log_screen.reset_logs.reset_mock()

        self.assertIsNone(log_view.marked_logs_start)
        self.assertIsNone(log_view.marked_logs_end)
        log_view.visual_select_line(Point(0, 9))
        self.assertEqual(
            (99, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
        )

        log_view.visual_select_line(Point(0, 8))
        log_view.visual_select_line(Point(0, 7))
        self.assertEqual(
            (97, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
        )

        log_view.clear_visual_selection()
        self.assertIsNone(log_view.marked_logs_start)
        self.assertIsNone(log_view.marked_logs_end)

        log_view.visual_select_line(Point(0, 1))
        log_view.visual_select_line(Point(0, 2))
        log_view.visual_select_line(Point(0, 3))
        log_view.visual_select_line(Point(0, 4))
        self.assertEqual(
            (91, 94), (log_view.marked_logs_start, log_view.marked_logs_end)
        )

        # Make sure the log screen was not re-generated.
        log_view.log_screen.reset_logs.assert_not_called()
        log_view.log_screen.reset_logs.reset_mock()

        # Render the screen
        log_view.render_content()
        log_view.log_screen.reset_logs.assert_called_once()
        # Check the visual selection was specified
        log_view.log_screen.get_lines.assert_called_once_with(
            marked_logs_start=91, marked_logs_end=94
        )
        log_view.log_screen.get_lines.reset_mock()
        log_view.log_screen.reset_logs.reset_mock()


if _PYTHON_3_8:
    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module

    class TestLogViewFiltering(
        IsolatedAsyncioTestCase
    ):  # pylint: disable=undefined-variable
        """Test LogView log filtering capabilities."""

        maxDiff = None

        def _create_log_view_from_list(self, log_messages):
            """Create a LogView and log each (message, extra) pair to it.

            Args:
              log_messages: Iterable of (message, extra_dict) tuples; each
                message is logged at DEBUG on the 'log_view.test' logger with
                extra_dict passed as the logging `extra` argument.

            Returns:
              A (log_view, log_pane) tuple.
            """
            log_view, log_pane = _create_log_view()

            test_log = logging.getLogger('log_view.test')
            with self.assertLogs(test_log, level='DEBUG') as _log_context:
                test_log.addHandler(log_view.log_store)
                for log, extra_arg in log_messages:
                    test_log.debug('%s', log, extra=extra_arg)

            return log_view, log_pane

        @parameterized.expand(
            [
                (
                    # Test name
                    'regex filter',
                    # Search input_text
                    'log.*item',
                    # input_logs
                    [
                        ('Log some item', {}),
                        ('Log another item', {}),
                        ('Some exception', {}),
                    ],
                    # expected_matched_lines
                    [
                        'Log some item',
                        'Log another item',
                    ],
                    # expected_match_line_numbers
                    {0: 0, 1: 1},
                    # expected_export_text
                    (' DEBUG Log some item\n DEBUG Log another item\n'),
                    None,  # field
                    False,  # invert
                ),
                (
                    # Test name
                    'regex filter with field',
                    # Search input_text
                    'earth',
                    # input_logs
                    [
                        (
                            'Log some item',
                            {'extra_metadata_fields': {'planet': 'Jupiter'}},
                        ),
                        (
                            'Log another item',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                        (
                            'Some exception',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                    ],
                    # expected_matched_lines
                    [
                        'Log another item',
                        'Some exception',
                    ],
                    # expected_match_line_numbers
                    {1: 0, 2: 1},
                    # expected_export_text
                    (
                        ' DEBUG Earth Log another item\n'
                        ' DEBUG Earth Some exception\n'
                    ),
                    'planet',  # field
                    False,  # invert
                ),
                (
                    # Test name
                    'regex filter with field inverted',
                    # Search input_text
                    'earth',
                    # input_logs
                    [
                        (
                            'Log some item',
                            {'extra_metadata_fields': {'planet': 'Jupiter'}},
                        ),
                        (
                            'Log another item',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                        (
                            'Some exception',
                            {'extra_metadata_fields': {'planet': 'Earth'}},
                        ),
                    ],
                    # expected_matched_lines
                    [
                        'Log some item',
                    ],
                    # expected_match_line_numbers
                    {0: 0},
                    # expected_export_text
                    (' DEBUG Jupiter Log some item\n'),
                    'planet',  # field
                    True,  # invert
                ),
            ]
        )
        async def test_log_filtering(
            self,
            _test_name,
            input_text,
            input_logs,
            expected_matched_lines,
            expected_match_line_numbers,
            expected_export_text,
            field=None,
            invert=False,
        ) -> None:
            """Test run log view filtering."""
            log_view, _log_pane = self._create_log_view_from_list(input_logs)
            log_view.render_content()

            self.assertEqual(log_view.get_total_count(), len(input_logs))
            # Apply the search and wait for the match count background task
            log_view.new_search(input_text, invert=invert, field=field)
            await log_view.search_match_count_task
            self.assertEqual(
                log_view.search_matched_lines, expected_match_line_numbers
            )

            # Apply the filter and wait for the filter background task
            log_view.apply_filter()
            await log_view.filter_existing_logs_task

            # Do the number of logs match the expected count?
            self.assertEqual(
                log_view.get_total_count(), len(expected_matched_lines)
            )
            self.assertEqual(
                [log.record.message for log in log_view.filtered_logs],
                expected_matched_lines,
            )

            # Check exported text respects filtering
            log_text = (
                log_view._logs_to_text(  # pylint: disable=protected-access
                    use_table_formatting=True
                )
            )
            # Remove leading time from resulting logs: drop the first 17
            # characters of every line and re-terminate each with a newline.
            log_text_no_datetime = ''.join(
                line[17:] + '\n' for line in log_text.splitlines()
            )
            self.assertEqual(log_text_no_datetime, expected_export_text)

            # Select the bottom log line
            log_view.render_content()
            log_view.visual_select_line(Point(0, 9))  # Window height is 10
            # Export to text
            log_text = (
                log_view._logs_to_text(  # pylint: disable=protected-access
                    selected_lines_only=True, use_table_formatting=False
                )
            )
            self.assertEqual(
                # Remove date, time, and level
                log_text[24:].strip(),
                expected_matched_lines[0].strip(),
            )

            # Clear filters and check the number of lines is back to normal.
            log_view.clear_filters()
            self.assertEqual(log_view.get_total_count(), len(input_logs))


if __name__ == '__main__':
    unittest.main()