# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Log decoder tests."""

from dataclasses import dataclass
import logging
from random import randint
from typing import Any
from unittest import TestCase, main

from pw_log.log_decoder import (
    Log,
    LogStreamDecoder,
    log_decoded_log,
    pw_status_code_to_name,
    timestamp_parser_ms_since_boot,
    timestamp_parser_ns_since_boot,
)
from pw_log.proto import log_pb2
import pw_tokenizer

_MESSAGE_NO_FILENAME = 'World'
_MESSAGE_AND_ARGS_NO_FILENAME = f'■msg♦{_MESSAGE_NO_FILENAME}'
_MESSAGE_TOKEN_NO_FILENAME = pw_tokenizer.tokens.pw_tokenizer_65599_hash(
    _MESSAGE_AND_ARGS_NO_FILENAME
)

# Database of tokenized strings the decoder uses to detokenize tokenized log
# entries.
_TOKEN_DATABASE = pw_tokenizer.tokens.Database(
    [
        pw_tokenizer.tokens.TokenizedStringEntry(0x01148A48, 'total_dropped'),
        pw_tokenizer.tokens.TokenizedStringEntry(
            0x03796798, 'min_queue_remaining'
        ),
        pw_tokenizer.tokens.TokenizedStringEntry(0x2E668CD6, 'Jello, world!'),
        pw_tokenizer.tokens.TokenizedStringEntry(0x329481A2, 'parser_errors'),
        pw_tokenizer.tokens.TokenizedStringEntry(0x7F35A9A5, 'TestName'),
        pw_tokenizer.tokens.TokenizedStringEntry(0xCC6D3131, 'Jello?'),
        pw_tokenizer.tokens.TokenizedStringEntry(
            0x144C501D, '■msg♦SampleMessage■module♦MODULE■file♦file/path.cc'
        ),
        pw_tokenizer.tokens.TokenizedStringEntry(0x0000106A, 'ModuleOrMessage'),
        pw_tokenizer.tokens.TokenizedStringEntry(
            pw_tokenizer.tokens.pw_tokenizer_65599_hash(
                '■msg♦World■module♦wifi■file♦/path/to/file.cc'
            ),
            '■msg♦World■module♦wifi■file♦/path/to/file.cc',
        ),
        pw_tokenizer.tokens.TokenizedStringEntry(
            _MESSAGE_TOKEN_NO_FILENAME, _MESSAGE_AND_ARGS_NO_FILENAME
        ),
    ]
)
_DETOKENIZER = pw_tokenizer.Detokenizer(_TOKEN_DATABASE)

_ZERO_NS_TIMESTAMP_STR = '00:00:00.000000'
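

# Illustrative round trip (not exercised by the tests below): tokenize a
# string that is present in _TOKEN_DATABASE and decode it back. This sketch
# only uses APIs already exercised elsewhere in this file.
def _tokenize_detokenize_roundtrip_example() -> str:
    encoded = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash('Jello, world!')
    )
    # Token 0x2E668CD6 ('Jello, world!') is in the database, so this is
    # expected to decode back to the original string.
    return pw_tokenizer.proto.decode_optionally_tokenized(
        _DETOKENIZER, encoded
    )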


def _create_log_entry_with_tokenized_fields(
    message: str, module: str, file: str, thread: str, line: int, level: int
) -> log_pb2.LogEntry:
    """Creates a LogEntry with its tokenizable fields tokenized."""
    tokenized_message = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(message)
    )
    tokenized_module = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(module)
    )
    tokenized_file = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(file)
    )
    tokenized_thread = pw_tokenizer.encode.encode_token_and_args(
        pw_tokenizer.tokens.pw_tokenizer_65599_hash(thread)
    )

    return log_pb2.LogEntry(
        message=tokenized_message,
        module=tokenized_module,
        file=tokenized_file,
        line_level=Log.pack_line_level(line, level),
        thread=tokenized_thread,
    )


def _create_random_log_entry() -> log_pb2.LogEntry:
    return log_pb2.LogEntry(
        message=f'message {randint(1, 100)}'.encode('utf-8'),
        line_level=Log.pack_line_level(
            randint(0, 2000), randint(logging.DEBUG, logging.CRITICAL)
        ),
        file=b'main.cc',
        thread=f'thread {randint(1, 5)}'.encode('utf-8'),
    )


def _create_drop_count_message_log_entry(
    drop_count: int, reason: str = ''
) -> log_pb2.LogEntry:
    log_entry = log_pb2.LogEntry(dropped=drop_count)
    if reason:
        log_entry.message = reason.encode('utf-8')
    return log_entry


class TestLogStreamDecoderBase(TestCase):
    """Base Test class for LogStreamDecoder."""

    def setUp(self) -> None:
        """Sets up the log stream decoder."""

        def parse_pw_status(msg: str) -> str:
            return pw_status_code_to_name(msg)

        self.captured_logs: list[Log] = []

        def decoded_log_handler(log: Log) -> None:
            self.captured_logs.append(log)

        self.decoder = LogStreamDecoder(
            decoded_log_handler=decoded_log_handler,
            detokenizer=_DETOKENIZER,
            source_name='source',
            timestamp_parser=timestamp_parser_ns_since_boot,
            message_parser=parse_pw_status,
        )

    def _captured_logs_as_str(self) -> str:
        return '\n'.join(map(str, self.captured_logs))
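

# Minimal sketch of the message parser wired into setUp() above: it rewrites
# embedded 'pw::Status=<code>' arguments into status names, a behavior the
# decoding tests below verify. The message text here is hypothetical.
def _status_replacement_example() -> str:
    # Expected to return 'Could not open port: PERMISSION_DENIED', since
    # pw::Status code 7 is PERMISSION_DENIED.
    return pw_status_code_to_name('Could not open port: pw::Status=7')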


class TestLogStreamDecoderDecodingFunctionality(TestLogStreamDecoderBase):
    """Tests LogStreamDecoder decoding functionality."""

    def test_parse_log_entry_valid_non_tokenized(self) -> None:
        """Test that valid LogEntry protos are parsed correctly."""
        expected_log = Log(
            message='Hello',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Hello',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(result, expected_log)

    def test_parse_log_entry_valid_packed_message(self) -> None:
        """Test that messages with packed metadata are parsed correctly."""
        log_with_metadata_in_message = Log(
            message='World',
            file_and_line='/path/to/file.cc',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            module_name='wifi',
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message='■msg♦World■module♦wifi■file♦/path/to/file.cc'.encode(
                    'utf-8'
                ),
                line_level=Log.pack_line_level(0, logging.DEBUG),
                timestamp=100,
            )
        )
        self.assertEqual(result, log_with_metadata_in_message)

    def test_parse_log_entry_valid_logs_drop_message(self) -> None:
        """Test that drop count LogEntry protos are parsed correctly."""
        dropped_message = Log(
            message='Dropped 30 logs due to buffer too small',
            level=logging.WARNING,
            source_name=self.decoder.source_name,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(message=b'buffer too small', dropped=30)
        )
        self.assertEqual(result, dropped_message)

    def test_parse_log_entry_valid_tokenized(self) -> None:
        """Test that tokenized LogEntry protos are parsed correctly."""
        message = 'Jello, world!'
        module_name = 'TestName'
        file = 'parser_errors'
        thread_name = 'Jello?'
        line = 123
        level = logging.INFO

        expected_log = Log(
            message=message,
            module_name=module_name,
            file_and_line=file + ':123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
            thread_name=thread_name,
        )

        log_entry = _create_log_entry_with_tokenized_fields(
            message, module_name, file, thread_name, line, level
        )

        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(result, expected_log, msg='Log was not detokenized')

    def test_tokenized_contents_not_detokenized(self):
        """Test fields with tokens not in the database are not detokenized."""
        # The following strings do not have tokens in the device token db.
        message_not_in_db = 'device is shutting down.'
        module_name_not_in_db = 'Battery'
        file_not_in_db = 'charger.cc'
        thread_name_not_in_db = 'BatteryStatus'
        line = 123
        level = logging.INFO

        log_entry = _create_log_entry_with_tokenized_fields(
            message_not_in_db,
            module_name_not_in_db,
            file_not_in_db,
            thread_name_not_in_db,
            line,
            level,
        )
        message = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.message
        )
        module = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.module
        )
        file = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.file
        )
        thread = pw_tokenizer.proto.decode_optionally_tokenized(
            _DETOKENIZER, log_entry.thread
        )
        expected_log = Log(
            message=message,
            module_name=module,
            file_and_line=file + ':123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
            thread_name=thread,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(
            result, expected_log, msg='Log was unexpectedly detokenized'
        )

    def test_extracting_log_entry_fields_from_tokenized_metadata(self):
        """Test that tokenized metadata can be used to extract other fields."""
        metadata = '■msg♦World■module♦wifi■file♦/path/to/file.cc'
        thread_name = 'M0Log'
        log_entry = log_pb2.LogEntry(
            message=pw_tokenizer.encode.encode_token_and_args(
                pw_tokenizer.tokens.pw_tokenizer_65599_hash(metadata)
            ),
            line_level=Log.pack_line_level(0, logging.DEBUG),
            thread=thread_name.encode('utf-8'),
        )
        log_with_metadata_in_message = Log(
            message='World',
            file_and_line='/path/to/file.cc',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            module_name='wifi',
            timestamp=_ZERO_NS_TIMESTAMP_STR,
            thread_name=thread_name,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(
            result,
            log_with_metadata_in_message,
            msg='Metadata was not extracted from the tokenized message.',
        )
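
    # The tests below exercise pw::Status argument replacement in messages.
    # As they verify, a status is only replaced when it appears exactly as
    # 'pw::Status=<decimal>' with a known, non-negative code and nothing
    # between '=' and the digits; all other forms are left verbatim.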
    def test_extracting_status_argument_from_log_message(self):
        """Test extracting a status from a log message."""
        expected_log = Log(
            message='Could not start flux capacitor: PERMISSION_DENIED',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=7',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

        expected_log = Log(
            message='Error connecting to server: UNAVAILABLE',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Error connecting to server: pw::Status=14',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_extracting_status_with_improper_spacing(self):
        """Test spaces before pw::Status are ignored."""
        expected_log = Log(
            message='Error connecting to server:UNAVAILABLE',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Error connecting to server:pw::Status=14',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

        expected_log = Log(
            message='Error connecting to server: UNAVAILABLE',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Error connecting to server: pw::Status=14',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_not_extracting_status_extra_space_before_code(self):
        """Test spaces after pw::Status= are not allowed."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status= 7',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status= 7',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )

    def test_not_extracting_status_new_line_before_code(self):
        """Test newline characters after pw::Status= are not allowed."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status=\n7',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=\n7',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )

    def test_not_extracting_status_from_log_message_with_improper_format(self):
        """Test status is not extracted from a badly formatted message."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status12',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status12',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )

    def test_status_code_in_message_does_not_exist(self):
        """Test a status code that does not exist in pw_status."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status=17',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=17',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )

    def test_status_code_in_message_is_negative(self):
        """Test a negative status code is not replaced."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::Status=-1',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Status=-1',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )

    def test_status_code_is_name(self):
        """Test the status code format with a name instead of a number."""
        expected_log = Log(
            message='Cannot use flux capacitor: pw::Status=PERMISSION_DENIED',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=(
                    b'Cannot use flux capacitor: pw::Status=PERMISSION_DENIED'
                ),
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )

    def test_spelling_mistakes_with_status_keyword(self):
        """Test spelling mistakes in the status keyword."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::Rtatus=12',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::Rtatus=12',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )

    def test_spelling_mistakes_with_status_keyword_lowercase_s(self):
        """Test a lowercase s in the status keyword."""
        expected_log = Log(
            message='Could not start flux capacitor: pw::status=13',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Could not start flux capacitor: pw::status=13',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was unexpectedly extracted from log message.',
        )
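
    # The tests below confirm that replacement is position-independent: a
    # well-formed 'pw::Status=<code>' is rewritten at the start, middle, or
    # end of a message, even with no surrounding whitespace.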
    def test_status_code_at_beginning_of_message(self):
        """Test an embedded status argument is found at the start."""
        expected_log = Log(
            message='UNAVAILABLE to connect to server.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'pw::Status=14 to connect to server.',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_status_code_in_the_middle_of_message(self):
        """Test an embedded status argument is found in the middle."""
        expected_log = Log(
            message='Connection error: UNAVAILABLE connecting to server.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=(
                    b'Connection error: pw::Status=14 connecting to server.'
                ),
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_status_code_with_no_surrounding_spaces(self):
        """Test an embedded status argument without surrounding spaces."""
        expected_log = Log(
            message='Connection error:UNAVAILABLEconnecting to server.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Connection error:pw::Status=14connecting to server.',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_multiple_status_arguments_in_log_message(self):
        """Test replacement of multiple status arguments in one message."""
        expected_log = Log(
            message='Connection error: UNAVAILABLE and PERMISSION_DENIED.',
            file_and_line='my/path/file.cc:123',
            level=logging.INFO,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
        )
        result = self.decoder.parse_log_entry_proto(
            log_pb2.LogEntry(
                message=b'Connection error: pw::Status=14 and pw::Status=7.',
                file=b'my/path/file.cc',
                line_level=Log.pack_line_level(123, logging.INFO),
            )
        )
        self.assertEqual(
            result,
            expected_log,
            msg='Status was not extracted from log message.',
        )

    def test_no_filename_in_message_parses_successfully(self):
        """Test that a log entry without a file name is still parsed."""
        thread_name = 'thread'
        log_entry = log_pb2.LogEntry(
            message=pw_tokenizer.encode.encode_token_and_args(
                _MESSAGE_TOKEN_NO_FILENAME
            ),
            line_level=Log.pack_line_level(123, logging.DEBUG),
            thread=thread_name.encode('utf-8'),
        )
        expected_log = Log(
            message=_MESSAGE_NO_FILENAME,
            file_and_line=':123',
            level=logging.DEBUG,
            source_name=self.decoder.source_name,
            timestamp=_ZERO_NS_TIMESTAMP_STR,
            thread_name=thread_name,
        )
        result = self.decoder.parse_log_entry_proto(log_entry)
        self.assertEqual(result, expected_log)

    def test_log_decoded_log(self):
        """Test that the logger correctly formats a decoded log."""
        test_log = Log(
            message="SampleMessage",
            level=logging.DEBUG,
            timestamp='1:30',
            module_name="MyModule",
            source_name="MySource",
            thread_name="MyThread",
            file_and_line='my_file.cc:123',
            metadata_fields={'field1': 432, 'field2': 'value'},
        )

        class CapturingLogger(logging.Logger):
            """Captures the values passed to log().

            Tests can check that each log() call has the correct level and
            extra arguments, and that the message format string and arguments
            match.
            """

            @dataclass(frozen=True)
            class LoggerLog:
                """Represents a captured log() call."""

                level: int
                message: str
                kwargs: Any

            def __init__(self):
                super().__init__(name="CapturingLogger")
                self.log_calls: list[CapturingLogger.LoggerLog] = []

            def log(self, level, msg, *args, **kwargs) -> None:
                log = CapturingLogger.LoggerLog(
                    level=level, message=msg % args, kwargs=kwargs
                )
                self.log_calls.append(log)

        test_logger = CapturingLogger()
        log_decoded_log(test_log, test_logger)
        self.assertEqual(len(test_logger.log_calls), 1)
        self.assertEqual(test_logger.log_calls[0].level, test_log.level)
        self.assertEqual(
            test_logger.log_calls[0].message,
            '[%s] %s %s %s %s'
            % (
                test_log.source_name,
                test_log.module_name,
                test_log.timestamp,
                test_log.message,
                test_log.file_and_line,
            ),
        )
        self.assertEqual(
            test_logger.log_calls[0].kwargs['extra']['extra_metadata_fields'],
            test_log.metadata_fields,
        )
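

# Minimal usage sketch (not a test): forwarding a decoded Log to a standard
# logging.Logger. As test_log_decoded_log above verifies, log_decoded_log()
# emits a '[source] module timestamp message file:line' formatted record.
# The logger name 'example' is arbitrary.
def _log_decoded_log_example(log: Log) -> None:
    log_decoded_log(log, logging.getLogger('example'))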


class TestLogStreamDecoderLogDropDetectionFunctionality(
    TestLogStreamDecoderBase
):
    """Tests LogStreamDecoder log drop detection functionality."""

    def test_log_drops_transport_error(self):
        """Tests log drops at the transport layer."""
        log_entry_in_log_entries_1 = 3
        log_entries_1 = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries_1)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries_1)

        # Assume a second LogEntries batch with 5 log entries was dropped,
        # i.e. a log_entries_2 with first_entry_sequence_id = 3 and 5 log
        # entries.
        log_entry_in_log_entries_2 = 5

        log_entry_in_log_entries_3 = 3
        log_entries_3 = log_pb2.LogEntries(
            first_entry_sequence_id=log_entry_in_log_entries_1
            + log_entry_in_log_entries_2,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries_3)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries_3)

        # The drop message is placed where the dropped logs were detected.
        self.assertEqual(
            len(self.captured_logs),
            log_entry_in_log_entries_1 + 1 + log_entry_in_log_entries_3,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            (
                f'Dropped {log_entry_in_log_entries_2} logs due to '
                f'{LogStreamDecoder.DROP_REASON_LOSS_AT_TRANSPORT}'
            ),
            self.captured_logs[log_entry_in_log_entries_1].message,
        )

    def test_log_drops_source_not_connected(self):
        """Tests log drops when the log source was not connected."""
        log_entry_in_log_entries = 4
        drop_count = 7
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=drop_count,
            entries=[
                _create_random_log_entry()
                for _ in range(log_entry_in_log_entries)
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)

        # The drop message is placed where the log drops were detected.
        self.assertEqual(
            len(self.captured_logs),
            1 + log_entry_in_log_entries,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            (
                f'Dropped {drop_count} logs due to '
                f'{LogStreamDecoder.DROP_REASON_SOURCE_NOT_CONNECTED}'
            ),
            self.captured_logs[0].message,
        )

    def test_log_drops_source_enqueue_failure_no_message(self):
        """Tests log drops when the source reports drops without a reason."""
        drop_count = 5
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry(),
                _create_random_log_entry(),
                _create_drop_count_message_log_entry(drop_count),
                _create_random_log_entry(),
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)

        # The drop message is placed where the log drops were detected.
        self.assertEqual(
            len(self.captured_logs),
            4,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            (
                f'Dropped {drop_count} logs due to '
                f'{LogStreamDecoder.DROP_REASON_SOURCE_ENQUEUE_FAILURE}'
            ),
            self.captured_logs[2].message,
        )

    def test_log_drops_source_enqueue_failure_with_message(self):
        """Tests log drops when the source reports drops with a reason."""
        drop_count = 8
        reason = 'Flux Capacitor exploded'
        log_entries = log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[
                _create_random_log_entry(),
                _create_random_log_entry(),
                _create_drop_count_message_log_entry(drop_count, reason),
                _create_random_log_entry(),
            ],
        )
        self.decoder.parse_log_entries_proto(log_entries)

        # The drop message is placed where the log drops were detected.
        self.assertEqual(
            len(self.captured_logs),
            4,
            msg=(
                'Unexpected number of messages received: '
                f'{self._captured_logs_as_str()}'
            ),
        )
        self.assertEqual(
            f'Dropped {drop_count} logs due to {reason.lower()}',
            self.captured_logs[2].message,
        )
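

# Illustrative sketch (not a test) of the gap detection exercised above:
# after a batch of N entries starting at sequence id S, the next batch is
# expected to start at S + N; any jump beyond that is reported as logs
# dropped in transit. The decoder argument is any configured
# LogStreamDecoder, such as the one built in setUp().
def _transport_gap_example(decoder: LogStreamDecoder) -> None:
    decoder.parse_log_entries_proto(
        log_pb2.LogEntries(
            first_entry_sequence_id=0,
            entries=[_create_random_log_entry()],
        )
    )
    # The next expected sequence id is 1; starting at 3 instead makes the
    # decoder emit a 'Dropped 2 logs ...' entry before this batch's logs.
    decoder.parse_log_entries_proto(
        log_pb2.LogEntries(
            first_entry_sequence_id=3,
            entries=[_create_random_log_entry()],
        )
    )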


class TestTimestampFormatting(TestCase):
    """Tests for log_decoder.timestamp_parser_* functions."""

    maxDiff = None

    def test_zero_millisecond_resolution(self) -> None:
        # Check for 3 trailing 0s on a millisecond resolution timestamp.
        self.assertEqual(
            '00:45:45.587', timestamp_parser_ms_since_boot(2745587)
        )
        self.assertEqual(
            '00:45:45.000', timestamp_parser_ms_since_boot(2745000)
        )

    def test_zero_nanosecond_resolution(self) -> None:
        # Check for 6 trailing 0s on a nanosecond resolution timestamp.
        self.assertEqual(
            '00:45:45.587123', timestamp_parser_ns_since_boot(2745587123456)
        )
        self.assertEqual(
            '00:45:45.000000', timestamp_parser_ns_since_boot(2745000000000)
        )


if __name__ == '__main__':
    main()