# Lint as: python2, python3
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper class to manage bluetooth logs"""

from datetime import datetime
import logging
import os
import re
import subprocess
import time

SYSLOG_PATH = '/var/log/messages'


class LogManager(object):
    """The LogManager class helps to collect logs without a listening thread

    The manager remembers a byte offset into the log file (the "marker").
    StopRecording() reads everything written after the marker, keeps it as a
    list of byte-string lines and moves the marker to the end of what was read.
    """

    DEFAULT_ENCODING = 'utf-8'

    class LoggingException(Exception):
        """A stub exception class for LogManager class."""
        pass

    def __init__(self, log_path=SYSLOG_PATH, raise_missing=False):
        """Initialize log manager object

        @param log_path: string path to log file to manage
        @param raise_missing: raise an exception if the log file is missing

        @raises: LogManager.LoggingException on non-existent log file when
                 raise_missing is True
        """
        if not os.path.isfile(log_path):
            msg = 'Requested log file {} does not exist'.format(log_path)
            if raise_missing:
                raise LogManager.LoggingException(msg)
            else:
                self._LogErrorToSyslog(msg)

        self.log_path = log_path

        self.ResetLogMarker()
        self._bin_log_contents = []

    def _LogErrorToSyslog(self, message):
        """Create a new syslog file and add a message to syslog.

        This is best-effort error reporting: if the helper binaries cannot be
        executed, the failure is logged instead of being propagated, so that a
        reporting problem never masks the condition being reported.

        @param message: string to be written to syslog
        """
        try:
            subprocess.call(['reload', 'syslog'])
            subprocess.call(['logger', message])
        except OSError as e:
            # Raised when 'reload' or 'logger' is not available on this host.
            logging.warning('Failed to log to syslog: %s', e)

    def _GetSize(self):
        """Get the size of the log

        @returns: size of the log file in bytes, or 0 if it can't be read
        """
        try:
            return os.path.getsize(self.log_path)
        except Exception as e:
            logging.error('Failed to get log size: %s', e)
            return 0

    def ResetLogMarker(self, now_size=None):
        """Reset the start-of-log marker for later comparison

        @param now_size: byte offset to use as the new marker. If None, the
                         current size of the log file is used.
        """
        if now_size is None:
            now_size = self._GetSize()
        self.initial_log_size = now_size

    def StartRecording(self):
        """Discard any previously collected log contents

        Note that this does not move the start-of-log marker; the marker is
        set by the constructor, ResetLogMarker(), StopRecording() and
        FilterOut().
        """
        self._bin_log_contents = []

    def StopRecording(self):
        """Gather the logs written since the start-of-log marker was set

        On success the collected lines replace the previously collected
        contents, and the marker is moved to the end of the data just read.

        @raises: LogManager.LoggingException if:
                - Log file disappeared since the marker was set
                - Log file is smaller than when the marker was set
        """
        initial_size = self.initial_log_size
        now_size = self._GetSize()

        if not os.path.isfile(self.log_path):
            msg = 'File {} disappeared unexpectedly'.format(self.log_path)
            raise LogManager.LoggingException(msg)

        if now_size < initial_size:
            msg = 'Log became smaller unexpectedly'
            raise LogManager.LoggingException(msg)

        with open(self.log_path, 'rb') as mf:
            # Skip to the point where we started recording
            mf.seek(initial_size)

            # Only read up to the size sampled above, so the new marker
            # matches exactly what has been consumed.
            readsize = now_size - initial_size
            self._bin_log_contents = mf.read(readsize).split(b'\n')

        # Re-set start of log marker
        self.ResetLogMarker(now_size)

    def LogContains(self, search_str):
        """Searches each line from the collected log for a pattern

        @param search_str: pattern to be located within log contents. It is
                           compiled as a regular expression, so regex
                           metacharacters are interpreted. This arg is
                           expected to not span between lines in the logs

        @returns: True if search_str was located in the collected log
                  contents, False otherwise
        """
        pattern = re.compile(search_str.encode(self.DEFAULT_ENCODING))
        return any(pattern.search(line) for line in self._bin_log_contents)

    def FilterOut(self, rm_reg_exp):
        """Remove lines with specified pattern from the log file

        The file is rewritten in place and the start-of-log marker is moved
        to the new end of the file.

        @param rm_reg_exp: regular expression of the lines to be removed
        """
        # If log_path doesn't exist, there's nothing to do
        if not os.path.isfile(self.log_path):
            return

        rm_line_cnt = 0
        initial_size = self._GetSize()
        rm_pattern = re.compile(rm_reg_exp.encode(self.DEFAULT_ENCODING))

        with open(self.log_path, 'rb+') as mf:
            lines = mf.readlines()
            # Rewrite the file from the beginning with the surviving lines,
            # then cut off whatever is left of the old contents.
            mf.seek(0)
            for line in lines:
                if rm_pattern.search(line):
                    rm_line_cnt += 1
                else:
                    mf.write(line)
            mf.truncate()

        # Some tracebacks point out here causing /var/log/messages missing but
        # we don't have many clues. Adding a check and logs here.
        if not os.path.isfile(self.log_path):
            msg = '{} does not exist after FilterOut'.format(self.log_path)
            logging.warning(msg)
            self._LogErrorToSyslog(msg)

        new_size = self._GetSize()
        rm_byte = initial_size - new_size
        logging.info('Removed number of line: %d, Reduced log size: %d byte',
                     rm_line_cnt, rm_byte)

        # Note the new size of the log
        self.ResetLogMarker(new_size)


class InterleaveLogger(LogManager):
    """LogManager class that focus on interleave scan"""

    # Example bluetooth kernel log:
    # "2020-11-23T07:52:31.395941Z DEBUG kernel: [ 6469.811135] Bluetooth: "
    # "cancel_interleave_scan() hci0: cancelling interleave scan"
    #
    # Raw strings keep the regex escapes (\[ \] \( \)) intact without relying
    # on Python passing unknown string escapes through unchanged.
    KERNEL_LOG_PATTERN = (r'([^ ]+) DEBUG kernel: \[.*\] Bluetooth: '
                          r'{FUNCTION}\(\) hci0: {LOG_STR}')
    STATE_PATTERN = KERNEL_LOG_PATTERN.format(
            FUNCTION='hci_req_add_le_interleaved_scan',
            LOG_STR='next state: (.+)')
    CANCEL_PATTERN = KERNEL_LOG_PATTERN.format(
            FUNCTION='cancel_interleave_scan',
            LOG_STR='cancelling interleave scan')
    SYSTIME_LENGTH = len('2020-12-18T00:11:22.345678')

    def __init__(self):
        """ Initialize object
        """
        self.reset()
        self.state_pattern = re.compile(
                self.STATE_PATTERN.encode(self.DEFAULT_ENCODING))
        self.cancel_pattern = re.compile(
                self.CANCEL_PATTERN.encode(self.DEFAULT_ENCODING))
        super(InterleaveLogger, self).__init__()

    def reset(self):
        """ Clear data between each log collection attempt
        """
        self.records = []
        self.cancel_events = []

    def StartRecording(self):
        """ Reset the previous data and start recording.
        """
        self.reset()
        super(InterleaveLogger, self).ResetLogMarker()
        super(InterleaveLogger, self).StartRecording()

    @classmethod
    def _SysTimeToTimestamp(cls, time_str):
        """ Convert a syslog time string to a timestamp

        @param time_str: time string as found at the start of a syslog line,
                         e.g. '2020-11-23T07:52:31.395941Z'

        @returns: timestamp in sec (float), or None if time_str can't be
                  parsed
        """
        # This is to remove the suffix of time string, in some cases the
        # time string ends with an extra 'Z', in other cases, the string
        # ends with time zone (ex. '+08:00')
        time_str = time_str[:cls.SYSTIME_LENGTH]

        try:
            dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError as e:
            logging.error(e)
            return None

        return time.mktime(dt.timetuple()) + dt.microsecond * (10**-6)

    def StopRecording(self):
        """ Stop recording and parse logs
            The following data will be set after this call

            - self.records: a list where each item is a dictionary recording
                            an interleave |state| and the |time| the state
                            starts.
                            |state| could be {'no filter', 'allowlist'}
                            |time| is system time in sec

            - self.cancel_events: a list of |time| when a interleave cancel
                                  event log was found
                                  |time| is system time in sec

            An entry whose time string can't be parsed is still recorded,
            with |time| set to 0, and makes this call return False.

            @returns: True if StopRecording success, False otherwise

        """
        try:
            super(InterleaveLogger, self).StopRecording()
        except Exception as e:
            logging.error(e)
            return False

        # Track parse failures here, in the scope that returns the value; an
        # assignment inside a nested function would only bind a new local.
        success = True

        for line in self._bin_log_contents:
            line = line.strip().replace(b'\\r\\n', b'')
            state_match = self.state_pattern.search(line)
            cancel_match = self.cancel_pattern.search(line)

            if cancel_match:
                time_str = cancel_match.groups()[0].decode(
                        self.DEFAULT_ENCODING)
                time_sec = self._SysTimeToTimestamp(time_str)
                if time_sec is None:
                    success = False
                    time_sec = 0
                self.cancel_events.append(time_sec)

            if state_match:
                time_str, state = [
                        x.decode(self.DEFAULT_ENCODING)
                        for x in state_match.groups()
                ]
                time_sec = self._SysTimeToTimestamp(time_str)
                if time_sec is None:
                    success = False
                    time_sec = 0
                self.records.append({'time': time_sec, 'state': state})

        return success