1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Tests for pw_console.log_store""" 15 16import logging 17import unittest 18from unittest.mock import MagicMock 19 20from pw_console.log_store import LogStore 21from pw_console.console_prefs import ConsolePrefs 22 23 24def _create_log_store(): 25 log_store = LogStore( 26 prefs=ConsolePrefs( 27 project_file=False, project_user_file=False, user_file=False 28 ) 29 ) 30 31 assert not log_store.table.prefs.show_python_file 32 viewer = MagicMock() 33 viewer.new_logs_arrived = MagicMock() 34 log_store.register_viewer(viewer) 35 return log_store, viewer 36 37 38class TestLogStore(unittest.TestCase): 39 """Tests for LogStore.""" 40 41 maxDiff = None 42 43 def test_get_total_count(self) -> None: 44 log_store, viewer = _create_log_store() 45 test_log = logging.getLogger('log_store.test') 46 # Must use the assertLogs context manager and the addHandler call. 47 with self.assertLogs(test_log, level='DEBUG') as log_context: 48 test_log.addHandler(log_store) 49 for i in range(5): 50 test_log.debug('Test log %s', i) 51 52 # Expected log message content 53 self.assertEqual( 54 ['DEBUG:log_store.test:Test log {}'.format(i) for i in range(5)], 55 log_context.output, 56 ) 57 # LogStore state checks 58 viewer.new_logs_arrived.assert_called() 59 self.assertEqual(5, log_store.get_total_count()) 60 self.assertEqual(4, log_store.get_last_log_index()) 61 62 log_store.clear_logs() 63 self.assertEqual(0, log_store.get_total_count()) 64 65 def test_channel_counts_and_prefix_width(self) -> None: 66 """Test logger names and prefix width calculations.""" 67 log_store, _viewer = _create_log_store() 68 69 # Log some messagse on 3 separate logger instances 70 for i, logger_name in enumerate( 71 [ 72 'log_store.test', 73 'log_store.dev', 74 'log_store.production', 75 ] 76 ): 77 test_log = logging.getLogger(logger_name) 78 with self.assertLogs(test_log, level='DEBUG') as _log_context: 79 test_log.addHandler(log_store) 80 test_log.debug('Test log message') 81 for j in range(i): 82 test_log.debug('%s', j) 83 84 self.assertEqual( 85 { 86 'log_store.test': 1, 87 'log_store.dev': 2, 88 'log_store.production': 3, 89 }, 90 log_store.channel_counts, 91 ) 92 self.assertEqual( 93 'log_store.test: 1, log_store.dev: 2, log_store.production: 3', 94 log_store.get_channel_counts(), 95 ) 96 97 self.assertRegex( 98 log_store.logs[0].ansi_stripped_log, 99 r'[0-9]{8} [0-9]{2}:[0-9]{2}:[0-9]{2} DEBUG Test log message', 100 ) 101 self.assertEqual( 102 { 103 'log_store.test': 0, 104 'log_store.dev': 0, 105 'log_store.production': 0, 106 }, 107 log_store.channel_formatted_prefix_widths, 108 ) 109 110 def test_render_table_header_with_metadata(self) -> None: 111 log_store, _viewer = _create_log_store() 112 test_log = logging.getLogger('log_store.test') 113 114 # Log table with extra columns 115 with self.assertLogs(test_log, level='DEBUG') as _log_context: 116 test_log.addHandler(log_store) 117 test_log.debug( 118 'Test log %s', 119 extra=dict( 120 extra_metadata_fields={ 121 'planet': 'Jupiter', 122 'galaxy': 'Milky Way', 123 } 124 ), 125 ) 126 127 self.assertEqual( 128 [ 129 ('bold', 'Time '), 130 ('', ' '), 131 ('bold', 'Level'), 132 ('', ' '), 133 ('bold', 'Planet '), 134 ('', ' '), 135 ('bold', 'Galaxy '), 136 ('', ' '), 137 ('bold', 'Message'), 138 ], 139 log_store.render_table_header(), 140 ) 141 142 143if __name__ == '__main__': 144 unittest.main() 145