xref: /aosp_15_r20/external/autotest/client/cros/bluetooth/output_recorder.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""This module provides an object to record the output of command-line program.
7"""
8
9from __future__ import absolute_import
10from __future__ import division
11from __future__ import print_function
12import fcntl
13import logging
14import os
15import pty
16import re
17import subprocess
18import sys
19import threading
20import time
21
22
23class OutputRecorderError(Exception):
24    """An exception class for output_recorder module."""
25    pass
26
27
28def _may_append_encoding_kwargs(kwargs):
29    """Appends encoding kwarg if it is run in Python 3+.
30
31    @param kwargs: dict of kwargs.
32    """
33    if sys.version_info.major > 2:
34        kwargs['encoding'] = 'utf-8'
35
36
37class OutputRecorder(object):
38    """A class used to record the output of command line program.
39
40    A thread is dedicated to performing non-blocking reading of the
41    command outpt in this class. Other possible approaches include
42    1. using GObject.io_add_watch() to register a callback and
43       reading the output when available, or
44    2. using select.select() with a short timeout, and reading
45       the output if available.
46    However, the above two approaches are not very reliable. Hence,
47    this approach using non-blocking reading is adopted.
48
49    To prevent the block buffering of the command output, a pseudo
50    terminal is created through pty.openpty(). This forces the
51    line output.
52
53    This class saves the output in self.contents so that it is
54    easy to perform regular expression search(). The output is
55    also saved in a file.
56
57    """
58
59    DEFAULT_OPEN_MODE = 'a'
60    START_DELAY_SECS = 1        # Delay after starting recording.
61    STOP_DELAY_SECS = 1         # Delay before stopping recording.
62    POLLING_DELAY_SECS = 0.1    # Delay before next polling.
63    TMP_FILE = '/tmp/output_recorder.dat'
64
65    def __init__(self, cmd, open_mode=DEFAULT_OPEN_MODE,
66                 start_delay_secs=START_DELAY_SECS,
67                 stop_delay_secs=STOP_DELAY_SECS, save_file=TMP_FILE):
68        """Construction of output recorder.
69
70        @param cmd: the command of which the output is to record.
71                This may be a list or a string.
72        @param open_mode: the open mode for writing output to save_file.
73                Could be either 'w' or 'a'.
74        @param stop_delay_secs: the delay time before stopping the cmd.
75        @param save_file: the file to save the output.
76
77        """
78        self.cmd = [cmd] if isinstance(cmd, str) else cmd
79        self.open_mode = open_mode
80        self.start_delay_secs = start_delay_secs
81        self.stop_delay_secs = stop_delay_secs
82        self.save_file = save_file
83        self.contents = []
84
85        # Create a thread dedicated to record the output.
86        self._recording_thread = None
87        self._stop_recording_thread_event = threading.Event()
88
89        # Use pseudo terminal to prevent buffering of the program output.
90        self._main, self._node = pty.openpty()
91        fdopen_kwargs = {}
92        _may_append_encoding_kwargs(fdopen_kwargs)
93        self._output = os.fdopen(self._main, **fdopen_kwargs)
94
95        # Set non-blocking flag.
96        fcntl.fcntl(self._output, fcntl.F_SETFL, os.O_NONBLOCK)
97
98
99    def record(self):
100        """Record the output of the cmd."""
101        logging.info('Recording output of "%s".', ' '.join(self.cmd))
102        try:
103            popen_kwargs = {'stdout': self._node, 'stderr': self._node}
104            _may_append_encoding_kwargs(popen_kwargs)
105            self._recorder = subprocess.Popen(self.cmd, **popen_kwargs)
106        except:
107            raise OutputRecorderError('Failed to run "%s"' %
108                                      ' '.join(self.cmd))
109
110        ansi_escape_re = re.compile(r'\x1b\[[^m]*m')
111
112        open_kwargs = {}
113        _may_append_encoding_kwargs(open_kwargs)
114        with open(self.save_file, self.open_mode, **open_kwargs) as output_f:
115            output_f.write(os.linesep + '*' * 80 + os.linesep)
116            while True:
117                try:
118                    # Perform non-blocking read.
119                    line = self._output.readline()
120                except:
121                    # Set empty string if nothing to read.
122                    line = ''
123
124                if line:
125                    # Remove ANSI escape sequence so that XML converter can work.
126                    line = ansi_escape_re.sub('', line)
127                    output_f.write(line)
128                    output_f.flush()
129                    self.contents.append(line)
130                elif self._stop_recording_thread_event.is_set():
131                    self._stop_recording_thread_event.clear()
132                    break
133                else:
134                    # Sleep a while if nothing to read yet.
135                    time.sleep(self.POLLING_DELAY_SECS)
136
137
138    def start(self):
139        """Start the recording thread."""
140        logging.info('Start recording thread.')
141        self.clear_contents()
142        self._recording_thread = threading.Thread(target=self.record)
143        self._recording_thread.start()
144        time.sleep(self.start_delay_secs)
145
146
147    def stop(self):
148        """Stop the recording thread."""
149        logging.info('Stop recording thread.')
150        time.sleep(self.stop_delay_secs)
151        self._stop_recording_thread_event.set()
152        self._recording_thread.join()
153
154        # Kill the process.
155        self._recorder.terminate()
156        self._recorder.kill()
157
158
159    def clear_contents(self):
160        """Clear the contents."""
161        self.contents = []
162
163
164    def get_contents(self, search_str='', start_str=''):
165        """Get the (filtered) contents.
166
167        @param search_str: only lines with search_str would be kept.
168        @param start_str: all lines before the occurrence of start_str would be
169                          filtered.
170
171        @returns: the (filtered) contents.
172
173        """
174        search_pattern = re.compile(search_str) if search_str else None
175        start_pattern = re.compile(start_str) if start_str else None
176
177        # Just returns the original contents if no filtered conditions are
178        # specified.
179        if not search_pattern and not start_pattern:
180            return self.contents
181
182        contents = []
183        start_flag = not bool(start_pattern)
184        for line in self.contents:
185            if start_flag:
186                if search_pattern.search(line):
187                    contents.append(line.strip())
188            elif start_pattern.search(line):
189                start_flag = True
190                contents.append(line.strip())
191
192        return contents
193
194
195    def find(self, pattern_str, flags=re.I):
196        """Find a pattern string in the contents.
197
198        Note that the pattern_str is considered as an arbitrary literal string
199        that might contain re meta-characters, e.g., '(' or ')'. Hence,
200        re.escape() is applied before using re.compile.
201
202        @param pattern_str: the pattern string to search.
203        @param flags: the flags of the pattern expression behavior.
204
205        @returns: True if found. False otherwise.
206
207        """
208        pattern = re.compile(re.escape(pattern_str), flags)
209        for line in self.contents:
210            result = pattern.search(line)
211            if result:
212                return True
213        return False
214
215
216if __name__ == '__main__':
217    # A demo using btmon tool to monitor bluetoohd activity.
218    cmd = ['btmon', '-c', 'never']
219    recorder = OutputRecorder(cmd)
220
221    if True:
222        recorder.start()
223        # Perform some bluetooth activities here in another terminal.
224        time.sleep(recorder.stop_delay_secs)
225        recorder.stop()
226
227    for line in recorder.get_contents():
228        print(line)
229