xref: /aosp_15_r20/external/pigweed/pw_console/py/log_store_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tests for pw_console.log_store"""
15
16import logging
17import unittest
18from unittest.mock import MagicMock
19
20from pw_console.log_store import LogStore
21from pw_console.console_prefs import ConsolePrefs
22
23
24def _create_log_store():
25    log_store = LogStore(
26        prefs=ConsolePrefs(
27            project_file=False, project_user_file=False, user_file=False
28        )
29    )
30
31    assert not log_store.table.prefs.show_python_file
32    viewer = MagicMock()
33    viewer.new_logs_arrived = MagicMock()
34    log_store.register_viewer(viewer)
35    return log_store, viewer
36
37
38class TestLogStore(unittest.TestCase):
39    """Tests for LogStore."""
40
41    maxDiff = None
42
43    def test_get_total_count(self) -> None:
44        log_store, viewer = _create_log_store()
45        test_log = logging.getLogger('log_store.test')
46        # Must use the assertLogs context manager and the addHandler call.
47        with self.assertLogs(test_log, level='DEBUG') as log_context:
48            test_log.addHandler(log_store)
49            for i in range(5):
50                test_log.debug('Test log %s', i)
51
52        # Expected log message content
53        self.assertEqual(
54            ['DEBUG:log_store.test:Test log {}'.format(i) for i in range(5)],
55            log_context.output,
56        )
57        # LogStore state checks
58        viewer.new_logs_arrived.assert_called()
59        self.assertEqual(5, log_store.get_total_count())
60        self.assertEqual(4, log_store.get_last_log_index())
61
62        log_store.clear_logs()
63        self.assertEqual(0, log_store.get_total_count())
64
65    def test_channel_counts_and_prefix_width(self) -> None:
66        """Test logger names and prefix width calculations."""
67        log_store, _viewer = _create_log_store()
68
69        # Log some messagse on 3 separate logger instances
70        for i, logger_name in enumerate(
71            [
72                'log_store.test',
73                'log_store.dev',
74                'log_store.production',
75            ]
76        ):
77            test_log = logging.getLogger(logger_name)
78            with self.assertLogs(test_log, level='DEBUG') as _log_context:
79                test_log.addHandler(log_store)
80                test_log.debug('Test log message')
81                for j in range(i):
82                    test_log.debug('%s', j)
83
84        self.assertEqual(
85            {
86                'log_store.test': 1,
87                'log_store.dev': 2,
88                'log_store.production': 3,
89            },
90            log_store.channel_counts,
91        )
92        self.assertEqual(
93            'log_store.test: 1, log_store.dev: 2, log_store.production: 3',
94            log_store.get_channel_counts(),
95        )
96
97        self.assertRegex(
98            log_store.logs[0].ansi_stripped_log,
99            r'[0-9]{8} [0-9]{2}:[0-9]{2}:[0-9]{2} DEBUG Test log message',
100        )
101        self.assertEqual(
102            {
103                'log_store.test': 0,
104                'log_store.dev': 0,
105                'log_store.production': 0,
106            },
107            log_store.channel_formatted_prefix_widths,
108        )
109
110    def test_render_table_header_with_metadata(self) -> None:
111        log_store, _viewer = _create_log_store()
112        test_log = logging.getLogger('log_store.test')
113
114        # Log table with extra columns
115        with self.assertLogs(test_log, level='DEBUG') as _log_context:
116            test_log.addHandler(log_store)
117            test_log.debug(
118                'Test log %s',
119                extra=dict(
120                    extra_metadata_fields={
121                        'planet': 'Jupiter',
122                        'galaxy': 'Milky Way',
123                    }
124                ),
125            )
126
127        self.assertEqual(
128            [
129                ('bold', 'Time             '),
130                ('', '  '),
131                ('bold', 'Level'),
132                ('', '  '),
133                ('bold', 'Planet '),
134                ('', '  '),
135                ('bold', 'Galaxy   '),
136                ('', '  '),
137                ('bold', 'Message'),
138            ],
139            log_store.render_table_header(),
140        )
141
142
143if __name__ == '__main__':
144    unittest.main()
145