xref: /aosp_15_r20/external/pigweed/pw_console/py/log_view_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tests for pw_console.log_view"""
15
16import logging
17import time
18import sys
19import unittest
20from datetime import datetime
21from unittest.mock import MagicMock, patch
22from parameterized import parameterized  # type: ignore
23from prompt_toolkit.data_structures import Point
24
25from pw_console.console_prefs import ConsolePrefs
26from pw_console.log_view import LogView
27from pw_console.log_screen import ScreenLine
28from pw_console.text_formatting import (
29    flatten_formatted_text_tuples,
30    join_adjacent_style_tuples,
31)
32
33_PYTHON_3_8 = sys.version_info >= (
34    3,
35    8,
36)
37
38
39def _create_log_view():
40    log_pane = MagicMock()
41    log_pane.pane_resized = MagicMock(return_value=True)
42    log_pane.current_log_pane_width = 80
43    log_pane.current_log_pane_height = 10
44
45    application = MagicMock()
46    application.prefs = ConsolePrefs(
47        project_file=False, project_user_file=False, user_file=False
48    )
49    application.prefs.reset_config()
50    log_view = LogView(log_pane, application)
51    return log_view, log_pane
52
53
54class TestLogView(unittest.TestCase):
55    """Tests for LogView."""
56
57    maxDiff = None
58
59    def _create_log_view_with_logs(self, log_count=100):
60        log_view, log_pane = _create_log_view()
61
62        if log_count > 0:
63            test_log = logging.getLogger('log_view.test')
64            with self.assertLogs(test_log, level='DEBUG') as _log_context:
65                test_log.addHandler(log_view.log_store)
66                for i in range(log_count):
67                    test_log.debug('Test log %s', i)
68
69        return log_view, log_pane
70
71    def test_follow_toggle(self) -> None:
72        log_view, _pane = _create_log_view()
73        self.assertTrue(log_view.follow)
74        log_view.toggle_follow()
75        self.assertFalse(log_view.follow)
76
77    def test_follow_scrolls_to_bottom(self) -> None:
78        log_view, _pane = _create_log_view()
79        log_view.toggle_follow()
80        _fragments = log_view.render_content()
81        self.assertFalse(log_view.follow)
82        self.assertEqual(log_view.get_current_line(), 0)
83
84        test_log = logging.getLogger('log_view.test')
85
86        # Log 5 messagse, current_line should stay at 0
87        with self.assertLogs(test_log, level='DEBUG') as _log_context:
88            test_log.addHandler(log_view.log_store)
89            for i in range(5):
90                test_log.debug('Test log %s', i)
91        _fragments = log_view.render_content()
92
93        self.assertEqual(log_view.get_total_count(), 5)
94        self.assertEqual(log_view.get_current_line(), 0)
95        # Turn follow on
96        log_view.toggle_follow()
97        self.assertTrue(log_view.follow)
98
99        # Log another messagse, current_line should move to the last.
100        with self.assertLogs(test_log, level='DEBUG') as _log_context:
101            test_log.addHandler(log_view.log_store)
102            test_log.debug('Test log')
103        _fragments = log_view.render_content()
104
105        self.assertEqual(log_view.get_total_count(), 6)
106        self.assertEqual(log_view.get_current_line(), 5)
107
108    def test_scrolling(self) -> None:
109        """Test all scrolling methods."""
110        log_view, log_pane = self._create_log_view_with_logs(log_count=100)
111
112        # Page scrolling needs to know the current window height.
113        log_pane.pane_resized = MagicMock(return_value=True)
114        log_pane.current_log_pane_width = 80
115        log_pane.current_log_pane_height = 10
116
117        log_view.render_content()
118
119        # Follow is on by default, current line should be at the end.
120        self.assertEqual(log_view.get_current_line(), 99)
121
122        # Move to the beginning.
123        log_view.scroll_to_top()
124        log_view.render_content()
125        self.assertEqual(log_view.get_current_line(), 0)
126
127        # Should not be able to scroll before the beginning.
128        log_view.scroll_up()
129        log_view.render_content()
130        self.assertEqual(log_view.get_current_line(), 0)
131        log_view.scroll_up_one_page()
132        log_view.render_content()
133        self.assertEqual(log_view.get_current_line(), 0)
134
135        # Single and multi line movement.
136        log_view.scroll_down()
137        log_view.render_content()
138        self.assertEqual(log_view.get_current_line(), 1)
139        log_view.scroll(5)
140        log_view.render_content()
141        self.assertEqual(log_view.get_current_line(), 6)
142        log_view.scroll_up()
143        log_view.render_content()
144        self.assertEqual(log_view.get_current_line(), 5)
145
146        # Page down and up.
147        log_view.scroll_down_one_page()
148        self.assertEqual(log_view.get_current_line(), 15)
149
150        log_view.scroll_up_one_page()
151        self.assertEqual(log_view.get_current_line(), 5)
152
153        # Move to the end.
154        log_view.scroll_to_bottom()
155        log_view.render_content()
156        self.assertEqual(log_view.get_current_line(), 99)
157
158        # Should not be able to scroll beyond the end.
159        log_view.scroll_down()
160        log_view.render_content()
161        self.assertEqual(log_view.get_current_line(), 99)
162        log_view.scroll_down_one_page()
163        log_view.render_content()
164        self.assertEqual(log_view.get_current_line(), 99)
165
166        # Move up a bit to turn off follow
167        self.assertEqual(log_view.log_screen.cursor_position, 9)
168        log_view.scroll(-1)
169        self.assertEqual(log_view.log_screen.cursor_position, 8)
170        log_view.render_content()
171        self.assertEqual(log_view.get_current_line(), 98)
172
173        # Simulate a mouse click to scroll.
174        # Click 1 lines from the top of the window.
175        log_view.scroll_to_position(Point(0, 1))
176        log_view.render_content()
177        self.assertEqual(log_view.get_current_line(), 90)
178
179        # Disable follow mode if mouse click on line.
180        log_view.toggle_follow()
181        log_view.render_content()
182        self.assertTrue(log_view.follow)
183        self.assertEqual(log_view.get_current_line(), 99)
184        log_view.scroll_to_position(Point(0, 5))
185        log_view.render_content()
186        self.assertEqual(log_view.get_current_line(), 95)
187        self.assertFalse(log_view.follow)
188
189    def test_render_content_and_cursor_position(self) -> None:
190        """Test render_content results and get_cursor_position
191
192        get_cursor_position() should return the correct position depending on
193        what line is selected."""
194
195        # Mock time to always return the same value.
196        mock_time = MagicMock(  # type: ignore
197            return_value=time.mktime(datetime(2021, 7, 13, 0, 0, 0).timetuple())
198        )
199        with patch('time.time', new=mock_time):
200            log_view, log_pane = self._create_log_view_with_logs(log_count=4)
201
202        # Mock needed LogPane functions that pull info from prompt_toolkit.
203        log_pane.get_horizontal_scroll_amount = MagicMock(return_value=0)
204        log_pane.current_log_pane_width = 80
205        log_pane.current_log_pane_height = 10
206
207        log_view.render_content()
208        log_view.scroll_to_top()
209        log_view.render_content()
210        # Scroll to top keeps the cursor on the bottom of the window.
211        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))
212
213        log_view.scroll_to_bottom()
214        log_view.render_content()
215        self.assertEqual(log_view.get_cursor_position(), Point(x=0, y=9))
216
217        expected_formatted_text = [
218            ('', ''),
219            ('class:log-time', '20210713 00:00:00'),
220            ('', '  '),
221            ('class:log-level-10', 'DEBUG'),
222            ('', '  Test log 0'),
223            ('class:log-time', '20210713 00:00:00'),
224            ('', '  '),
225            ('class:log-level-10', 'DEBUG'),
226            ('', '  Test log 1'),
227            ('class:log-time', '20210713 00:00:00'),
228            ('', '  '),
229            ('class:log-level-10', 'DEBUG'),
230            ('', '  Test log 2'),
231            ('class:selected-log-line class:log-time', '20210713 00:00:00'),
232            ('class:selected-log-line ', '  '),
233            ('class:selected-log-line class:log-level-10', 'DEBUG'),
234            (
235                'class:selected-log-line ',
236                '  Test log 3                                             ',
237            ),
238        ]
239
240        # pylint: disable=protected-access
241        result_text = join_adjacent_style_tuples(
242            flatten_formatted_text_tuples(log_view._line_fragment_cache)
243        )
244        # pylint: enable=protected-access
245
246        self.assertEqual(result_text, expected_formatted_text)
247
248    def test_clear_scrollback(self) -> None:
249        """Test various functions with clearing log scrollback history."""
250        # pylint: disable=protected-access
251        # Create log_view with 4 logs
252        starting_log_count = 4
253        log_view, _pane = self._create_log_view_with_logs(
254            log_count=starting_log_count
255        )
256        log_view.render_content()
257
258        # Check setup is correct
259        self.assertTrue(log_view.follow)
260        self.assertEqual(log_view.get_current_line(), 3)
261        self.assertEqual(log_view.get_total_count(), 4)
262        self.assertEqual(
263            list(
264                log.record.message for log in log_view._get_visible_log_lines()
265            ),
266            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
267        )
268
269        # Clear scrollback
270        log_view.clear_scrollback()
271        log_view.render_content()
272        # Follow is still on
273        self.assertTrue(log_view.follow)
274        self.assertEqual(log_view.hidden_line_count(), 4)
275        # Current line index should stay the same
276        self.assertEqual(log_view.get_current_line(), 3)
277        # Total count should stay the same
278        self.assertEqual(log_view.get_total_count(), 4)
279        # No lines returned
280        self.assertEqual(
281            list(
282                log.record.message for log in log_view._get_visible_log_lines()
283            ),
284            [],
285        )
286
287        # Add Log 4 more lines
288        test_log = logging.getLogger('log_view.test')
289        with self.assertLogs(test_log, level='DEBUG') as _log_context:
290            test_log.addHandler(log_view.log_store)
291            for i in range(4):
292                test_log.debug('Test log %s', i + starting_log_count)
293        log_view.render_content()
294
295        # Current line
296        self.assertEqual(log_view.hidden_line_count(), 4)
297        self.assertEqual(log_view.get_last_log_index(), 7)
298        self.assertEqual(log_view.get_current_line(), 7)
299        self.assertEqual(log_view.get_total_count(), 8)
300        # Only the last 4 logs should appear
301        self.assertEqual(
302            list(
303                log.record.message for log in log_view._get_visible_log_lines()
304            ),
305            ['Test log 4', 'Test log 5', 'Test log 6', 'Test log 7'],
306        )
307
308        log_view.scroll_to_bottom()
309        log_view.render_content()
310        self.assertEqual(log_view.get_current_line(), 7)
311        # Turn follow back on
312        log_view.toggle_follow()
313
314        log_view.undo_clear_scrollback()
315        # Current line and total are the same
316        self.assertEqual(log_view.get_current_line(), 7)
317        self.assertEqual(log_view.get_total_count(), 8)
318        # All logs should appear
319        self.assertEqual(
320            list(
321                log.record.message for log in log_view._get_visible_log_lines()
322            ),
323            [
324                'Test log 0',
325                'Test log 1',
326                'Test log 2',
327                'Test log 3',
328                'Test log 4',
329                'Test log 5',
330                'Test log 6',
331                'Test log 7',
332            ],
333        )
334
335        log_view.scroll_to_bottom()
336        log_view.render_content()
337        self.assertEqual(log_view.get_current_line(), 7)
338
339    def test_get_line_at_cursor_position(self) -> None:
340        """Tests fuctions that rely on getting a log_index for the current
341        cursor position.
342
343        Including:
344        - LogScreen.fetch_subline_up
345        - LogScreen.fetch_subline_down
346        - LogView._update_log_index
347        """
348        # pylint: disable=protected-access
349        # Create log_view with 4 logs
350        starting_log_count = 4
351        log_view, _pane = self._create_log_view_with_logs(
352            log_count=starting_log_count
353        )
354        log_view.render_content()
355
356        # Check setup is correct
357        self.assertTrue(log_view.follow)
358        self.assertEqual(log_view.get_current_line(), 3)
359        self.assertEqual(log_view.get_total_count(), 4)
360        self.assertEqual(
361            list(
362                log.record.message for log in log_view._get_visible_log_lines()
363            ),
364            ['Test log 0', 'Test log 1', 'Test log 2', 'Test log 3'],
365        )
366
367        self.assertEqual(log_view.log_screen.cursor_position, 9)
368        # Force the cursor_position to be larger than the log_screen
369        # line_buffer.
370        log_view.log_screen.cursor_position = 10
371        # Attempt to get the current line, no exception should be raised
372        result = log_view.log_screen.get_line_at_cursor_position()
373        # Log index should be None
374        self.assertEqual(result.log_index, None)
375
376        # Force the cursor_position to be < 0. This won't produce an error but
377        # would wrap around to the beginning.
378        log_view.log_screen.cursor_position = -1
379        # Attempt to get the current line, no exception should be raised
380        result = log_view.log_screen.get_line_at_cursor_position()
381        # Result should be a blank line
382        self.assertEqual(result, ScreenLine([('', '')]))
383        # Log index should be None
384        self.assertEqual(result.log_index, None)
385
386    def test_visual_select(self) -> None:
387        """Test log line selection."""
388        log_view, log_pane = self._create_log_view_with_logs(log_count=100)
389        self.assertEqual(100, log_view.get_total_count())
390
391        # Page scrolling needs to know the current window height.
392        log_pane.pane_resized = MagicMock(return_value=True)
393        log_pane.current_log_pane_width = 80
394        log_pane.current_log_pane_height = 10
395
396        log_view.log_screen.reset_logs = MagicMock(
397            wraps=log_view.log_screen.reset_logs
398        )
399        log_view.log_screen.get_lines = MagicMock(
400            wraps=log_view.log_screen.get_lines
401        )
402
403        log_view.render_content()
404        log_view.log_screen.reset_logs.assert_called_once()
405        log_view.log_screen.get_lines.assert_called_once_with(
406            marked_logs_start=None, marked_logs_end=None
407        )
408        log_view.log_screen.get_lines.reset_mock()
409        log_view.log_screen.reset_logs.reset_mock()
410
411        self.assertIsNone(log_view.marked_logs_start)
412        self.assertIsNone(log_view.marked_logs_end)
413        log_view.visual_select_line(Point(0, 9))
414        self.assertEqual(
415            (99, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
416        )
417
418        log_view.visual_select_line(Point(0, 8))
419        log_view.visual_select_line(Point(0, 7))
420        self.assertEqual(
421            (97, 99), (log_view.marked_logs_start, log_view.marked_logs_end)
422        )
423
424        log_view.clear_visual_selection()
425        self.assertIsNone(log_view.marked_logs_start)
426        self.assertIsNone(log_view.marked_logs_end)
427
428        log_view.visual_select_line(Point(0, 1))
429        log_view.visual_select_line(Point(0, 2))
430        log_view.visual_select_line(Point(0, 3))
431        log_view.visual_select_line(Point(0, 4))
432        self.assertEqual(
433            (91, 94), (log_view.marked_logs_start, log_view.marked_logs_end)
434        )
435
436        # Make sure the log screen was not re-generated.
437        log_view.log_screen.reset_logs.assert_not_called()
438        log_view.log_screen.reset_logs.reset_mock()
439
440        # Render the screen
441        log_view.render_content()
442        log_view.log_screen.reset_logs.assert_called_once()
443        # Check the visual selection was specified
444        log_view.log_screen.get_lines.assert_called_once_with(
445            marked_logs_start=91, marked_logs_end=94
446        )
447        log_view.log_screen.get_lines.reset_mock()
448        log_view.log_screen.reset_logs.reset_mock()
449
450
451if _PYTHON_3_8:
452    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module
453
454    class TestLogViewFiltering(
455        IsolatedAsyncioTestCase
456    ):  # pylint: disable=undefined-variable
457        """Test LogView log filtering capabilities."""
458
459        maxDiff = None
460
461        def _create_log_view_from_list(self, log_messages):
462            log_view, log_pane = _create_log_view()
463
464            test_log = logging.getLogger('log_view.test')
465            with self.assertLogs(test_log, level='DEBUG') as _log_context:
466                test_log.addHandler(log_view.log_store)
467                for log, extra_arg in log_messages:
468                    test_log.debug('%s', log, extra=extra_arg)
469
470            return log_view, log_pane
471
472        @parameterized.expand(
473            [
474                (
475                    # Test name
476                    'regex filter',
477                    # Search input_text
478                    'log.*item',
479                    # input_logs
480                    [
481                        ('Log some item', dict()),
482                        ('Log another item', dict()),
483                        ('Some exception', dict()),
484                    ],
485                    # expected_matched_lines
486                    [
487                        'Log some item',
488                        'Log another item',
489                    ],
490                    # expected_match_line_numbers
491                    {0: 0, 1: 1},
492                    # expected_export_text
493                    ('  DEBUG  Log some item\n  DEBUG  Log another item\n'),
494                    None,  # field
495                    False,  # invert
496                ),
497                (
498                    # Test name
499                    'regex filter with field',
500                    # Search input_text
501                    'earth',
502                    # input_logs
503                    [
504                        (
505                            'Log some item',
506                            dict(extra_metadata_fields={'planet': 'Jupiter'}),
507                        ),
508                        (
509                            'Log another item',
510                            dict(extra_metadata_fields={'planet': 'Earth'}),
511                        ),
512                        (
513                            'Some exception',
514                            dict(extra_metadata_fields={'planet': 'Earth'}),
515                        ),
516                    ],
517                    # expected_matched_lines
518                    [
519                        'Log another item',
520                        'Some exception',
521                    ],
522                    # expected_match_line_numbers
523                    {1: 0, 2: 1},
524                    # expected_export_text
525                    (
526                        '  DEBUG  Earth    Log another item\n'
527                        '  DEBUG  Earth    Some exception\n'
528                    ),
529                    'planet',  # field
530                    False,  # invert
531                ),
532                (
533                    # Test name
534                    'regex filter with field inverted',
535                    # Search input_text
536                    'earth',
537                    # input_logs
538                    [
539                        (
540                            'Log some item',
541                            dict(extra_metadata_fields={'planet': 'Jupiter'}),
542                        ),
543                        (
544                            'Log another item',
545                            dict(extra_metadata_fields={'planet': 'Earth'}),
546                        ),
547                        (
548                            'Some exception',
549                            dict(extra_metadata_fields={'planet': 'Earth'}),
550                        ),
551                    ],
552                    # expected_matched_lines
553                    [
554                        'Log some item',
555                    ],
556                    # expected_match_line_numbers
557                    {0: 0},
558                    # expected_export_text
559                    ('  DEBUG  Jupiter  Log some item\n'),
560                    'planet',  # field
561                    True,  # invert
562                ),
563            ]
564        )
565        async def test_log_filtering(
566            self,
567            _test_name,
568            input_text,
569            input_logs,
570            expected_matched_lines,
571            expected_match_line_numbers,
572            expected_export_text,
573            field=None,
574            invert=False,
575        ) -> None:
576            """Test run log view filtering."""
577            log_view, _log_pane = self._create_log_view_from_list(input_logs)
578            log_view.render_content()
579
580            self.assertEqual(log_view.get_total_count(), len(input_logs))
581            # Apply the search and wait for the match count background task
582            log_view.new_search(input_text, invert=invert, field=field)
583            await log_view.search_match_count_task
584            self.assertEqual(
585                log_view.search_matched_lines, expected_match_line_numbers
586            )
587
588            # Apply the filter and wait for the filter background task
589            log_view.apply_filter()
590            await log_view.filter_existing_logs_task
591
592            # Do the number of logs match the expected count?
593            self.assertEqual(
594                log_view.get_total_count(), len(expected_matched_lines)
595            )
596            self.assertEqual(
597                [log.record.message for log in log_view.filtered_logs],
598                expected_matched_lines,
599            )
600
601            # Check exported text respects filtering
602            log_text = (
603                log_view._logs_to_text(  # pylint: disable=protected-access
604                    use_table_formatting=True
605                )
606            )
607            # Remove leading time from resulting logs
608            log_text_no_datetime = ''
609            for line in log_text.splitlines():
610                log_text_no_datetime += line[17:] + '\n'
611            self.assertEqual(log_text_no_datetime, expected_export_text)
612
613            # Select the bottom log line
614            log_view.render_content()
615            log_view.visual_select_line(Point(0, 9))  # Window height is 10
616            # Export to text
617            log_text = (
618                log_view._logs_to_text(  # pylint: disable=protected-access
619                    selected_lines_only=True, use_table_formatting=False
620                )
621            )
622            self.assertEqual(
623                # Remove date, time, and level
624                log_text[24:].strip(),
625                expected_matched_lines[0].strip(),
626            )
627
628            # Clear filters and check the numbe of lines is back to normal.
629            log_view.clear_filters()
630            self.assertEqual(log_view.get_total_count(), len(input_logs))
631
632
633if __name__ == '__main__':
634    unittest.main()
635