# Lint as: python2, python3
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper class to manage bluetooth logs"""
|
|
|
|
from datetime import datetime
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
|
|
|
|
# Default log file managed by LogManager.
SYSLOG_PATH = '/var/log/messages'


class LogManager(object):
    """Collects the tail of a log file without a listening thread.

    The manager keeps a byte offset into the log file (the "marker").
    StopRecording() reads everything between the marker and the current end
    of the file, keeps it split into lines, and moves the marker to the end.
    LogContains() then searches the lines gathered by the last StopRecording.
    """

    # Encoding used to turn search strings into bytes; the log is read as bytes.
    DEFAULT_ENCODING = 'utf-8'

    class LoggingException(Exception):
        """Raised when the managed log file is missing or has shrunk."""

    def __init__(self, log_path=SYSLOG_PATH, raise_missing=False):
        """Initialize log manager object

        @param log_path: string path to log file to manage
        @param raise_missing: raise an exception if the log file is missing;
                              otherwise the problem is only reported through
                              _LogErrorToSyslog

        @raises: LogManager.LoggingException on non-existent log file, only
                 when raise_missing is True
        """
        if not os.path.isfile(log_path):
            msg = 'Requested log file {} does not exist'.format(log_path)
            if raise_missing:
                raise LogManager.LoggingException(msg)
            self._LogErrorToSyslog(msg)

        self.log_path = log_path

        self.ResetLogMarker()
        self._bin_log_contents = []

    def _LogErrorToSyslog(self, message):
        """Best-effort attempt to revive syslog and record a message in it.

        Runs `reload syslog` (presumably so the syslog service re-creates
        its log file -- confirm on the target image) followed by
        `logger <message>`.

        This is only diagnostics, so a command that cannot be launched (for
        example because the binary is not installed) is logged and skipped
        instead of raising out of the caller.

        @param message: string to hand to the `logger` command
        """
        for cmd in (['reload', 'syslog'], ['logger', message]):
            try:
                subprocess.call(cmd)
            except OSError as e:
                logging.warning('Failed to run %s: %s', cmd[0], e)

    def _GetSize(self):
        """Get the size of the log

        @returns: size of the log file in bytes, or 0 if it cannot be read
        """
        try:
            return os.path.getsize(self.log_path)
        except OSError as e:
            logging.error('Failed to get log size: %s', e)
            return 0

    def ResetLogMarker(self, now_size=None):
        """Reset the start-of-log marker for later comparison

        @param now_size: byte offset to use as the new marker; when None the
                         current size of the log file is used
        """
        if now_size is None:
            now_size = self._GetSize()
        self.initial_log_size = now_size

    def StartRecording(self):
        """Discard the lines gathered by the previous StopRecording

        The start-of-log marker is NOT moved here; it is set by __init__,
        StopRecording, FilterOut or an explicit ResetLogMarker call.
        """
        self._bin_log_contents = []

    def StopRecording(self):
        """Gather the log lines written since the start-of-log marker

        On success the marker is advanced to the end of the data just read.

        @raises: LogManager.LoggingException if:
                - Log file disappeared
                - Log file is smaller than the start-of-log marker
        """
        # Check existence first so a missing file is reported once, as an
        # exception, rather than also as a size-lookup error.
        if not os.path.isfile(self.log_path):
            msg = 'File {} disappeared unexpectedly'.format(self.log_path)
            raise LogManager.LoggingException(msg)

        start = self.initial_log_size
        now_size = self._GetSize()

        if now_size < start:
            # The marker points past the end of the file and can never become
            # valid again on its own; re-arm it so the next recording works.
            self.ResetLogMarker(now_size)
            msg = 'Log became smaller unexpectedly'
            raise LogManager.LoggingException(msg)

        with open(self.log_path, 'rb') as mf:
            # Skip to the point where we started recording
            mf.seek(start)
            # Read only up to the size measured above so that the new marker
            # matches exactly what was consumed.
            self._bin_log_contents = mf.read(now_size - start).split(b'\n')

        # Re-set start of log marker
        self.ResetLogMarker(now_size)

    def LogContains(self, search_str):
        """Search each line of the collected log for a pattern

        @param search_str: string to be located within log contents. It is
                           compiled as a regular expression, so regex
                           metacharacters are interpreted. This arg is
                           expected to not span between lines in the logs

        @returns: True if search_str was located in the collected log
                  contents, False otherwise
        """
        pattern = re.compile(search_str.encode(self.DEFAULT_ENCODING))
        return any(pattern.search(line) for line in self._bin_log_contents)

    def FilterOut(self, rm_reg_exp):
        """Remove lines with specified pattern from the log file

        The file is rewritten in place and the start-of-log marker is moved
        to the new end of the file.

        @param rm_reg_exp: regular expression of the lines to be removed
        """
        # If log_path doesn't exist, there's nothing to do
        if not os.path.isfile(self.log_path):
            return

        initial_size = self._GetSize()
        rm_pattern = re.compile(rm_reg_exp.encode(self.DEFAULT_ENCODING))

        with open(self.log_path, 'rb+') as mf:
            lines = mf.readlines()
            kept = [line for line in lines if not rm_pattern.search(line)]
            rm_line_cnt = len(lines) - len(kept)
            # Only rewrite when something is actually removed: rewriting and
            # truncating an unchanged file could drop lines appended by
            # another writer after the read above.
            if rm_line_cnt:
                mf.seek(0)
                mf.writelines(kept)
                mf.truncate()

        # Some tracebacks point out here causing /var/log/messages missing but
        # we don't have many clues. Adding a check and logs here.
        if not os.path.isfile(self.log_path):
            msg = '{} does not exist after FilterOut'.format(self.log_path)
            logging.warning(msg)
            self._LogErrorToSyslog(msg)

        new_size = self._GetSize()
        rm_byte = initial_size - new_size
        logging.info('Removed number of line: %d, Reduced log size: %d byte',
                     rm_line_cnt, rm_byte)

        # Note the new size of the log
        self.ResetLogMarker(new_size)
|
|
|
|
|
|
class InterleaveLogger(LogManager):
    """LogManager that extracts interleave scan events from the kernel log"""

    # Example bluetooth kernel log:
    # "2020-11-23T07:52:31.395941Z DEBUG kernel: [ 6469.811135] Bluetooth: "
    # "cancel_interleave_scan() hci0: cancelling interleave scan"
    #
    # Raw strings keep the regex escapes (\[ and \() from being read as
    # invalid Python string escapes.
    KERNEL_LOG_PATTERN = (r'([^ ]+) DEBUG kernel: \[.*\] Bluetooth: '
                          r'{FUNCTION}\(\) hci0: {LOG_STR}')
    # Group 1: system time string, group 2: name of the next state.
    STATE_PATTERN = KERNEL_LOG_PATTERN.format(
            FUNCTION='hci_req_add_le_interleaved_scan',
            LOG_STR='next state: (.+)')
    # Group 1: system time string.
    CANCEL_PATTERN = KERNEL_LOG_PATTERN.format(
            FUNCTION='cancel_interleave_scan',
            LOG_STR='cancelling interleave scan')
    # Number of leading characters of a time string that hold the date, time
    # and microseconds; anything after that is a time zone suffix.
    SYSTIME_LENGTH = len('2020-12-18T00:11:22.345678')

    def __init__(self):
        """ Initialize object

        Compiles the byte patterns used to scan the log and then sets up the
        base class with its default log path.
        """
        self.reset()
        self.state_pattern = re.compile(
                self.STATE_PATTERN.encode(self.DEFAULT_ENCODING))
        self.cancel_pattern = re.compile(
                self.CANCEL_PATTERN.encode(self.DEFAULT_ENCODING))
        super(InterleaveLogger, self).__init__()

    def reset(self):
        """ Clear data between each log collection attempt
        """
        self.records = []
        self.cancel_events = []

    def StartRecording(self):
        """ Reset the previous data and start recording.

        Unlike the base class, this also moves the start-of-log marker to
        the current end of the log.
        """
        self.reset()
        super(InterleaveLogger, self).ResetLogMarker()
        super(InterleaveLogger, self).StartRecording()

    def _SysTimeToTimestamp(self, time_str):
        """ Convert a system time string from the log into a timestamp

        @param time_str: time string such as '2020-11-23T07:52:31.395941Z'
                         or '2020-11-23T07:52:31.395941+08:00'

        @returns: timestamp in seconds as a float, or None if time_str
                  cannot be parsed
        """
        # This is to remove the suffix of time string, in some cases the
        # time string ends with an extra 'Z', in other cases, the string
        # ends with time zone (ex. '+08:00')
        time_str = time_str[:self.SYSTIME_LENGTH]

        try:
            dt = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f')
        except ValueError as e:
            logging.error(e)
            return None

        # time.mktime() interprets the parsed value as local time; the
        # stripped time zone suffix is not taken into account.
        return time.mktime(dt.timetuple()) + dt.microsecond * (10**-6)

    def StopRecording(self):
        """ Stop recording and parse logs

        The following data will be set after this call

        - self.records: a list of dictionaries where each item is a record
                        of interleave |state| and the |time| the state
                        starts.
                        |state| could be {'no filter', 'allowlist'}
                        |time| is system time in sec

        - self.cancel_events: a list of |time| when a interleave cancel
                              event log was found
                              |time| is system time in sec

        An entry whose time string cannot be parsed is stored with |time|
        set to 0 and makes this call return False.

        @returns: True if StopRecording success, False otherwise

        """
        try:
            super(InterleaveLogger, self).StopRecording()
        except Exception as e:
            logging.error(e)
            return False

        success = True

        for line in self._bin_log_contents:
            # Besides surrounding whitespace, drop any literal backslash
            # sequence "\r\n" (four characters) embedded in the line.
            line = line.strip().replace(b'\\r\\n', b'')
            state_match = self.state_pattern.search(line)
            cancel_match = self.cancel_pattern.search(line)

            if cancel_match:
                time_str = cancel_match.groups()[0].decode(
                        self.DEFAULT_ENCODING)
                time_sec = self._SysTimeToTimestamp(time_str)
                if time_sec is None:
                    success = False
                    time_sec = 0
                self.cancel_events.append(time_sec)

            if state_match:
                time_str, state = [
                        x.decode(self.DEFAULT_ENCODING)
                        for x in state_match.groups()
                ]
                time_sec = self._SysTimeToTimestamp(time_str)
                if time_sec is None:
                    success = False
                    time_sec = 0
                self.records.append({'time': time_sec, 'state': state})

        return success
|