1#!/usr/bin/python3 2 3from __future__ import absolute_import 4from __future__ import division 5from __future__ import print_function 6import logging, os, select, six, subprocess, sys, unittest 7import common 8from autotest_lib.client.common_lib import logging_manager, logging_config 9 10 11class PipedStringIO(object): 12 """ 13 Like StringIO, but all I/O passes through a pipe. This means a 14 PipedStringIO is backed by a file descriptor is thus can do things like 15 pass down to a subprocess. However, this means the creating process must 16 call read_pipe() (or the classmethod read_all_pipes()) periodically to read 17 the pipe, and must call close() (or the classmethod cleanup()) to close the 18 pipe. 19 """ 20 _instances = set() 21 22 def __init__(self): 23 self._string_io = six.StringIO() 24 self._read_end, self._write_end = os.pipe() 25 PipedStringIO._instances.add(self) 26 27 28 def close(self): 29 self._string_io.close() 30 os.close(self._read_end) 31 os.close(self._write_end) 32 PipedStringIO._instances.remove(self) 33 34 35 def write(self, data): 36 os.write(self._write_end, data) 37 38 39 def flush(self): 40 pass 41 42 43 def fileno(self): 44 return self._write_end 45 46 47 def getvalue(self): 48 self.read_pipe() 49 return self._string_io.getvalue() 50 51 52 def read_pipe(self): 53 while True: 54 read_list, _, _ = select.select([self._read_end], [], [], 0) 55 if not read_list: 56 return 57 if six.PY2: 58 self._string_io.write(os.read(self._read_end, 1024)) 59 elif six.PY3: 60 self._string_io.write(os.read(self._read_end, 1024).decode('utf-8')) 61 62 63 @classmethod 64 def read_all_pipes(cls): 65 for instance in cls._instances: 66 instance.read_pipe() 67 68 69 @classmethod 70 def cleanup_all_instances(cls): 71 for instance in list(cls._instances): 72 instance.close() 73 74 75LOGGING_FORMAT = '%(levelname)s: %(message)s' 76 77_EXPECTED_STDOUT = """\ 78print 1 79system 1 80INFO: logging 1 81INFO: print 2 82INFO: system 2 83INFO: logging 2 84INFO: print 6 85INFO: system 6 86INFO: logging 6 87print 7 88system 7 89INFO: logging 7 90""" 91 92_EXPECTED_LOG1 = """\ 93INFO: print 3 94INFO: system 3 95INFO: logging 3 96INFO: print 4 97INFO: system 4 98INFO: logging 4 99INFO: print 5 100INFO: system 5 101INFO: logging 5 102""" 103 104_EXPECTED_LOG2 = """\ 105INFO: print 4 106INFO: system 4 107INFO: logging 4 108""" 109 110 111class StubLoggingConfig(logging_config.LoggingConfig): 112 console_formatter = logging.Formatter(LOGGING_FORMAT) 113 114 def __init__(self): 115 super(StubLoggingConfig, self).__init__() 116 self.log = PipedStringIO() 117 118 119 def add_debug_file_handlers(self, log_dir, log_name=None): 120 self.logger.addHandler(logging.StreamHandler(self.log)) 121 122 123# this isn't really a unit test since it creates subprocesses and pipes and all 124# that. i just use the unittest framework because it's convenient. 125class LoggingManagerTest(unittest.TestCase): 126 def setUp(self): 127 self.stdout = PipedStringIO() 128 self._log1 = PipedStringIO() 129 self._log2 = PipedStringIO() 130 131 self._real_system_calls = False 132 133 # the LoggingManager will change self.stdout (by design), so keep a 134 # copy around 135 self._original_stdout = self.stdout 136 137 # clear out import-time logging config and reconfigure 138 root_logger = logging.getLogger() 139 for handler in list(root_logger.handlers): 140 root_logger.removeHandler(handler) 141 # use INFO to ignore debug output from logging_manager itself 142 logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT, 143 stream=self.stdout) 144 145 self._config_object = StubLoggingConfig() 146 logging_manager.LoggingManager.logging_config_object = ( 147 self._config_object) 148 149 150 def tearDown(self): 151 PipedStringIO.cleanup_all_instances() 152 153 154 def _say(self, suffix): 155 print('print %s' % suffix, file=self.stdout) 156 if self._real_system_calls: 157 os.system('echo system %s >&%s' % (suffix, 158 self._original_stdout.fileno())) 159 else: 160 print('system %s' % suffix, file=self.stdout) 161 logging.info('logging %s', suffix) 162 PipedStringIO.read_all_pipes() 163 164 165 def _setup_manager(self, manager_class=logging_manager.LoggingManager): 166 def set_stdout(file_object): 167 self.stdout = file_object 168 manager = manager_class() 169 manager.manage_stream(self.stdout, logging.INFO, set_stdout) 170 return manager 171 172 173 def _run_test(self, manager_class): 174 manager = self._setup_manager(manager_class) 175 176 self._say(1) 177 178 manager.start_logging() 179 self._say(2) 180 181 manager.redirect_to_stream(self._log1) 182 self._say(3) 183 184 manager.tee_redirect_to_stream(self._log2) 185 self._say(4) 186 187 manager.undo_redirect() 188 self._say(5) 189 190 manager.undo_redirect() 191 self._say(6) 192 193 manager.stop_logging() 194 self._say(7) 195 196 197 def _grab_fd_info(self): 198 command = 'ls -l /proc/%s/fd' % os.getpid() 199 proc = subprocess.Popen(command.split(), shell=True, 200 stdout=subprocess.PIPE) 201 return proc.communicate()[0] 202 203 204 def _compare_logs(self, log_buffer, expected_text): 205 actual_lines = log_buffer.getvalue().splitlines() 206 expected_lines = expected_text.splitlines() 207 if self._real_system_calls: 208 # because of the many interacting processes, we can't ensure perfect 209 # interleaving. so compare sets of lines rather than ordered lines. 210 actual_lines = set(actual_lines) 211 expected_lines = set(expected_lines) 212 self.assertEquals(actual_lines, expected_lines) 213 214 215 def _check_results(self): 216 # ensure our stdout was restored 217 self.assertEquals(self.stdout, self._original_stdout) 218 219 if self._real_system_calls: 220 # ensure FDs were left in their original state 221 self.assertEquals(self._grab_fd_info(), self._fd_info) 222 223 self._compare_logs(self.stdout, _EXPECTED_STDOUT) 224 self._compare_logs(self._log1, _EXPECTED_LOG1) 225 self._compare_logs(self._log2, _EXPECTED_LOG2) 226 227 228 def test_logging_manager(self): 229 self._run_test(logging_manager.LoggingManager) 230 self._check_results() 231 232 233 def test_fd_redirection_logging_manager(self): 234 self._real_system_calls = True 235 self._fd_info = self._grab_fd_info() 236 self._run_test(logging_manager.FdRedirectionLoggingManager) 237 self._check_results() 238 239 240 def test_tee_redirect_debug_dir(self): 241 manager = self._setup_manager() 242 manager.start_logging() 243 244 manager.tee_redirect_debug_dir('/fake/dir', tag='mytag') 245 print('hello', file=self.stdout) 246 247 manager.undo_redirect() 248 print('goodbye', file=self.stdout) 249 250 manager.stop_logging() 251 252 self._compare_logs(self.stdout, 253 'INFO: mytag : hello\nINFO: goodbye') 254 self._compare_logs(self._config_object.log, 'hello\n') 255 256 257class MonkeyPatchTestCase(unittest.TestCase): 258 def setUp(self): 259 filename = os.path.split(__file__)[1] 260 if filename.endswith('.pyc'): 261 filename = filename[:-1] 262 self.expected_filename = filename 263 264 265 def check_filename(self, filename, expected=None): 266 if expected is None: 267 expected = [self.expected_filename] 268 self.assertIn(os.path.split(filename)[1], expected) 269 270 271 def _0_test_find_caller(self): 272 finder = logging_manager._logging_manager_aware_logger__find_caller 273 filename, lineno, caller_name = finder(logging_manager.logger) 274 self.check_filename(filename) 275 self.assertEquals('test_find_caller', caller_name) 276 277 278 def _1_test_find_caller(self): 279 self._0_test_find_caller() 280 281 282 def test_find_caller(self): 283 self._1_test_find_caller() 284 285 286 def _0_test_non_reported_find_caller(self): 287 finder = logging_manager._logging_manager_aware_logger__find_caller 288 filename, lineno, caller_name = finder(logging_manager.logger) 289 # Python 2.4 unittest implementation will call the unittest method in 290 # file 'unittest.py', and Python >= 2.6 does the same in 'case.py' 291 self.check_filename(filename, expected=['unittest.py', 'case.py']) 292 293 294 def _1_test_non_reported_find_caller(self): 295 self._0_test_non_reported_find_caller() 296 297 298 @logging_manager.do_not_report_as_logging_caller 299 def test_non_reported_find_caller(self): 300 self._1_test_non_reported_find_caller() 301 302 303 304if __name__ == '__main__': 305 unittest.main() 306