1# Copyright 2001-2022 by Vinay Sajip. All Rights Reserved. 2# 3# Permission to use, copy, modify, and distribute this software and its 4# documentation for any purpose and without fee is hereby granted, 5# provided that the above copyright notice appear in all copies and that 6# both that copyright notice and this permission notice appear in 7# supporting documentation, and that the name of Vinay Sajip 8# not be used in advertising or publicity pertaining to distribution 9# of the software without specific, written prior permission. 10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING 11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR 13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER 14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 17"""Test harness for the logging module. Run all tests. 18 19Copyright (C) 2001-2022 Vinay Sajip. All Rights Reserved. 20""" 21 22import logging 23import logging.handlers 24import logging.config 25 26import codecs 27import configparser 28import copy 29import datetime 30import pathlib 31import pickle 32import io 33import gc 34import json 35import os 36import queue 37import random 38import re 39import shutil 40import socket 41import struct 42import sys 43import tempfile 44from test.support.script_helper import assert_python_ok, assert_python_failure 45from test import support 46from test.support import os_helper 47from test.support import socket_helper 48from test.support import threading_helper 49from test.support import warnings_helper 50from test.support.logging_helper import TestHandler 51import textwrap 52import threading 53import time 54import unittest 55import warnings 56import weakref 57 58from http.server import HTTPServer, BaseHTTPRequestHandler 59from urllib.parse import urlparse, parse_qs 60from socketserver import (ThreadingUDPServer, DatagramRequestHandler, 61 ThreadingTCPServer, StreamRequestHandler) 62 63 64asyncore = warnings_helper.import_deprecated('asyncore') 65smtpd = warnings_helper.import_deprecated('smtpd') 66 67 68try: 69 import win32evtlog, win32evtlogutil, pywintypes 70except ImportError: 71 win32evtlog = win32evtlogutil = pywintypes = None 72 73try: 74 import zlib 75except ImportError: 76 pass 77 78 79class BaseTest(unittest.TestCase): 80 81 """Base class for logging tests.""" 82 83 log_format = "%(name)s -> %(levelname)s: %(message)s" 84 expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" 85 message_num = 0 86 87 def setUp(self): 88 """Setup the default logging stream to an internal StringIO instance, 89 so that we can examine log output as we want.""" 90 self._threading_key = threading_helper.threading_setup() 91 92 logger_dict = logging.getLogger().manager.loggerDict 93 logging._acquireLock() 94 try: 95 self.saved_handlers = logging._handlers.copy() 96 self.saved_handler_list = logging._handlerList[:] 97 self.saved_loggers = saved_loggers = logger_dict.copy() 98 self.saved_name_to_level = logging._nameToLevel.copy() 99 self.saved_level_to_name = logging._levelToName.copy() 100 self.logger_states = logger_states = {} 101 for name in saved_loggers: 102 logger_states[name] = getattr(saved_loggers[name], 103 'disabled', None) 104 finally: 105 logging._releaseLock() 106 107 # Set two unused loggers 108 self.logger1 = logging.getLogger("\xab\xd7\xbb") 109 self.logger2 = logging.getLogger("\u013f\u00d6\u0047") 110 111 self.root_logger = logging.getLogger("") 112 self.original_logging_level = self.root_logger.getEffectiveLevel() 113 114 self.stream = io.StringIO() 115 self.root_logger.setLevel(logging.DEBUG) 116 self.root_hdlr = logging.StreamHandler(self.stream) 117 self.root_formatter = logging.Formatter(self.log_format) 118 self.root_hdlr.setFormatter(self.root_formatter) 119 if self.logger1.hasHandlers(): 120 hlist = self.logger1.handlers + self.root_logger.handlers 121 raise AssertionError('Unexpected handlers: %s' % hlist) 122 if self.logger2.hasHandlers(): 123 hlist = self.logger2.handlers + self.root_logger.handlers 124 raise AssertionError('Unexpected handlers: %s' % hlist) 125 self.root_logger.addHandler(self.root_hdlr) 126 self.assertTrue(self.logger1.hasHandlers()) 127 self.assertTrue(self.logger2.hasHandlers()) 128 129 def tearDown(self): 130 """Remove our logging stream, and restore the original logging 131 level.""" 132 self.stream.close() 133 self.root_logger.removeHandler(self.root_hdlr) 134 while self.root_logger.handlers: 135 h = self.root_logger.handlers[0] 136 self.root_logger.removeHandler(h) 137 h.close() 138 self.root_logger.setLevel(self.original_logging_level) 139 logging._acquireLock() 140 try: 141 logging._levelToName.clear() 142 logging._levelToName.update(self.saved_level_to_name) 143 logging._nameToLevel.clear() 144 logging._nameToLevel.update(self.saved_name_to_level) 145 logging._handlers.clear() 146 logging._handlers.update(self.saved_handlers) 147 logging._handlerList[:] = self.saved_handler_list 148 manager = logging.getLogger().manager 149 manager.disable = 0 150 loggerDict = manager.loggerDict 151 loggerDict.clear() 152 loggerDict.update(self.saved_loggers) 153 logger_states = self.logger_states 154 for name in self.logger_states: 155 if logger_states[name] is not None: 156 self.saved_loggers[name].disabled = logger_states[name] 157 finally: 158 logging._releaseLock() 159 160 self.doCleanups() 161 threading_helper.threading_cleanup(*self._threading_key) 162 163 def assert_log_lines(self, expected_values, stream=None, pat=None): 164 """Match the collected log lines against the regular expression 165 self.expected_log_pat, and compare the extracted group values to 166 the expected_values list of tuples.""" 167 stream = stream or self.stream 168 pat = re.compile(pat or self.expected_log_pat) 169 actual_lines = stream.getvalue().splitlines() 170 self.assertEqual(len(actual_lines), len(expected_values)) 171 for actual, expected in zip(actual_lines, expected_values): 172 match = pat.search(actual) 173 if not match: 174 self.fail("Log line does not match expected pattern:\n" + 175 actual) 176 self.assertEqual(tuple(match.groups()), expected) 177 s = stream.read() 178 if s: 179 self.fail("Remaining output at end of log stream:\n" + s) 180 181 def next_message(self): 182 """Generate a message consisting solely of an auto-incrementing 183 integer.""" 184 self.message_num += 1 185 return "%d" % self.message_num 186 187 188class BuiltinLevelsTest(BaseTest): 189 """Test builtin levels and their inheritance.""" 190 191 def test_flat(self): 192 # Logging levels in a flat logger namespace. 193 m = self.next_message 194 195 ERR = logging.getLogger("ERR") 196 ERR.setLevel(logging.ERROR) 197 INF = logging.LoggerAdapter(logging.getLogger("INF"), {}) 198 INF.setLevel(logging.INFO) 199 DEB = logging.getLogger("DEB") 200 DEB.setLevel(logging.DEBUG) 201 202 # These should log. 203 ERR.log(logging.CRITICAL, m()) 204 ERR.error(m()) 205 206 INF.log(logging.CRITICAL, m()) 207 INF.error(m()) 208 INF.warning(m()) 209 INF.info(m()) 210 211 DEB.log(logging.CRITICAL, m()) 212 DEB.error(m()) 213 DEB.warning(m()) 214 DEB.info(m()) 215 DEB.debug(m()) 216 217 # These should not log. 218 ERR.warning(m()) 219 ERR.info(m()) 220 ERR.debug(m()) 221 222 INF.debug(m()) 223 224 self.assert_log_lines([ 225 ('ERR', 'CRITICAL', '1'), 226 ('ERR', 'ERROR', '2'), 227 ('INF', 'CRITICAL', '3'), 228 ('INF', 'ERROR', '4'), 229 ('INF', 'WARNING', '5'), 230 ('INF', 'INFO', '6'), 231 ('DEB', 'CRITICAL', '7'), 232 ('DEB', 'ERROR', '8'), 233 ('DEB', 'WARNING', '9'), 234 ('DEB', 'INFO', '10'), 235 ('DEB', 'DEBUG', '11'), 236 ]) 237 238 def test_nested_explicit(self): 239 # Logging levels in a nested namespace, all explicitly set. 240 m = self.next_message 241 242 INF = logging.getLogger("INF") 243 INF.setLevel(logging.INFO) 244 INF_ERR = logging.getLogger("INF.ERR") 245 INF_ERR.setLevel(logging.ERROR) 246 247 # These should log. 248 INF_ERR.log(logging.CRITICAL, m()) 249 INF_ERR.error(m()) 250 251 # These should not log. 252 INF_ERR.warning(m()) 253 INF_ERR.info(m()) 254 INF_ERR.debug(m()) 255 256 self.assert_log_lines([ 257 ('INF.ERR', 'CRITICAL', '1'), 258 ('INF.ERR', 'ERROR', '2'), 259 ]) 260 261 def test_nested_inherited(self): 262 # Logging levels in a nested namespace, inherited from parent loggers. 263 m = self.next_message 264 265 INF = logging.getLogger("INF") 266 INF.setLevel(logging.INFO) 267 INF_ERR = logging.getLogger("INF.ERR") 268 INF_ERR.setLevel(logging.ERROR) 269 INF_UNDEF = logging.getLogger("INF.UNDEF") 270 INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") 271 UNDEF = logging.getLogger("UNDEF") 272 273 # These should log. 274 INF_UNDEF.log(logging.CRITICAL, m()) 275 INF_UNDEF.error(m()) 276 INF_UNDEF.warning(m()) 277 INF_UNDEF.info(m()) 278 INF_ERR_UNDEF.log(logging.CRITICAL, m()) 279 INF_ERR_UNDEF.error(m()) 280 281 # These should not log. 282 INF_UNDEF.debug(m()) 283 INF_ERR_UNDEF.warning(m()) 284 INF_ERR_UNDEF.info(m()) 285 INF_ERR_UNDEF.debug(m()) 286 287 self.assert_log_lines([ 288 ('INF.UNDEF', 'CRITICAL', '1'), 289 ('INF.UNDEF', 'ERROR', '2'), 290 ('INF.UNDEF', 'WARNING', '3'), 291 ('INF.UNDEF', 'INFO', '4'), 292 ('INF.ERR.UNDEF', 'CRITICAL', '5'), 293 ('INF.ERR.UNDEF', 'ERROR', '6'), 294 ]) 295 296 def test_nested_with_virtual_parent(self): 297 # Logging levels when some parent does not exist yet. 298 m = self.next_message 299 300 INF = logging.getLogger("INF") 301 GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") 302 CHILD = logging.getLogger("INF.BADPARENT") 303 INF.setLevel(logging.INFO) 304 305 # These should log. 306 GRANDCHILD.log(logging.FATAL, m()) 307 GRANDCHILD.info(m()) 308 CHILD.log(logging.FATAL, m()) 309 CHILD.info(m()) 310 311 # These should not log. 312 GRANDCHILD.debug(m()) 313 CHILD.debug(m()) 314 315 self.assert_log_lines([ 316 ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), 317 ('INF.BADPARENT.UNDEF', 'INFO', '2'), 318 ('INF.BADPARENT', 'CRITICAL', '3'), 319 ('INF.BADPARENT', 'INFO', '4'), 320 ]) 321 322 def test_regression_22386(self): 323 """See issue #22386 for more information.""" 324 self.assertEqual(logging.getLevelName('INFO'), logging.INFO) 325 self.assertEqual(logging.getLevelName(logging.INFO), 'INFO') 326 327 def test_issue27935(self): 328 fatal = logging.getLevelName('FATAL') 329 self.assertEqual(fatal, logging.FATAL) 330 331 def test_regression_29220(self): 332 """See issue #29220 for more information.""" 333 logging.addLevelName(logging.INFO, '') 334 self.addCleanup(logging.addLevelName, logging.INFO, 'INFO') 335 self.assertEqual(logging.getLevelName(logging.INFO), '') 336 self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET') 337 self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET) 338 339class BasicFilterTest(BaseTest): 340 341 """Test the bundled Filter class.""" 342 343 def test_filter(self): 344 # Only messages satisfying the specified criteria pass through the 345 # filter. 346 filter_ = logging.Filter("spam.eggs") 347 handler = self.root_logger.handlers[0] 348 try: 349 handler.addFilter(filter_) 350 spam = logging.getLogger("spam") 351 spam_eggs = logging.getLogger("spam.eggs") 352 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 353 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 354 355 spam.info(self.next_message()) 356 spam_eggs.info(self.next_message()) # Good. 357 spam_eggs_fish.info(self.next_message()) # Good. 358 spam_bakedbeans.info(self.next_message()) 359 360 self.assert_log_lines([ 361 ('spam.eggs', 'INFO', '2'), 362 ('spam.eggs.fish', 'INFO', '3'), 363 ]) 364 finally: 365 handler.removeFilter(filter_) 366 367 def test_callable_filter(self): 368 # Only messages satisfying the specified criteria pass through the 369 # filter. 370 371 def filterfunc(record): 372 parts = record.name.split('.') 373 prefix = '.'.join(parts[:2]) 374 return prefix == 'spam.eggs' 375 376 handler = self.root_logger.handlers[0] 377 try: 378 handler.addFilter(filterfunc) 379 spam = logging.getLogger("spam") 380 spam_eggs = logging.getLogger("spam.eggs") 381 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 382 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 383 384 spam.info(self.next_message()) 385 spam_eggs.info(self.next_message()) # Good. 386 spam_eggs_fish.info(self.next_message()) # Good. 387 spam_bakedbeans.info(self.next_message()) 388 389 self.assert_log_lines([ 390 ('spam.eggs', 'INFO', '2'), 391 ('spam.eggs.fish', 'INFO', '3'), 392 ]) 393 finally: 394 handler.removeFilter(filterfunc) 395 396 def test_empty_filter(self): 397 f = logging.Filter() 398 r = logging.makeLogRecord({'name': 'spam.eggs'}) 399 self.assertTrue(f.filter(r)) 400 401# 402# First, we define our levels. There can be as many as you want - the only 403# limitations are that they should be integers, the lowest should be > 0 and 404# larger values mean less information being logged. If you need specific 405# level values which do not fit into these limitations, you can use a 406# mapping dictionary to convert between your application levels and the 407# logging system. 408# 409SILENT = 120 410TACITURN = 119 411TERSE = 118 412EFFUSIVE = 117 413SOCIABLE = 116 414VERBOSE = 115 415TALKATIVE = 114 416GARRULOUS = 113 417CHATTERBOX = 112 418BORING = 111 419 420LEVEL_RANGE = range(BORING, SILENT + 1) 421 422# 423# Next, we define names for our levels. You don't need to do this - in which 424# case the system will use "Level n" to denote the text for the level. 425# 426my_logging_levels = { 427 SILENT : 'Silent', 428 TACITURN : 'Taciturn', 429 TERSE : 'Terse', 430 EFFUSIVE : 'Effusive', 431 SOCIABLE : 'Sociable', 432 VERBOSE : 'Verbose', 433 TALKATIVE : 'Talkative', 434 GARRULOUS : 'Garrulous', 435 CHATTERBOX : 'Chatterbox', 436 BORING : 'Boring', 437} 438 439class GarrulousFilter(logging.Filter): 440 441 """A filter which blocks garrulous messages.""" 442 443 def filter(self, record): 444 return record.levelno != GARRULOUS 445 446class VerySpecificFilter(logging.Filter): 447 448 """A filter which blocks sociable and taciturn messages.""" 449 450 def filter(self, record): 451 return record.levelno not in [SOCIABLE, TACITURN] 452 453 454class CustomLevelsAndFiltersTest(BaseTest): 455 456 """Test various filtering possibilities with custom logging levels.""" 457 458 # Skip the logger name group. 459 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 460 461 def setUp(self): 462 BaseTest.setUp(self) 463 for k, v in my_logging_levels.items(): 464 logging.addLevelName(k, v) 465 466 def log_at_all_levels(self, logger): 467 for lvl in LEVEL_RANGE: 468 logger.log(lvl, self.next_message()) 469 470 def test_logger_filter(self): 471 # Filter at logger level. 472 self.root_logger.setLevel(VERBOSE) 473 # Levels >= 'Verbose' are good. 474 self.log_at_all_levels(self.root_logger) 475 self.assert_log_lines([ 476 ('Verbose', '5'), 477 ('Sociable', '6'), 478 ('Effusive', '7'), 479 ('Terse', '8'), 480 ('Taciturn', '9'), 481 ('Silent', '10'), 482 ]) 483 484 def test_handler_filter(self): 485 # Filter at handler level. 486 self.root_logger.handlers[0].setLevel(SOCIABLE) 487 try: 488 # Levels >= 'Sociable' are good. 489 self.log_at_all_levels(self.root_logger) 490 self.assert_log_lines([ 491 ('Sociable', '6'), 492 ('Effusive', '7'), 493 ('Terse', '8'), 494 ('Taciturn', '9'), 495 ('Silent', '10'), 496 ]) 497 finally: 498 self.root_logger.handlers[0].setLevel(logging.NOTSET) 499 500 def test_specific_filters(self): 501 # Set a specific filter object on the handler, and then add another 502 # filter object on the logger itself. 503 handler = self.root_logger.handlers[0] 504 specific_filter = None 505 garr = GarrulousFilter() 506 handler.addFilter(garr) 507 try: 508 self.log_at_all_levels(self.root_logger) 509 first_lines = [ 510 # Notice how 'Garrulous' is missing 511 ('Boring', '1'), 512 ('Chatterbox', '2'), 513 ('Talkative', '4'), 514 ('Verbose', '5'), 515 ('Sociable', '6'), 516 ('Effusive', '7'), 517 ('Terse', '8'), 518 ('Taciturn', '9'), 519 ('Silent', '10'), 520 ] 521 self.assert_log_lines(first_lines) 522 523 specific_filter = VerySpecificFilter() 524 self.root_logger.addFilter(specific_filter) 525 self.log_at_all_levels(self.root_logger) 526 self.assert_log_lines(first_lines + [ 527 # Not only 'Garrulous' is still missing, but also 'Sociable' 528 # and 'Taciturn' 529 ('Boring', '11'), 530 ('Chatterbox', '12'), 531 ('Talkative', '14'), 532 ('Verbose', '15'), 533 ('Effusive', '17'), 534 ('Terse', '18'), 535 ('Silent', '20'), 536 ]) 537 finally: 538 if specific_filter: 539 self.root_logger.removeFilter(specific_filter) 540 handler.removeFilter(garr) 541 542 543class HandlerTest(BaseTest): 544 def test_name(self): 545 h = logging.Handler() 546 h.name = 'generic' 547 self.assertEqual(h.name, 'generic') 548 h.name = 'anothergeneric' 549 self.assertEqual(h.name, 'anothergeneric') 550 self.assertRaises(NotImplementedError, h.emit, None) 551 552 def test_builtin_handlers(self): 553 # We can't actually *use* too many handlers in the tests, 554 # but we can try instantiating them with various options 555 if sys.platform in ('linux', 'darwin'): 556 for existing in (True, False): 557 fd, fn = tempfile.mkstemp() 558 os.close(fd) 559 if not existing: 560 os.unlink(fn) 561 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True) 562 if existing: 563 dev, ino = h.dev, h.ino 564 self.assertEqual(dev, -1) 565 self.assertEqual(ino, -1) 566 r = logging.makeLogRecord({'msg': 'Test'}) 567 h.handle(r) 568 # Now remove the file. 569 os.unlink(fn) 570 self.assertFalse(os.path.exists(fn)) 571 # The next call should recreate the file. 572 h.handle(r) 573 self.assertTrue(os.path.exists(fn)) 574 else: 575 self.assertEqual(h.dev, -1) 576 self.assertEqual(h.ino, -1) 577 h.close() 578 if existing: 579 os.unlink(fn) 580 if sys.platform == 'darwin': 581 sockname = '/var/run/syslog' 582 else: 583 sockname = '/dev/log' 584 try: 585 h = logging.handlers.SysLogHandler(sockname) 586 self.assertEqual(h.facility, h.LOG_USER) 587 self.assertTrue(h.unixsocket) 588 h.close() 589 except OSError: # syslogd might not be available 590 pass 591 for method in ('GET', 'POST', 'PUT'): 592 if method == 'PUT': 593 self.assertRaises(ValueError, logging.handlers.HTTPHandler, 594 'localhost', '/log', method) 595 else: 596 h = logging.handlers.HTTPHandler('localhost', '/log', method) 597 h.close() 598 h = logging.handlers.BufferingHandler(0) 599 r = logging.makeLogRecord({}) 600 self.assertTrue(h.shouldFlush(r)) 601 h.close() 602 h = logging.handlers.BufferingHandler(1) 603 self.assertFalse(h.shouldFlush(r)) 604 h.close() 605 606 def test_path_objects(self): 607 """ 608 Test that Path objects are accepted as filename arguments to handlers. 609 610 See Issue #27493. 611 """ 612 fd, fn = tempfile.mkstemp() 613 os.close(fd) 614 os.unlink(fn) 615 pfn = pathlib.Path(fn) 616 cases = ( 617 (logging.FileHandler, (pfn, 'w')), 618 (logging.handlers.RotatingFileHandler, (pfn, 'a')), 619 (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')), 620 ) 621 if sys.platform in ('linux', 'darwin'): 622 cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),) 623 for cls, args in cases: 624 h = cls(*args, encoding="utf-8") 625 self.assertTrue(os.path.exists(fn)) 626 h.close() 627 os.unlink(fn) 628 629 @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') 630 @unittest.skipIf( 631 support.is_emscripten, "Emscripten cannot fstat unlinked files." 632 ) 633 @threading_helper.requires_working_threading() 634 def test_race(self): 635 # Issue #14632 refers. 636 def remove_loop(fname, tries): 637 for _ in range(tries): 638 try: 639 os.unlink(fname) 640 self.deletion_time = time.time() 641 except OSError: 642 pass 643 time.sleep(0.004 * random.randint(0, 4)) 644 645 del_count = 500 646 log_count = 500 647 648 self.handle_time = None 649 self.deletion_time = None 650 651 for delay in (False, True): 652 fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') 653 os.close(fd) 654 remover = threading.Thread(target=remove_loop, args=(fn, del_count)) 655 remover.daemon = True 656 remover.start() 657 h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay) 658 f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') 659 h.setFormatter(f) 660 try: 661 for _ in range(log_count): 662 time.sleep(0.005) 663 r = logging.makeLogRecord({'msg': 'testing' }) 664 try: 665 self.handle_time = time.time() 666 h.handle(r) 667 except Exception: 668 print('Deleted at %s, ' 669 'opened at %s' % (self.deletion_time, 670 self.handle_time)) 671 raise 672 finally: 673 remover.join() 674 h.close() 675 if os.path.exists(fn): 676 os.unlink(fn) 677 678 # The implementation relies on os.register_at_fork existing, but we test 679 # based on os.fork existing because that is what users and this test use. 680 # This helps ensure that when fork exists (the important concept) that the 681 # register_at_fork mechanism is also present and used. 682 @support.requires_fork() 683 @threading_helper.requires_working_threading() 684 def test_post_fork_child_no_deadlock(self): 685 """Ensure child logging locks are not held; bpo-6721 & bpo-36533.""" 686 class _OurHandler(logging.Handler): 687 def __init__(self): 688 super().__init__() 689 self.sub_handler = logging.StreamHandler( 690 stream=open('/dev/null', 'wt', encoding='utf-8')) 691 692 def emit(self, record): 693 self.sub_handler.acquire() 694 try: 695 self.sub_handler.emit(record) 696 finally: 697 self.sub_handler.release() 698 699 self.assertEqual(len(logging._handlers), 0) 700 refed_h = _OurHandler() 701 self.addCleanup(refed_h.sub_handler.stream.close) 702 refed_h.name = 'because we need at least one for this test' 703 self.assertGreater(len(logging._handlers), 0) 704 self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1) 705 test_logger = logging.getLogger('test_post_fork_child_no_deadlock') 706 test_logger.addHandler(refed_h) 707 test_logger.setLevel(logging.DEBUG) 708 709 locks_held__ready_to_fork = threading.Event() 710 fork_happened__release_locks_and_end_thread = threading.Event() 711 712 def lock_holder_thread_fn(): 713 logging._acquireLock() 714 try: 715 refed_h.acquire() 716 try: 717 # Tell the main thread to do the fork. 718 locks_held__ready_to_fork.set() 719 720 # If the deadlock bug exists, the fork will happen 721 # without dealing with the locks we hold, deadlocking 722 # the child. 723 724 # Wait for a successful fork or an unreasonable amount of 725 # time before releasing our locks. To avoid a timing based 726 # test we'd need communication from os.fork() as to when it 727 # has actually happened. Given this is a regression test 728 # for a fixed issue, potentially less reliably detecting 729 # regression via timing is acceptable for simplicity. 730 # The test will always take at least this long. :( 731 fork_happened__release_locks_and_end_thread.wait(0.5) 732 finally: 733 refed_h.release() 734 finally: 735 logging._releaseLock() 736 737 lock_holder_thread = threading.Thread( 738 target=lock_holder_thread_fn, 739 name='test_post_fork_child_no_deadlock lock holder') 740 lock_holder_thread.start() 741 742 locks_held__ready_to_fork.wait() 743 pid = os.fork() 744 if pid == 0: 745 # Child process 746 try: 747 test_logger.info(r'Child process did not deadlock. \o/') 748 finally: 749 os._exit(0) 750 else: 751 # Parent process 752 test_logger.info(r'Parent process returned from fork. \o/') 753 fork_happened__release_locks_and_end_thread.set() 754 lock_holder_thread.join() 755 756 support.wait_process(pid, exitcode=0) 757 758 759class BadStream(object): 760 def write(self, data): 761 raise RuntimeError('deliberate mistake') 762 763class TestStreamHandler(logging.StreamHandler): 764 def handleError(self, record): 765 self.error_record = record 766 767class StreamWithIntName(object): 768 level = logging.NOTSET 769 name = 2 770 771class StreamHandlerTest(BaseTest): 772 def test_error_handling(self): 773 h = TestStreamHandler(BadStream()) 774 r = logging.makeLogRecord({}) 775 old_raise = logging.raiseExceptions 776 777 try: 778 h.handle(r) 779 self.assertIs(h.error_record, r) 780 781 h = logging.StreamHandler(BadStream()) 782 with support.captured_stderr() as stderr: 783 h.handle(r) 784 msg = '\nRuntimeError: deliberate mistake\n' 785 self.assertIn(msg, stderr.getvalue()) 786 787 logging.raiseExceptions = False 788 with support.captured_stderr() as stderr: 789 h.handle(r) 790 self.assertEqual('', stderr.getvalue()) 791 finally: 792 logging.raiseExceptions = old_raise 793 794 def test_stream_setting(self): 795 """ 796 Test setting the handler's stream 797 """ 798 h = logging.StreamHandler() 799 stream = io.StringIO() 800 old = h.setStream(stream) 801 self.assertIs(old, sys.stderr) 802 actual = h.setStream(old) 803 self.assertIs(actual, stream) 804 # test that setting to existing value returns None 805 actual = h.setStream(old) 806 self.assertIsNone(actual) 807 808 def test_can_represent_stream_with_int_name(self): 809 h = logging.StreamHandler(StreamWithIntName()) 810 self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>') 811 812# -- The following section could be moved into a server_helper.py module 813# -- if it proves to be of wider utility than just test_logging 814 815class TestSMTPServer(smtpd.SMTPServer): 816 """ 817 This class implements a test SMTP server. 818 819 :param addr: A (host, port) tuple which the server listens on. 820 You can specify a port value of zero: the server's 821 *port* attribute will hold the actual port number 822 used, which can be used in client connections. 823 :param handler: A callable which will be called to process 824 incoming messages. The handler will be passed 825 the client address tuple, who the message is from, 826 a list of recipients and the message data. 827 :param poll_interval: The interval, in seconds, used in the underlying 828 :func:`select` or :func:`poll` call by 829 :func:`asyncore.loop`. 830 :param sockmap: A dictionary which will be used to hold 831 :class:`asyncore.dispatcher` instances used by 832 :func:`asyncore.loop`. This avoids changing the 833 :mod:`asyncore` module's global state. 834 """ 835 836 def __init__(self, addr, handler, poll_interval, sockmap): 837 smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, 838 decode_data=True) 839 self.port = self.socket.getsockname()[1] 840 self._handler = handler 841 self._thread = None 842 self._quit = False 843 self.poll_interval = poll_interval 844 845 def process_message(self, peer, mailfrom, rcpttos, data): 846 """ 847 Delegates to the handler passed in to the server's constructor. 848 849 Typically, this will be a test case method. 850 :param peer: The client (host, port) tuple. 851 :param mailfrom: The address of the sender. 852 :param rcpttos: The addresses of the recipients. 853 :param data: The message. 854 """ 855 self._handler(peer, mailfrom, rcpttos, data) 856 857 def start(self): 858 """ 859 Start the server running on a separate daemon thread. 860 """ 861 self._thread = t = threading.Thread(target=self.serve_forever, 862 args=(self.poll_interval,)) 863 t.daemon = True 864 t.start() 865 866 def serve_forever(self, poll_interval): 867 """ 868 Run the :mod:`asyncore` loop until normal termination 869 conditions arise. 870 :param poll_interval: The interval, in seconds, used in the underlying 871 :func:`select` or :func:`poll` call by 872 :func:`asyncore.loop`. 873 """ 874 while not self._quit: 875 asyncore.loop(poll_interval, map=self._map, count=1) 876 877 def stop(self): 878 """ 879 Stop the thread by closing the server instance. 880 Wait for the server thread to terminate. 881 """ 882 self._quit = True 883 threading_helper.join_thread(self._thread) 884 self._thread = None 885 self.close() 886 asyncore.close_all(map=self._map, ignore_all=True) 887 888 889class ControlMixin(object): 890 """ 891 This mixin is used to start a server on a separate thread, and 892 shut it down programmatically. Request handling is simplified - instead 893 of needing to derive a suitable RequestHandler subclass, you just 894 provide a callable which will be passed each received request to be 895 processed. 896 897 :param handler: A handler callable which will be called with a 898 single parameter - the request - in order to 899 process the request. This handler is called on the 900 server thread, effectively meaning that requests are 901 processed serially. While not quite web scale ;-), 902 this should be fine for testing applications. 903 :param poll_interval: The polling interval in seconds. 904 """ 905 def __init__(self, handler, poll_interval): 906 self._thread = None 907 self.poll_interval = poll_interval 908 self._handler = handler 909 self.ready = threading.Event() 910 911 def start(self): 912 """ 913 Create a daemon thread to run the server, and start it. 914 """ 915 self._thread = t = threading.Thread(target=self.serve_forever, 916 args=(self.poll_interval,)) 917 t.daemon = True 918 t.start() 919 920 def serve_forever(self, poll_interval): 921 """ 922 Run the server. Set the ready flag before entering the 923 service loop. 924 """ 925 self.ready.set() 926 super(ControlMixin, self).serve_forever(poll_interval) 927 928 def stop(self): 929 """ 930 Tell the server thread to stop, and wait for it to do so. 931 """ 932 self.shutdown() 933 if self._thread is not None: 934 threading_helper.join_thread(self._thread) 935 self._thread = None 936 self.server_close() 937 self.ready.clear() 938 939class TestHTTPServer(ControlMixin, HTTPServer): 940 """ 941 An HTTP server which is controllable using :class:`ControlMixin`. 942 943 :param addr: A tuple with the IP address and port to listen on. 944 :param handler: A handler callable which will be called with a 945 single parameter - the request - in order to 946 process the request. 947 :param poll_interval: The polling interval in seconds. 948 :param log: Pass ``True`` to enable log messages. 949 """ 950 def __init__(self, addr, handler, poll_interval=0.5, 951 log=False, sslctx=None): 952 class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): 953 def __getattr__(self, name, default=None): 954 if name.startswith('do_'): 955 return self.process_request 956 raise AttributeError(name) 957 958 def process_request(self): 959 self.server._handler(self) 960 961 def log_message(self, format, *args): 962 if log: 963 super(DelegatingHTTPRequestHandler, 964 self).log_message(format, *args) 965 HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) 966 ControlMixin.__init__(self, handler, poll_interval) 967 self.sslctx = sslctx 968 969 def get_request(self): 970 try: 971 sock, addr = self.socket.accept() 972 if self.sslctx: 973 sock = self.sslctx.wrap_socket(sock, server_side=True) 974 except OSError as e: 975 # socket errors are silenced by the caller, print them here 976 sys.stderr.write("Got an error:\n%s\n" % e) 977 raise 978 return sock, addr 979 980class TestTCPServer(ControlMixin, ThreadingTCPServer): 981 """ 982 A TCP server which is controllable using :class:`ControlMixin`. 983 984 :param addr: A tuple with the IP address and port to listen on. 985 :param handler: A handler callable which will be called with a single 986 parameter - the request - in order to process the request. 987 :param poll_interval: The polling interval in seconds. 988 :bind_and_activate: If True (the default), binds the server and starts it 989 listening. If False, you need to call 990 :meth:`server_bind` and :meth:`server_activate` at 991 some later time before calling :meth:`start`, so that 992 the server will set up the socket and listen on it. 993 """ 994 995 allow_reuse_address = True 996 997 def __init__(self, addr, handler, poll_interval=0.5, 998 bind_and_activate=True): 999 class DelegatingTCPRequestHandler(StreamRequestHandler): 1000 1001 def handle(self): 1002 self.server._handler(self) 1003 ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, 1004 bind_and_activate) 1005 ControlMixin.__init__(self, handler, poll_interval) 1006 1007 def server_bind(self): 1008 super(TestTCPServer, self).server_bind() 1009 self.port = self.socket.getsockname()[1] 1010 1011class TestUDPServer(ControlMixin, ThreadingUDPServer): 1012 """ 1013 A UDP server which is controllable using :class:`ControlMixin`. 1014 1015 :param addr: A tuple with the IP address and port to listen on. 1016 :param handler: A handler callable which will be called with a 1017 single parameter - the request - in order to 1018 process the request. 1019 :param poll_interval: The polling interval for shutdown requests, 1020 in seconds. 1021 :bind_and_activate: If True (the default), binds the server and 1022 starts it listening. If False, you need to 1023 call :meth:`server_bind` and 1024 :meth:`server_activate` at some later time 1025 before calling :meth:`start`, so that the server will 1026 set up the socket and listen on it. 1027 """ 1028 def __init__(self, addr, handler, poll_interval=0.5, 1029 bind_and_activate=True): 1030 class DelegatingUDPRequestHandler(DatagramRequestHandler): 1031 1032 def handle(self): 1033 self.server._handler(self) 1034 1035 def finish(self): 1036 data = self.wfile.getvalue() 1037 if data: 1038 try: 1039 super(DelegatingUDPRequestHandler, self).finish() 1040 except OSError: 1041 if not self.server._closed: 1042 raise 1043 1044 ThreadingUDPServer.__init__(self, addr, 1045 DelegatingUDPRequestHandler, 1046 bind_and_activate) 1047 ControlMixin.__init__(self, handler, poll_interval) 1048 self._closed = False 1049 1050 def server_bind(self): 1051 super(TestUDPServer, self).server_bind() 1052 self.port = self.socket.getsockname()[1] 1053 1054 def server_close(self): 1055 super(TestUDPServer, self).server_close() 1056 self._closed = True 1057 1058if hasattr(socket, "AF_UNIX"): 1059 class TestUnixStreamServer(TestTCPServer): 1060 address_family = socket.AF_UNIX 1061 1062 class TestUnixDatagramServer(TestUDPServer): 1063 address_family = socket.AF_UNIX 1064 1065# - end of server_helper section 1066 1067@support.requires_working_socket() 1068@threading_helper.requires_working_threading() 1069class SMTPHandlerTest(BaseTest): 1070 # bpo-14314, bpo-19665, bpo-34092: don't wait forever 1071 TIMEOUT = support.LONG_TIMEOUT 1072 1073 def test_basic(self): 1074 sockmap = {} 1075 server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001, 1076 sockmap) 1077 server.start() 1078 addr = (socket_helper.HOST, server.port) 1079 h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', 1080 timeout=self.TIMEOUT) 1081 self.assertEqual(h.toaddrs, ['you']) 1082 self.messages = [] 1083 r = logging.makeLogRecord({'msg': 'Hello \u2713'}) 1084 self.handled = threading.Event() 1085 h.handle(r) 1086 self.handled.wait(self.TIMEOUT) 1087 server.stop() 1088 self.assertTrue(self.handled.is_set()) 1089 self.assertEqual(len(self.messages), 1) 1090 peer, mailfrom, rcpttos, data = self.messages[0] 1091 self.assertEqual(mailfrom, 'me') 1092 self.assertEqual(rcpttos, ['you']) 1093 self.assertIn('\nSubject: Log\n', data) 1094 self.assertTrue(data.endswith('\n\nHello \u2713')) 1095 h.close() 1096 1097 def process_message(self, *args): 1098 self.messages.append(args) 1099 self.handled.set() 1100 1101class MemoryHandlerTest(BaseTest): 1102 1103 """Tests for the MemoryHandler.""" 1104 1105 # Do not bother with a logger name group. 1106 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 1107 1108 def setUp(self): 1109 BaseTest.setUp(self) 1110 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1111 self.root_hdlr) 1112 self.mem_logger = logging.getLogger('mem') 1113 self.mem_logger.propagate = 0 1114 self.mem_logger.addHandler(self.mem_hdlr) 1115 1116 def tearDown(self): 1117 self.mem_hdlr.close() 1118 BaseTest.tearDown(self) 1119 1120 def test_flush(self): 1121 # The memory handler flushes to its target handler based on specific 1122 # criteria (message count and message level). 1123 self.mem_logger.debug(self.next_message()) 1124 self.assert_log_lines([]) 1125 self.mem_logger.info(self.next_message()) 1126 self.assert_log_lines([]) 1127 # This will flush because the level is >= logging.WARNING 1128 self.mem_logger.warning(self.next_message()) 1129 lines = [ 1130 ('DEBUG', '1'), 1131 ('INFO', '2'), 1132 ('WARNING', '3'), 1133 ] 1134 self.assert_log_lines(lines) 1135 for n in (4, 14): 1136 for i in range(9): 1137 self.mem_logger.debug(self.next_message()) 1138 self.assert_log_lines(lines) 1139 # This will flush because it's the 10th message since the last 1140 # flush. 1141 self.mem_logger.debug(self.next_message()) 1142 lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] 1143 self.assert_log_lines(lines) 1144 1145 self.mem_logger.debug(self.next_message()) 1146 self.assert_log_lines(lines) 1147 1148 def test_flush_on_close(self): 1149 """ 1150 Test that the flush-on-close configuration works as expected. 1151 """ 1152 self.mem_logger.debug(self.next_message()) 1153 self.assert_log_lines([]) 1154 self.mem_logger.info(self.next_message()) 1155 self.assert_log_lines([]) 1156 self.mem_logger.removeHandler(self.mem_hdlr) 1157 # Default behaviour is to flush on close. Check that it happens. 1158 self.mem_hdlr.close() 1159 lines = [ 1160 ('DEBUG', '1'), 1161 ('INFO', '2'), 1162 ] 1163 self.assert_log_lines(lines) 1164 # Now configure for flushing not to be done on close. 1165 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1166 self.root_hdlr, 1167 False) 1168 self.mem_logger.addHandler(self.mem_hdlr) 1169 self.mem_logger.debug(self.next_message()) 1170 self.assert_log_lines(lines) # no change 1171 self.mem_logger.info(self.next_message()) 1172 self.assert_log_lines(lines) # no change 1173 self.mem_logger.removeHandler(self.mem_hdlr) 1174 self.mem_hdlr.close() 1175 # assert that no new lines have been added 1176 self.assert_log_lines(lines) # no change 1177 1178 @threading_helper.requires_working_threading() 1179 def test_race_between_set_target_and_flush(self): 1180 class MockRaceConditionHandler: 1181 def __init__(self, mem_hdlr): 1182 self.mem_hdlr = mem_hdlr 1183 self.threads = [] 1184 1185 def removeTarget(self): 1186 self.mem_hdlr.setTarget(None) 1187 1188 def handle(self, msg): 1189 thread = threading.Thread(target=self.removeTarget) 1190 self.threads.append(thread) 1191 thread.start() 1192 1193 target = MockRaceConditionHandler(self.mem_hdlr) 1194 try: 1195 self.mem_hdlr.setTarget(target) 1196 1197 for _ in range(10): 1198 time.sleep(0.005) 1199 self.mem_logger.info("not flushed") 1200 self.mem_logger.warning("flushed") 1201 finally: 1202 for thread in target.threads: 1203 threading_helper.join_thread(thread) 1204 1205 1206class ExceptionFormatter(logging.Formatter): 1207 """A special exception formatter.""" 1208 def formatException(self, ei): 1209 return "Got a [%s]" % ei[0].__name__ 1210 1211 1212class ConfigFileTest(BaseTest): 1213 1214 """Reading logging config from a .ini-style config file.""" 1215 1216 check_no_resource_warning = warnings_helper.check_no_resource_warning 1217 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 1218 1219 # config0 is a standard configuration. 1220 config0 = """ 1221 [loggers] 1222 keys=root 1223 1224 [handlers] 1225 keys=hand1 1226 1227 [formatters] 1228 keys=form1 1229 1230 [logger_root] 1231 level=WARNING 1232 handlers=hand1 1233 1234 [handler_hand1] 1235 class=StreamHandler 1236 level=NOTSET 1237 formatter=form1 1238 args=(sys.stdout,) 1239 1240 [formatter_form1] 1241 format=%(levelname)s ++ %(message)s 1242 datefmt= 1243 """ 1244 1245 # config1 adds a little to the standard configuration. 1246 config1 = """ 1247 [loggers] 1248 keys=root,parser 1249 1250 [handlers] 1251 keys=hand1 1252 1253 [formatters] 1254 keys=form1 1255 1256 [logger_root] 1257 level=WARNING 1258 handlers= 1259 1260 [logger_parser] 1261 level=DEBUG 1262 handlers=hand1 1263 propagate=1 1264 qualname=compiler.parser 1265 1266 [handler_hand1] 1267 class=StreamHandler 1268 level=NOTSET 1269 formatter=form1 1270 args=(sys.stdout,) 1271 1272 [formatter_form1] 1273 format=%(levelname)s ++ %(message)s 1274 datefmt= 1275 """ 1276 1277 # config1a moves the handler to the root. 1278 config1a = """ 1279 [loggers] 1280 keys=root,parser 1281 1282 [handlers] 1283 keys=hand1 1284 1285 [formatters] 1286 keys=form1 1287 1288 [logger_root] 1289 level=WARNING 1290 handlers=hand1 1291 1292 [logger_parser] 1293 level=DEBUG 1294 handlers= 1295 propagate=1 1296 qualname=compiler.parser 1297 1298 [handler_hand1] 1299 class=StreamHandler 1300 level=NOTSET 1301 formatter=form1 1302 args=(sys.stdout,) 1303 1304 [formatter_form1] 1305 format=%(levelname)s ++ %(message)s 1306 datefmt= 1307 """ 1308 1309 # config2 has a subtle configuration error that should be reported 1310 config2 = config1.replace("sys.stdout", "sys.stbout") 1311 1312 # config3 has a less subtle configuration error 1313 config3 = config1.replace("formatter=form1", "formatter=misspelled_name") 1314 1315 # config4 specifies a custom formatter class to be loaded 1316 config4 = """ 1317 [loggers] 1318 keys=root 1319 1320 [handlers] 1321 keys=hand1 1322 1323 [formatters] 1324 keys=form1 1325 1326 [logger_root] 1327 level=NOTSET 1328 handlers=hand1 1329 1330 [handler_hand1] 1331 class=StreamHandler 1332 level=NOTSET 1333 formatter=form1 1334 args=(sys.stdout,) 1335 1336 [formatter_form1] 1337 class=""" + __name__ + """.ExceptionFormatter 1338 format=%(levelname)s:%(name)s:%(message)s 1339 datefmt= 1340 """ 1341 1342 # config5 specifies a custom handler class to be loaded 1343 config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') 1344 1345 # config6 uses ', ' delimiters in the handlers and formatters sections 1346 config6 = """ 1347 [loggers] 1348 keys=root,parser 1349 1350 [handlers] 1351 keys=hand1, hand2 1352 1353 [formatters] 1354 keys=form1, form2 1355 1356 [logger_root] 1357 level=WARNING 1358 handlers= 1359 1360 [logger_parser] 1361 level=DEBUG 1362 handlers=hand1 1363 propagate=1 1364 qualname=compiler.parser 1365 1366 [handler_hand1] 1367 class=StreamHandler 1368 level=NOTSET 1369 formatter=form1 1370 args=(sys.stdout,) 1371 1372 [handler_hand2] 1373 class=StreamHandler 1374 level=NOTSET 1375 formatter=form1 1376 args=(sys.stderr,) 1377 1378 [formatter_form1] 1379 format=%(levelname)s ++ %(message)s 1380 datefmt= 1381 1382 [formatter_form2] 1383 format=%(message)s 1384 datefmt= 1385 """ 1386 1387 # config7 adds a compiler logger, and uses kwargs instead of args. 1388 config7 = """ 1389 [loggers] 1390 keys=root,parser,compiler 1391 1392 [handlers] 1393 keys=hand1 1394 1395 [formatters] 1396 keys=form1 1397 1398 [logger_root] 1399 level=WARNING 1400 handlers=hand1 1401 1402 [logger_compiler] 1403 level=DEBUG 1404 handlers= 1405 propagate=1 1406 qualname=compiler 1407 1408 [logger_parser] 1409 level=DEBUG 1410 handlers= 1411 propagate=1 1412 qualname=compiler.parser 1413 1414 [handler_hand1] 1415 class=StreamHandler 1416 level=NOTSET 1417 formatter=form1 1418 kwargs={'stream': sys.stdout,} 1419 1420 [formatter_form1] 1421 format=%(levelname)s ++ %(message)s 1422 datefmt= 1423 """ 1424 1425 # config 8, check for resource warning 1426 config8 = r""" 1427 [loggers] 1428 keys=root 1429 1430 [handlers] 1431 keys=file 1432 1433 [formatters] 1434 keys= 1435 1436 [logger_root] 1437 level=DEBUG 1438 handlers=file 1439 1440 [handler_file] 1441 class=FileHandler 1442 level=DEBUG 1443 args=("{tempfile}",) 1444 kwargs={{"encoding": "utf-8"}} 1445 """ 1446 1447 disable_test = """ 1448 [loggers] 1449 keys=root 1450 1451 [handlers] 1452 keys=screen 1453 1454 [formatters] 1455 keys= 1456 1457 [logger_root] 1458 level=DEBUG 1459 handlers=screen 1460 1461 [handler_screen] 1462 level=DEBUG 1463 class=StreamHandler 1464 args=(sys.stdout,) 1465 formatter= 1466 """ 1467 1468 def apply_config(self, conf, **kwargs): 1469 file = io.StringIO(textwrap.dedent(conf)) 1470 logging.config.fileConfig(file, encoding="utf-8", **kwargs) 1471 1472 def test_config0_ok(self): 1473 # A simple config file which overrides the default settings. 1474 with support.captured_stdout() as output: 1475 self.apply_config(self.config0) 1476 logger = logging.getLogger() 1477 # Won't output anything 1478 logger.info(self.next_message()) 1479 # Outputs a message 1480 logger.error(self.next_message()) 1481 self.assert_log_lines([ 1482 ('ERROR', '2'), 1483 ], stream=output) 1484 # Original logger output is empty. 1485 self.assert_log_lines([]) 1486 1487 def test_config0_using_cp_ok(self): 1488 # A simple config file which overrides the default settings. 1489 with support.captured_stdout() as output: 1490 file = io.StringIO(textwrap.dedent(self.config0)) 1491 cp = configparser.ConfigParser() 1492 cp.read_file(file) 1493 logging.config.fileConfig(cp) 1494 logger = logging.getLogger() 1495 # Won't output anything 1496 logger.info(self.next_message()) 1497 # Outputs a message 1498 logger.error(self.next_message()) 1499 self.assert_log_lines([ 1500 ('ERROR', '2'), 1501 ], stream=output) 1502 # Original logger output is empty. 1503 self.assert_log_lines([]) 1504 1505 def test_config1_ok(self, config=config1): 1506 # A config file defining a sub-parser as well. 1507 with support.captured_stdout() as output: 1508 self.apply_config(config) 1509 logger = logging.getLogger("compiler.parser") 1510 # Both will output a message 1511 logger.info(self.next_message()) 1512 logger.error(self.next_message()) 1513 self.assert_log_lines([ 1514 ('INFO', '1'), 1515 ('ERROR', '2'), 1516 ], stream=output) 1517 # Original logger output is empty. 1518 self.assert_log_lines([]) 1519 1520 def test_config2_failure(self): 1521 # A simple config file which overrides the default settings. 1522 self.assertRaises(Exception, self.apply_config, self.config2) 1523 1524 def test_config3_failure(self): 1525 # A simple config file which overrides the default settings. 1526 self.assertRaises(Exception, self.apply_config, self.config3) 1527 1528 def test_config4_ok(self): 1529 # A config file specifying a custom formatter class. 1530 with support.captured_stdout() as output: 1531 self.apply_config(self.config4) 1532 logger = logging.getLogger() 1533 try: 1534 raise RuntimeError() 1535 except RuntimeError: 1536 logging.exception("just testing") 1537 sys.stdout.seek(0) 1538 self.assertEqual(output.getvalue(), 1539 "ERROR:root:just testing\nGot a [RuntimeError]\n") 1540 # Original logger output is empty 1541 self.assert_log_lines([]) 1542 1543 def test_config5_ok(self): 1544 self.test_config1_ok(config=self.config5) 1545 1546 def test_config6_ok(self): 1547 self.test_config1_ok(config=self.config6) 1548 1549 def test_config7_ok(self): 1550 with support.captured_stdout() as output: 1551 self.apply_config(self.config1a) 1552 logger = logging.getLogger("compiler.parser") 1553 # See issue #11424. compiler-hyphenated sorts 1554 # between compiler and compiler.xyz and this 1555 # was preventing compiler.xyz from being included 1556 # in the child loggers of compiler because of an 1557 # overzealous loop termination condition. 1558 hyphenated = logging.getLogger('compiler-hyphenated') 1559 # All will output a message 1560 logger.info(self.next_message()) 1561 logger.error(self.next_message()) 1562 hyphenated.critical(self.next_message()) 1563 self.assert_log_lines([ 1564 ('INFO', '1'), 1565 ('ERROR', '2'), 1566 ('CRITICAL', '3'), 1567 ], stream=output) 1568 # Original logger output is empty. 1569 self.assert_log_lines([]) 1570 with support.captured_stdout() as output: 1571 self.apply_config(self.config7) 1572 logger = logging.getLogger("compiler.parser") 1573 self.assertFalse(logger.disabled) 1574 # Both will output a message 1575 logger.info(self.next_message()) 1576 logger.error(self.next_message()) 1577 logger = logging.getLogger("compiler.lexer") 1578 # Both will output a message 1579 logger.info(self.next_message()) 1580 logger.error(self.next_message()) 1581 # Will not appear 1582 hyphenated.critical(self.next_message()) 1583 self.assert_log_lines([ 1584 ('INFO', '4'), 1585 ('ERROR', '5'), 1586 ('INFO', '6'), 1587 ('ERROR', '7'), 1588 ], stream=output) 1589 # Original logger output is empty. 1590 self.assert_log_lines([]) 1591 1592 def test_config8_ok(self): 1593 1594 def cleanup(h1, fn): 1595 h1.close() 1596 os.remove(fn) 1597 1598 with self.check_no_resource_warning(): 1599 fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 1600 os.close(fd) 1601 1602 # Replace single backslash with double backslash in windows 1603 # to avoid unicode error during string formatting 1604 if os.name == "nt": 1605 fn = fn.replace("\\", "\\\\") 1606 1607 config8 = self.config8.format(tempfile=fn) 1608 1609 self.apply_config(config8) 1610 self.apply_config(config8) 1611 1612 handler = logging.root.handlers[0] 1613 self.addCleanup(cleanup, handler, fn) 1614 1615 def test_logger_disabling(self): 1616 self.apply_config(self.disable_test) 1617 logger = logging.getLogger('some_pristine_logger') 1618 self.assertFalse(logger.disabled) 1619 self.apply_config(self.disable_test) 1620 self.assertTrue(logger.disabled) 1621 self.apply_config(self.disable_test, disable_existing_loggers=False) 1622 self.assertFalse(logger.disabled) 1623 1624 def test_config_set_handler_names(self): 1625 test_config = """ 1626 [loggers] 1627 keys=root 1628 1629 [handlers] 1630 keys=hand1 1631 1632 [formatters] 1633 keys=form1 1634 1635 [logger_root] 1636 handlers=hand1 1637 1638 [handler_hand1] 1639 class=StreamHandler 1640 formatter=form1 1641 1642 [formatter_form1] 1643 format=%(levelname)s ++ %(message)s 1644 """ 1645 self.apply_config(test_config) 1646 self.assertEqual(logging.getLogger().handlers[0].name, 'hand1') 1647 1648 def test_exception_if_confg_file_is_invalid(self): 1649 test_config = """ 1650 [loggers] 1651 keys=root 1652 1653 [handlers] 1654 keys=hand1 1655 1656 [formatters] 1657 keys=form1 1658 1659 [logger_root] 1660 handlers=hand1 1661 1662 [handler_hand1] 1663 class=StreamHandler 1664 formatter=form1 1665 1666 [formatter_form1] 1667 format=%(levelname)s ++ %(message)s 1668 1669 prince 1670 """ 1671 1672 file = io.StringIO(textwrap.dedent(test_config)) 1673 self.assertRaises(RuntimeError, logging.config.fileConfig, file) 1674 1675 def test_exception_if_confg_file_is_empty(self): 1676 fd, fn = tempfile.mkstemp(prefix='test_empty_', suffix='.ini') 1677 os.close(fd) 1678 self.assertRaises(RuntimeError, logging.config.fileConfig, fn) 1679 os.remove(fn) 1680 1681 def test_exception_if_config_file_does_not_exist(self): 1682 self.assertRaises(FileNotFoundError, logging.config.fileConfig, 'filenotfound') 1683 1684 def test_defaults_do_no_interpolation(self): 1685 """bpo-33802 defaults should not get interpolated""" 1686 ini = textwrap.dedent(""" 1687 [formatters] 1688 keys=default 1689 1690 [formatter_default] 1691 1692 [handlers] 1693 keys=console 1694 1695 [handler_console] 1696 class=logging.StreamHandler 1697 args=tuple() 1698 1699 [loggers] 1700 keys=root 1701 1702 [logger_root] 1703 formatter=default 1704 handlers=console 1705 """).strip() 1706 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini') 1707 try: 1708 os.write(fd, ini.encode('ascii')) 1709 os.close(fd) 1710 logging.config.fileConfig( 1711 fn, 1712 encoding="utf-8", 1713 defaults=dict( 1714 version=1, 1715 disable_existing_loggers=False, 1716 formatters={ 1717 "generic": { 1718 "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s", 1719 "datefmt": "[%Y-%m-%d %H:%M:%S %z]", 1720 "class": "logging.Formatter" 1721 }, 1722 }, 1723 ) 1724 ) 1725 finally: 1726 os.unlink(fn) 1727 1728 1729@support.requires_working_socket() 1730@threading_helper.requires_working_threading() 1731class SocketHandlerTest(BaseTest): 1732 1733 """Test for SocketHandler objects.""" 1734 1735 server_class = TestTCPServer 1736 address = ('localhost', 0) 1737 1738 def setUp(self): 1739 """Set up a TCP server to receive log messages, and a SocketHandler 1740 pointing to that server's address and port.""" 1741 BaseTest.setUp(self) 1742 # Issue #29177: deal with errors that happen during setup 1743 self.server = self.sock_hdlr = self.server_exception = None 1744 try: 1745 self.server = server = self.server_class(self.address, 1746 self.handle_socket, 0.01) 1747 server.start() 1748 # Uncomment next line to test error recovery in setUp() 1749 # raise OSError('dummy error raised') 1750 except OSError as e: 1751 self.server_exception = e 1752 return 1753 server.ready.wait() 1754 hcls = logging.handlers.SocketHandler 1755 if isinstance(server.server_address, tuple): 1756 self.sock_hdlr = hcls('localhost', server.port) 1757 else: 1758 self.sock_hdlr = hcls(server.server_address, None) 1759 self.log_output = '' 1760 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1761 self.root_logger.addHandler(self.sock_hdlr) 1762 self.handled = threading.Semaphore(0) 1763 1764 def tearDown(self): 1765 """Shutdown the TCP server.""" 1766 try: 1767 if self.sock_hdlr: 1768 self.root_logger.removeHandler(self.sock_hdlr) 1769 self.sock_hdlr.close() 1770 if self.server: 1771 self.server.stop() 1772 finally: 1773 BaseTest.tearDown(self) 1774 1775 def handle_socket(self, request): 1776 conn = request.connection 1777 while True: 1778 chunk = conn.recv(4) 1779 if len(chunk) < 4: 1780 break 1781 slen = struct.unpack(">L", chunk)[0] 1782 chunk = conn.recv(slen) 1783 while len(chunk) < slen: 1784 chunk = chunk + conn.recv(slen - len(chunk)) 1785 obj = pickle.loads(chunk) 1786 record = logging.makeLogRecord(obj) 1787 self.log_output += record.msg + '\n' 1788 self.handled.release() 1789 1790 def test_output(self): 1791 # The log message sent to the SocketHandler is properly received. 1792 if self.server_exception: 1793 self.skipTest(self.server_exception) 1794 logger = logging.getLogger("tcp") 1795 logger.error("spam") 1796 self.handled.acquire() 1797 logger.debug("eggs") 1798 self.handled.acquire() 1799 self.assertEqual(self.log_output, "spam\neggs\n") 1800 1801 def test_noserver(self): 1802 if self.server_exception: 1803 self.skipTest(self.server_exception) 1804 # Avoid timing-related failures due to SocketHandler's own hard-wired 1805 # one-second timeout on socket.create_connection() (issue #16264). 1806 self.sock_hdlr.retryStart = 2.5 1807 # Kill the server 1808 self.server.stop() 1809 # The logging call should try to connect, which should fail 1810 try: 1811 raise RuntimeError('Deliberate mistake') 1812 except RuntimeError: 1813 self.root_logger.exception('Never sent') 1814 self.root_logger.error('Never sent, either') 1815 now = time.time() 1816 self.assertGreater(self.sock_hdlr.retryTime, now) 1817 time.sleep(self.sock_hdlr.retryTime - now + 0.001) 1818 self.root_logger.error('Nor this') 1819 1820def _get_temp_domain_socket(): 1821 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock') 1822 os.close(fd) 1823 # just need a name - file can't be present, or we'll get an 1824 # 'address already in use' error. 1825 os.remove(fn) 1826 return fn 1827 1828@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1829class UnixSocketHandlerTest(SocketHandlerTest): 1830 1831 """Test for SocketHandler with unix sockets.""" 1832 1833 if hasattr(socket, "AF_UNIX"): 1834 server_class = TestUnixStreamServer 1835 1836 def setUp(self): 1837 # override the definition in the base class 1838 self.address = _get_temp_domain_socket() 1839 SocketHandlerTest.setUp(self) 1840 1841 def tearDown(self): 1842 SocketHandlerTest.tearDown(self) 1843 os_helper.unlink(self.address) 1844 1845@support.requires_working_socket() 1846@threading_helper.requires_working_threading() 1847class DatagramHandlerTest(BaseTest): 1848 1849 """Test for DatagramHandler.""" 1850 1851 server_class = TestUDPServer 1852 address = ('localhost', 0) 1853 1854 def setUp(self): 1855 """Set up a UDP server to receive log messages, and a DatagramHandler 1856 pointing to that server's address and port.""" 1857 BaseTest.setUp(self) 1858 # Issue #29177: deal with errors that happen during setup 1859 self.server = self.sock_hdlr = self.server_exception = None 1860 try: 1861 self.server = server = self.server_class(self.address, 1862 self.handle_datagram, 0.01) 1863 server.start() 1864 # Uncomment next line to test error recovery in setUp() 1865 # raise OSError('dummy error raised') 1866 except OSError as e: 1867 self.server_exception = e 1868 return 1869 server.ready.wait() 1870 hcls = logging.handlers.DatagramHandler 1871 if isinstance(server.server_address, tuple): 1872 self.sock_hdlr = hcls('localhost', server.port) 1873 else: 1874 self.sock_hdlr = hcls(server.server_address, None) 1875 self.log_output = '' 1876 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1877 self.root_logger.addHandler(self.sock_hdlr) 1878 self.handled = threading.Event() 1879 1880 def tearDown(self): 1881 """Shutdown the UDP server.""" 1882 try: 1883 if self.server: 1884 self.server.stop() 1885 if self.sock_hdlr: 1886 self.root_logger.removeHandler(self.sock_hdlr) 1887 self.sock_hdlr.close() 1888 finally: 1889 BaseTest.tearDown(self) 1890 1891 def handle_datagram(self, request): 1892 slen = struct.pack('>L', 0) # length of prefix 1893 packet = request.packet[len(slen):] 1894 obj = pickle.loads(packet) 1895 record = logging.makeLogRecord(obj) 1896 self.log_output += record.msg + '\n' 1897 self.handled.set() 1898 1899 def test_output(self): 1900 # The log message sent to the DatagramHandler is properly received. 1901 if self.server_exception: 1902 self.skipTest(self.server_exception) 1903 logger = logging.getLogger("udp") 1904 logger.error("spam") 1905 self.handled.wait() 1906 self.handled.clear() 1907 logger.error("eggs") 1908 self.handled.wait() 1909 self.assertEqual(self.log_output, "spam\neggs\n") 1910 1911@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1912class UnixDatagramHandlerTest(DatagramHandlerTest): 1913 1914 """Test for DatagramHandler using Unix sockets.""" 1915 1916 if hasattr(socket, "AF_UNIX"): 1917 server_class = TestUnixDatagramServer 1918 1919 def setUp(self): 1920 # override the definition in the base class 1921 self.address = _get_temp_domain_socket() 1922 DatagramHandlerTest.setUp(self) 1923 1924 def tearDown(self): 1925 DatagramHandlerTest.tearDown(self) 1926 os_helper.unlink(self.address) 1927 1928@support.requires_working_socket() 1929@threading_helper.requires_working_threading() 1930class SysLogHandlerTest(BaseTest): 1931 1932 """Test for SysLogHandler using UDP.""" 1933 1934 server_class = TestUDPServer 1935 address = ('localhost', 0) 1936 1937 def setUp(self): 1938 """Set up a UDP server to receive log messages, and a SysLogHandler 1939 pointing to that server's address and port.""" 1940 BaseTest.setUp(self) 1941 # Issue #29177: deal with errors that happen during setup 1942 self.server = self.sl_hdlr = self.server_exception = None 1943 try: 1944 self.server = server = self.server_class(self.address, 1945 self.handle_datagram, 0.01) 1946 server.start() 1947 # Uncomment next line to test error recovery in setUp() 1948 # raise OSError('dummy error raised') 1949 except OSError as e: 1950 self.server_exception = e 1951 return 1952 server.ready.wait() 1953 hcls = logging.handlers.SysLogHandler 1954 if isinstance(server.server_address, tuple): 1955 self.sl_hdlr = hcls((server.server_address[0], server.port)) 1956 else: 1957 self.sl_hdlr = hcls(server.server_address) 1958 self.log_output = b'' 1959 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1960 self.root_logger.addHandler(self.sl_hdlr) 1961 self.handled = threading.Event() 1962 1963 def tearDown(self): 1964 """Shutdown the server.""" 1965 try: 1966 if self.server: 1967 self.server.stop() 1968 if self.sl_hdlr: 1969 self.root_logger.removeHandler(self.sl_hdlr) 1970 self.sl_hdlr.close() 1971 finally: 1972 BaseTest.tearDown(self) 1973 1974 def handle_datagram(self, request): 1975 self.log_output = request.packet 1976 self.handled.set() 1977 1978 def test_output(self): 1979 if self.server_exception: 1980 self.skipTest(self.server_exception) 1981 # The log message sent to the SysLogHandler is properly received. 1982 logger = logging.getLogger("slh") 1983 logger.error("sp\xe4m") 1984 self.handled.wait() 1985 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 1986 self.handled.clear() 1987 self.sl_hdlr.append_nul = False 1988 logger.error("sp\xe4m") 1989 self.handled.wait() 1990 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m') 1991 self.handled.clear() 1992 self.sl_hdlr.ident = "h\xe4m-" 1993 logger.error("sp\xe4m") 1994 self.handled.wait() 1995 self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') 1996 1997 def test_udp_reconnection(self): 1998 logger = logging.getLogger("slh") 1999 self.sl_hdlr.close() 2000 self.handled.clear() 2001 logger.error("sp\xe4m") 2002 self.handled.wait(0.1) 2003 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 2004 2005@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 2006class UnixSysLogHandlerTest(SysLogHandlerTest): 2007 2008 """Test for SysLogHandler with Unix sockets.""" 2009 2010 if hasattr(socket, "AF_UNIX"): 2011 server_class = TestUnixDatagramServer 2012 2013 def setUp(self): 2014 # override the definition in the base class 2015 self.address = _get_temp_domain_socket() 2016 SysLogHandlerTest.setUp(self) 2017 2018 def tearDown(self): 2019 SysLogHandlerTest.tearDown(self) 2020 os_helper.unlink(self.address) 2021 2022@unittest.skipUnless(socket_helper.IPV6_ENABLED, 2023 'IPv6 support required for this test.') 2024class IPv6SysLogHandlerTest(SysLogHandlerTest): 2025 2026 """Test for SysLogHandler with IPv6 host.""" 2027 2028 server_class = TestUDPServer 2029 address = ('::1', 0) 2030 2031 def setUp(self): 2032 self.server_class.address_family = socket.AF_INET6 2033 super(IPv6SysLogHandlerTest, self).setUp() 2034 2035 def tearDown(self): 2036 self.server_class.address_family = socket.AF_INET 2037 super(IPv6SysLogHandlerTest, self).tearDown() 2038 2039@support.requires_working_socket() 2040@threading_helper.requires_working_threading() 2041class HTTPHandlerTest(BaseTest): 2042 """Test for HTTPHandler.""" 2043 2044 def setUp(self): 2045 """Set up an HTTP server to receive log messages, and a HTTPHandler 2046 pointing to that server's address and port.""" 2047 BaseTest.setUp(self) 2048 self.handled = threading.Event() 2049 2050 def handle_request(self, request): 2051 self.command = request.command 2052 self.log_data = urlparse(request.path) 2053 if self.command == 'POST': 2054 try: 2055 rlen = int(request.headers['Content-Length']) 2056 self.post_data = request.rfile.read(rlen) 2057 except: 2058 self.post_data = None 2059 request.send_response(200) 2060 request.end_headers() 2061 self.handled.set() 2062 2063 def test_output(self): 2064 # The log message sent to the HTTPHandler is properly received. 2065 logger = logging.getLogger("http") 2066 root_logger = self.root_logger 2067 root_logger.removeHandler(self.root_logger.handlers[0]) 2068 for secure in (False, True): 2069 addr = ('localhost', 0) 2070 if secure: 2071 try: 2072 import ssl 2073 except ImportError: 2074 sslctx = None 2075 else: 2076 here = os.path.dirname(__file__) 2077 localhost_cert = os.path.join(here, "keycert.pem") 2078 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 2079 sslctx.load_cert_chain(localhost_cert) 2080 2081 context = ssl.create_default_context(cafile=localhost_cert) 2082 else: 2083 sslctx = None 2084 context = None 2085 self.server = server = TestHTTPServer(addr, self.handle_request, 2086 0.01, sslctx=sslctx) 2087 server.start() 2088 server.ready.wait() 2089 host = 'localhost:%d' % server.server_port 2090 secure_client = secure and sslctx 2091 self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', 2092 secure=secure_client, 2093 context=context, 2094 credentials=('foo', 'bar')) 2095 self.log_data = None 2096 root_logger.addHandler(self.h_hdlr) 2097 2098 for method in ('GET', 'POST'): 2099 self.h_hdlr.method = method 2100 self.handled.clear() 2101 msg = "sp\xe4m" 2102 logger.error(msg) 2103 self.handled.wait() 2104 self.assertEqual(self.log_data.path, '/frob') 2105 self.assertEqual(self.command, method) 2106 if method == 'GET': 2107 d = parse_qs(self.log_data.query) 2108 else: 2109 d = parse_qs(self.post_data.decode('utf-8')) 2110 self.assertEqual(d['name'], ['http']) 2111 self.assertEqual(d['funcName'], ['test_output']) 2112 self.assertEqual(d['msg'], [msg]) 2113 2114 self.server.stop() 2115 self.root_logger.removeHandler(self.h_hdlr) 2116 self.h_hdlr.close() 2117 2118class MemoryTest(BaseTest): 2119 2120 """Test memory persistence of logger objects.""" 2121 2122 def setUp(self): 2123 """Create a dict to remember potentially destroyed objects.""" 2124 BaseTest.setUp(self) 2125 self._survivors = {} 2126 2127 def _watch_for_survival(self, *args): 2128 """Watch the given objects for survival, by creating weakrefs to 2129 them.""" 2130 for obj in args: 2131 key = id(obj), repr(obj) 2132 self._survivors[key] = weakref.ref(obj) 2133 2134 def _assertTruesurvival(self): 2135 """Assert that all objects watched for survival have survived.""" 2136 # Trigger cycle breaking. 2137 gc.collect() 2138 dead = [] 2139 for (id_, repr_), ref in self._survivors.items(): 2140 if ref() is None: 2141 dead.append(repr_) 2142 if dead: 2143 self.fail("%d objects should have survived " 2144 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) 2145 2146 def test_persistent_loggers(self): 2147 # Logger objects are persistent and retain their configuration, even 2148 # if visible references are destroyed. 2149 self.root_logger.setLevel(logging.INFO) 2150 foo = logging.getLogger("foo") 2151 self._watch_for_survival(foo) 2152 foo.setLevel(logging.DEBUG) 2153 self.root_logger.debug(self.next_message()) 2154 foo.debug(self.next_message()) 2155 self.assert_log_lines([ 2156 ('foo', 'DEBUG', '2'), 2157 ]) 2158 del foo 2159 # foo has survived. 2160 self._assertTruesurvival() 2161 # foo has retained its settings. 2162 bar = logging.getLogger("foo") 2163 bar.debug(self.next_message()) 2164 self.assert_log_lines([ 2165 ('foo', 'DEBUG', '2'), 2166 ('foo', 'DEBUG', '3'), 2167 ]) 2168 2169 2170class EncodingTest(BaseTest): 2171 def test_encoding_plain_file(self): 2172 # In Python 2.x, a plain file object is treated as having no encoding. 2173 log = logging.getLogger("test") 2174 fd, fn = tempfile.mkstemp(".log", "test_logging-1-") 2175 os.close(fd) 2176 # the non-ascii data we write to the log. 2177 data = "foo\x80" 2178 try: 2179 handler = logging.FileHandler(fn, encoding="utf-8") 2180 log.addHandler(handler) 2181 try: 2182 # write non-ascii data to the log. 2183 log.warning(data) 2184 finally: 2185 log.removeHandler(handler) 2186 handler.close() 2187 # check we wrote exactly those bytes, ignoring trailing \n etc 2188 f = open(fn, encoding="utf-8") 2189 try: 2190 self.assertEqual(f.read().rstrip(), data) 2191 finally: 2192 f.close() 2193 finally: 2194 if os.path.isfile(fn): 2195 os.remove(fn) 2196 2197 def test_encoding_cyrillic_unicode(self): 2198 log = logging.getLogger("test") 2199 # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) 2200 message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' 2201 # Ensure it's written in a Cyrillic encoding 2202 writer_class = codecs.getwriter('cp1251') 2203 writer_class.encoding = 'cp1251' 2204 stream = io.BytesIO() 2205 writer = writer_class(stream, 'strict') 2206 handler = logging.StreamHandler(writer) 2207 log.addHandler(handler) 2208 try: 2209 log.warning(message) 2210 finally: 2211 log.removeHandler(handler) 2212 handler.close() 2213 # check we wrote exactly those bytes, ignoring trailing \n etc 2214 s = stream.getvalue() 2215 # Compare against what the data should be when encoded in CP-1251 2216 self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') 2217 2218 2219class WarningsTest(BaseTest): 2220 2221 def test_warnings(self): 2222 with warnings.catch_warnings(): 2223 logging.captureWarnings(True) 2224 self.addCleanup(logging.captureWarnings, False) 2225 warnings.filterwarnings("always", category=UserWarning) 2226 stream = io.StringIO() 2227 h = logging.StreamHandler(stream) 2228 logger = logging.getLogger("py.warnings") 2229 logger.addHandler(h) 2230 warnings.warn("I'm warning you...") 2231 logger.removeHandler(h) 2232 s = stream.getvalue() 2233 h.close() 2234 self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) 2235 2236 # See if an explicit file uses the original implementation 2237 a_file = io.StringIO() 2238 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, 2239 a_file, "Dummy line") 2240 s = a_file.getvalue() 2241 a_file.close() 2242 self.assertEqual(s, 2243 "dummy.py:42: UserWarning: Explicit\n Dummy line\n") 2244 2245 def test_warnings_no_handlers(self): 2246 with warnings.catch_warnings(): 2247 logging.captureWarnings(True) 2248 self.addCleanup(logging.captureWarnings, False) 2249 2250 # confirm our assumption: no loggers are set 2251 logger = logging.getLogger("py.warnings") 2252 self.assertEqual(logger.handlers, []) 2253 2254 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42) 2255 self.assertEqual(len(logger.handlers), 1) 2256 self.assertIsInstance(logger.handlers[0], logging.NullHandler) 2257 2258 2259def formatFunc(format, datefmt=None): 2260 return logging.Formatter(format, datefmt) 2261 2262class myCustomFormatter: 2263 def __init__(self, fmt, datefmt=None): 2264 pass 2265 2266def handlerFunc(): 2267 return logging.StreamHandler() 2268 2269class CustomHandler(logging.StreamHandler): 2270 pass 2271 2272class ConfigDictTest(BaseTest): 2273 2274 """Reading logging config from a dictionary.""" 2275 2276 check_no_resource_warning = warnings_helper.check_no_resource_warning 2277 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 2278 2279 # config0 is a standard configuration. 2280 config0 = { 2281 'version': 1, 2282 'formatters': { 2283 'form1' : { 2284 'format' : '%(levelname)s ++ %(message)s', 2285 }, 2286 }, 2287 'handlers' : { 2288 'hand1' : { 2289 'class' : 'logging.StreamHandler', 2290 'formatter' : 'form1', 2291 'level' : 'NOTSET', 2292 'stream' : 'ext://sys.stdout', 2293 }, 2294 }, 2295 'root' : { 2296 'level' : 'WARNING', 2297 'handlers' : ['hand1'], 2298 }, 2299 } 2300 2301 # config1 adds a little to the standard configuration. 2302 config1 = { 2303 'version': 1, 2304 'formatters': { 2305 'form1' : { 2306 'format' : '%(levelname)s ++ %(message)s', 2307 }, 2308 }, 2309 'handlers' : { 2310 'hand1' : { 2311 'class' : 'logging.StreamHandler', 2312 'formatter' : 'form1', 2313 'level' : 'NOTSET', 2314 'stream' : 'ext://sys.stdout', 2315 }, 2316 }, 2317 'loggers' : { 2318 'compiler.parser' : { 2319 'level' : 'DEBUG', 2320 'handlers' : ['hand1'], 2321 }, 2322 }, 2323 'root' : { 2324 'level' : 'WARNING', 2325 }, 2326 } 2327 2328 # config1a moves the handler to the root. Used with config8a 2329 config1a = { 2330 'version': 1, 2331 'formatters': { 2332 'form1' : { 2333 'format' : '%(levelname)s ++ %(message)s', 2334 }, 2335 }, 2336 'handlers' : { 2337 'hand1' : { 2338 'class' : 'logging.StreamHandler', 2339 'formatter' : 'form1', 2340 'level' : 'NOTSET', 2341 'stream' : 'ext://sys.stdout', 2342 }, 2343 }, 2344 'loggers' : { 2345 'compiler.parser' : { 2346 'level' : 'DEBUG', 2347 }, 2348 }, 2349 'root' : { 2350 'level' : 'WARNING', 2351 'handlers' : ['hand1'], 2352 }, 2353 } 2354 2355 # config2 has a subtle configuration error that should be reported 2356 config2 = { 2357 'version': 1, 2358 'formatters': { 2359 'form1' : { 2360 'format' : '%(levelname)s ++ %(message)s', 2361 }, 2362 }, 2363 'handlers' : { 2364 'hand1' : { 2365 'class' : 'logging.StreamHandler', 2366 'formatter' : 'form1', 2367 'level' : 'NOTSET', 2368 'stream' : 'ext://sys.stdbout', 2369 }, 2370 }, 2371 'loggers' : { 2372 'compiler.parser' : { 2373 'level' : 'DEBUG', 2374 'handlers' : ['hand1'], 2375 }, 2376 }, 2377 'root' : { 2378 'level' : 'WARNING', 2379 }, 2380 } 2381 2382 # As config1 but with a misspelt level on a handler 2383 config2a = { 2384 'version': 1, 2385 'formatters': { 2386 'form1' : { 2387 'format' : '%(levelname)s ++ %(message)s', 2388 }, 2389 }, 2390 'handlers' : { 2391 'hand1' : { 2392 'class' : 'logging.StreamHandler', 2393 'formatter' : 'form1', 2394 'level' : 'NTOSET', 2395 'stream' : 'ext://sys.stdout', 2396 }, 2397 }, 2398 'loggers' : { 2399 'compiler.parser' : { 2400 'level' : 'DEBUG', 2401 'handlers' : ['hand1'], 2402 }, 2403 }, 2404 'root' : { 2405 'level' : 'WARNING', 2406 }, 2407 } 2408 2409 2410 # As config1 but with a misspelt level on a logger 2411 config2b = { 2412 'version': 1, 2413 'formatters': { 2414 'form1' : { 2415 'format' : '%(levelname)s ++ %(message)s', 2416 }, 2417 }, 2418 'handlers' : { 2419 'hand1' : { 2420 'class' : 'logging.StreamHandler', 2421 'formatter' : 'form1', 2422 'level' : 'NOTSET', 2423 'stream' : 'ext://sys.stdout', 2424 }, 2425 }, 2426 'loggers' : { 2427 'compiler.parser' : { 2428 'level' : 'DEBUG', 2429 'handlers' : ['hand1'], 2430 }, 2431 }, 2432 'root' : { 2433 'level' : 'WRANING', 2434 }, 2435 } 2436 2437 # config3 has a less subtle configuration error 2438 config3 = { 2439 'version': 1, 2440 'formatters': { 2441 'form1' : { 2442 'format' : '%(levelname)s ++ %(message)s', 2443 }, 2444 }, 2445 'handlers' : { 2446 'hand1' : { 2447 'class' : 'logging.StreamHandler', 2448 'formatter' : 'misspelled_name', 2449 'level' : 'NOTSET', 2450 'stream' : 'ext://sys.stdout', 2451 }, 2452 }, 2453 'loggers' : { 2454 'compiler.parser' : { 2455 'level' : 'DEBUG', 2456 'handlers' : ['hand1'], 2457 }, 2458 }, 2459 'root' : { 2460 'level' : 'WARNING', 2461 }, 2462 } 2463 2464 # config4 specifies a custom formatter class to be loaded 2465 config4 = { 2466 'version': 1, 2467 'formatters': { 2468 'form1' : { 2469 '()' : __name__ + '.ExceptionFormatter', 2470 'format' : '%(levelname)s:%(name)s:%(message)s', 2471 }, 2472 }, 2473 'handlers' : { 2474 'hand1' : { 2475 'class' : 'logging.StreamHandler', 2476 'formatter' : 'form1', 2477 'level' : 'NOTSET', 2478 'stream' : 'ext://sys.stdout', 2479 }, 2480 }, 2481 'root' : { 2482 'level' : 'NOTSET', 2483 'handlers' : ['hand1'], 2484 }, 2485 } 2486 2487 # As config4 but using an actual callable rather than a string 2488 config4a = { 2489 'version': 1, 2490 'formatters': { 2491 'form1' : { 2492 '()' : ExceptionFormatter, 2493 'format' : '%(levelname)s:%(name)s:%(message)s', 2494 }, 2495 'form2' : { 2496 '()' : __name__ + '.formatFunc', 2497 'format' : '%(levelname)s:%(name)s:%(message)s', 2498 }, 2499 'form3' : { 2500 '()' : formatFunc, 2501 'format' : '%(levelname)s:%(name)s:%(message)s', 2502 }, 2503 }, 2504 'handlers' : { 2505 'hand1' : { 2506 'class' : 'logging.StreamHandler', 2507 'formatter' : 'form1', 2508 'level' : 'NOTSET', 2509 'stream' : 'ext://sys.stdout', 2510 }, 2511 'hand2' : { 2512 '()' : handlerFunc, 2513 }, 2514 }, 2515 'root' : { 2516 'level' : 'NOTSET', 2517 'handlers' : ['hand1'], 2518 }, 2519 } 2520 2521 # config5 specifies a custom handler class to be loaded 2522 config5 = { 2523 'version': 1, 2524 'formatters': { 2525 'form1' : { 2526 'format' : '%(levelname)s ++ %(message)s', 2527 }, 2528 }, 2529 'handlers' : { 2530 'hand1' : { 2531 'class' : __name__ + '.CustomHandler', 2532 'formatter' : 'form1', 2533 'level' : 'NOTSET', 2534 'stream' : 'ext://sys.stdout', 2535 }, 2536 }, 2537 'loggers' : { 2538 'compiler.parser' : { 2539 'level' : 'DEBUG', 2540 'handlers' : ['hand1'], 2541 }, 2542 }, 2543 'root' : { 2544 'level' : 'WARNING', 2545 }, 2546 } 2547 2548 # config6 specifies a custom handler class to be loaded 2549 # but has bad arguments 2550 config6 = { 2551 'version': 1, 2552 'formatters': { 2553 'form1' : { 2554 'format' : '%(levelname)s ++ %(message)s', 2555 }, 2556 }, 2557 'handlers' : { 2558 'hand1' : { 2559 'class' : __name__ + '.CustomHandler', 2560 'formatter' : 'form1', 2561 'level' : 'NOTSET', 2562 'stream' : 'ext://sys.stdout', 2563 '9' : 'invalid parameter name', 2564 }, 2565 }, 2566 'loggers' : { 2567 'compiler.parser' : { 2568 'level' : 'DEBUG', 2569 'handlers' : ['hand1'], 2570 }, 2571 }, 2572 'root' : { 2573 'level' : 'WARNING', 2574 }, 2575 } 2576 2577 # config 7 does not define compiler.parser but defines compiler.lexer 2578 # so compiler.parser should be disabled after applying it 2579 config7 = { 2580 'version': 1, 2581 'formatters': { 2582 'form1' : { 2583 'format' : '%(levelname)s ++ %(message)s', 2584 }, 2585 }, 2586 'handlers' : { 2587 'hand1' : { 2588 'class' : 'logging.StreamHandler', 2589 'formatter' : 'form1', 2590 'level' : 'NOTSET', 2591 'stream' : 'ext://sys.stdout', 2592 }, 2593 }, 2594 'loggers' : { 2595 'compiler.lexer' : { 2596 'level' : 'DEBUG', 2597 'handlers' : ['hand1'], 2598 }, 2599 }, 2600 'root' : { 2601 'level' : 'WARNING', 2602 }, 2603 } 2604 2605 # config8 defines both compiler and compiler.lexer 2606 # so compiler.parser should not be disabled (since 2607 # compiler is defined) 2608 config8 = { 2609 'version': 1, 2610 'disable_existing_loggers' : False, 2611 'formatters': { 2612 'form1' : { 2613 'format' : '%(levelname)s ++ %(message)s', 2614 }, 2615 }, 2616 'handlers' : { 2617 'hand1' : { 2618 'class' : 'logging.StreamHandler', 2619 'formatter' : 'form1', 2620 'level' : 'NOTSET', 2621 'stream' : 'ext://sys.stdout', 2622 }, 2623 }, 2624 'loggers' : { 2625 'compiler' : { 2626 'level' : 'DEBUG', 2627 'handlers' : ['hand1'], 2628 }, 2629 'compiler.lexer' : { 2630 }, 2631 }, 2632 'root' : { 2633 'level' : 'WARNING', 2634 }, 2635 } 2636 2637 # config8a disables existing loggers 2638 config8a = { 2639 'version': 1, 2640 'disable_existing_loggers' : True, 2641 'formatters': { 2642 'form1' : { 2643 'format' : '%(levelname)s ++ %(message)s', 2644 }, 2645 }, 2646 'handlers' : { 2647 'hand1' : { 2648 'class' : 'logging.StreamHandler', 2649 'formatter' : 'form1', 2650 'level' : 'NOTSET', 2651 'stream' : 'ext://sys.stdout', 2652 }, 2653 }, 2654 'loggers' : { 2655 'compiler' : { 2656 'level' : 'DEBUG', 2657 'handlers' : ['hand1'], 2658 }, 2659 'compiler.lexer' : { 2660 }, 2661 }, 2662 'root' : { 2663 'level' : 'WARNING', 2664 }, 2665 } 2666 2667 config9 = { 2668 'version': 1, 2669 'formatters': { 2670 'form1' : { 2671 'format' : '%(levelname)s ++ %(message)s', 2672 }, 2673 }, 2674 'handlers' : { 2675 'hand1' : { 2676 'class' : 'logging.StreamHandler', 2677 'formatter' : 'form1', 2678 'level' : 'WARNING', 2679 'stream' : 'ext://sys.stdout', 2680 }, 2681 }, 2682 'loggers' : { 2683 'compiler.parser' : { 2684 'level' : 'WARNING', 2685 'handlers' : ['hand1'], 2686 }, 2687 }, 2688 'root' : { 2689 'level' : 'NOTSET', 2690 }, 2691 } 2692 2693 config9a = { 2694 'version': 1, 2695 'incremental' : True, 2696 'handlers' : { 2697 'hand1' : { 2698 'level' : 'WARNING', 2699 }, 2700 }, 2701 'loggers' : { 2702 'compiler.parser' : { 2703 'level' : 'INFO', 2704 }, 2705 }, 2706 } 2707 2708 config9b = { 2709 'version': 1, 2710 'incremental' : True, 2711 'handlers' : { 2712 'hand1' : { 2713 'level' : 'INFO', 2714 }, 2715 }, 2716 'loggers' : { 2717 'compiler.parser' : { 2718 'level' : 'INFO', 2719 }, 2720 }, 2721 } 2722 2723 # As config1 but with a filter added 2724 config10 = { 2725 'version': 1, 2726 'formatters': { 2727 'form1' : { 2728 'format' : '%(levelname)s ++ %(message)s', 2729 }, 2730 }, 2731 'filters' : { 2732 'filt1' : { 2733 'name' : 'compiler.parser', 2734 }, 2735 }, 2736 'handlers' : { 2737 'hand1' : { 2738 'class' : 'logging.StreamHandler', 2739 'formatter' : 'form1', 2740 'level' : 'NOTSET', 2741 'stream' : 'ext://sys.stdout', 2742 'filters' : ['filt1'], 2743 }, 2744 }, 2745 'loggers' : { 2746 'compiler.parser' : { 2747 'level' : 'DEBUG', 2748 'filters' : ['filt1'], 2749 }, 2750 }, 2751 'root' : { 2752 'level' : 'WARNING', 2753 'handlers' : ['hand1'], 2754 }, 2755 } 2756 2757 # As config1 but using cfg:// references 2758 config11 = { 2759 'version': 1, 2760 'true_formatters': { 2761 'form1' : { 2762 'format' : '%(levelname)s ++ %(message)s', 2763 }, 2764 }, 2765 'handler_configs': { 2766 'hand1' : { 2767 'class' : 'logging.StreamHandler', 2768 'formatter' : 'form1', 2769 'level' : 'NOTSET', 2770 'stream' : 'ext://sys.stdout', 2771 }, 2772 }, 2773 'formatters' : 'cfg://true_formatters', 2774 'handlers' : { 2775 'hand1' : 'cfg://handler_configs[hand1]', 2776 }, 2777 'loggers' : { 2778 'compiler.parser' : { 2779 'level' : 'DEBUG', 2780 'handlers' : ['hand1'], 2781 }, 2782 }, 2783 'root' : { 2784 'level' : 'WARNING', 2785 }, 2786 } 2787 2788 # As config11 but missing the version key 2789 config12 = { 2790 'true_formatters': { 2791 'form1' : { 2792 'format' : '%(levelname)s ++ %(message)s', 2793 }, 2794 }, 2795 'handler_configs': { 2796 'hand1' : { 2797 'class' : 'logging.StreamHandler', 2798 'formatter' : 'form1', 2799 'level' : 'NOTSET', 2800 'stream' : 'ext://sys.stdout', 2801 }, 2802 }, 2803 'formatters' : 'cfg://true_formatters', 2804 'handlers' : { 2805 'hand1' : 'cfg://handler_configs[hand1]', 2806 }, 2807 'loggers' : { 2808 'compiler.parser' : { 2809 'level' : 'DEBUG', 2810 'handlers' : ['hand1'], 2811 }, 2812 }, 2813 'root' : { 2814 'level' : 'WARNING', 2815 }, 2816 } 2817 2818 # As config11 but using an unsupported version 2819 config13 = { 2820 'version': 2, 2821 'true_formatters': { 2822 'form1' : { 2823 'format' : '%(levelname)s ++ %(message)s', 2824 }, 2825 }, 2826 'handler_configs': { 2827 'hand1' : { 2828 'class' : 'logging.StreamHandler', 2829 'formatter' : 'form1', 2830 'level' : 'NOTSET', 2831 'stream' : 'ext://sys.stdout', 2832 }, 2833 }, 2834 'formatters' : 'cfg://true_formatters', 2835 'handlers' : { 2836 'hand1' : 'cfg://handler_configs[hand1]', 2837 }, 2838 'loggers' : { 2839 'compiler.parser' : { 2840 'level' : 'DEBUG', 2841 'handlers' : ['hand1'], 2842 }, 2843 }, 2844 'root' : { 2845 'level' : 'WARNING', 2846 }, 2847 } 2848 2849 # As config0, but with properties 2850 config14 = { 2851 'version': 1, 2852 'formatters': { 2853 'form1' : { 2854 'format' : '%(levelname)s ++ %(message)s', 2855 }, 2856 }, 2857 'handlers' : { 2858 'hand1' : { 2859 'class' : 'logging.StreamHandler', 2860 'formatter' : 'form1', 2861 'level' : 'NOTSET', 2862 'stream' : 'ext://sys.stdout', 2863 '.': { 2864 'foo': 'bar', 2865 'terminator': '!\n', 2866 } 2867 }, 2868 }, 2869 'root' : { 2870 'level' : 'WARNING', 2871 'handlers' : ['hand1'], 2872 }, 2873 } 2874 2875 out_of_order = { 2876 "version": 1, 2877 "formatters": { 2878 "mySimpleFormatter": { 2879 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 2880 "style": "$" 2881 } 2882 }, 2883 "handlers": { 2884 "fileGlobal": { 2885 "class": "logging.StreamHandler", 2886 "level": "DEBUG", 2887 "formatter": "mySimpleFormatter" 2888 }, 2889 "bufferGlobal": { 2890 "class": "logging.handlers.MemoryHandler", 2891 "capacity": 5, 2892 "formatter": "mySimpleFormatter", 2893 "target": "fileGlobal", 2894 "level": "DEBUG" 2895 } 2896 }, 2897 "loggers": { 2898 "mymodule": { 2899 "level": "DEBUG", 2900 "handlers": ["bufferGlobal"], 2901 "propagate": "true" 2902 } 2903 } 2904 } 2905 2906 # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False 2907 custom_formatter_class_validate = { 2908 'version': 1, 2909 'formatters': { 2910 'form1': { 2911 '()': __name__ + '.ExceptionFormatter', 2912 'format': '%(levelname)s:%(name)s:%(message)s', 2913 'validate': False, 2914 }, 2915 }, 2916 'handlers' : { 2917 'hand1' : { 2918 'class': 'logging.StreamHandler', 2919 'formatter': 'form1', 2920 'level': 'NOTSET', 2921 'stream': 'ext://sys.stdout', 2922 }, 2923 }, 2924 "loggers": { 2925 "my_test_logger_custom_formatter": { 2926 "level": "DEBUG", 2927 "handlers": ["hand1"], 2928 "propagate": "true" 2929 } 2930 } 2931 } 2932 2933 # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False 2934 custom_formatter_class_validate2 = { 2935 'version': 1, 2936 'formatters': { 2937 'form1': { 2938 'class': __name__ + '.ExceptionFormatter', 2939 'format': '%(levelname)s:%(name)s:%(message)s', 2940 'validate': False, 2941 }, 2942 }, 2943 'handlers' : { 2944 'hand1' : { 2945 'class': 'logging.StreamHandler', 2946 'formatter': 'form1', 2947 'level': 'NOTSET', 2948 'stream': 'ext://sys.stdout', 2949 }, 2950 }, 2951 "loggers": { 2952 "my_test_logger_custom_formatter": { 2953 "level": "DEBUG", 2954 "handlers": ["hand1"], 2955 "propagate": "true" 2956 } 2957 } 2958 } 2959 2960 # Configuration with custom class that is not inherited from logging.Formatter 2961 custom_formatter_class_validate3 = { 2962 'version': 1, 2963 'formatters': { 2964 'form1': { 2965 'class': __name__ + '.myCustomFormatter', 2966 'format': '%(levelname)s:%(name)s:%(message)s', 2967 'validate': False, 2968 }, 2969 }, 2970 'handlers' : { 2971 'hand1' : { 2972 'class': 'logging.StreamHandler', 2973 'formatter': 'form1', 2974 'level': 'NOTSET', 2975 'stream': 'ext://sys.stdout', 2976 }, 2977 }, 2978 "loggers": { 2979 "my_test_logger_custom_formatter": { 2980 "level": "DEBUG", 2981 "handlers": ["hand1"], 2982 "propagate": "true" 2983 } 2984 } 2985 } 2986 2987 # Configuration with custom function and 'validate' set to False 2988 custom_formatter_with_function = { 2989 'version': 1, 2990 'formatters': { 2991 'form1': { 2992 '()': formatFunc, 2993 'format': '%(levelname)s:%(name)s:%(message)s', 2994 'validate': False, 2995 }, 2996 }, 2997 'handlers' : { 2998 'hand1' : { 2999 'class': 'logging.StreamHandler', 3000 'formatter': 'form1', 3001 'level': 'NOTSET', 3002 'stream': 'ext://sys.stdout', 3003 }, 3004 }, 3005 "loggers": { 3006 "my_test_logger_custom_formatter": { 3007 "level": "DEBUG", 3008 "handlers": ["hand1"], 3009 "propagate": "true" 3010 } 3011 } 3012 } 3013 3014 def apply_config(self, conf): 3015 logging.config.dictConfig(conf) 3016 3017 def test_config0_ok(self): 3018 # A simple config which overrides the default settings. 3019 with support.captured_stdout() as output: 3020 self.apply_config(self.config0) 3021 logger = logging.getLogger() 3022 # Won't output anything 3023 logger.info(self.next_message()) 3024 # Outputs a message 3025 logger.error(self.next_message()) 3026 self.assert_log_lines([ 3027 ('ERROR', '2'), 3028 ], stream=output) 3029 # Original logger output is empty. 3030 self.assert_log_lines([]) 3031 3032 def test_config1_ok(self, config=config1): 3033 # A config defining a sub-parser as well. 3034 with support.captured_stdout() as output: 3035 self.apply_config(config) 3036 logger = logging.getLogger("compiler.parser") 3037 # Both will output a message 3038 logger.info(self.next_message()) 3039 logger.error(self.next_message()) 3040 self.assert_log_lines([ 3041 ('INFO', '1'), 3042 ('ERROR', '2'), 3043 ], stream=output) 3044 # Original logger output is empty. 3045 self.assert_log_lines([]) 3046 3047 def test_config2_failure(self): 3048 # A simple config which overrides the default settings. 3049 self.assertRaises(Exception, self.apply_config, self.config2) 3050 3051 def test_config2a_failure(self): 3052 # A simple config which overrides the default settings. 3053 self.assertRaises(Exception, self.apply_config, self.config2a) 3054 3055 def test_config2b_failure(self): 3056 # A simple config which overrides the default settings. 3057 self.assertRaises(Exception, self.apply_config, self.config2b) 3058 3059 def test_config3_failure(self): 3060 # A simple config which overrides the default settings. 3061 self.assertRaises(Exception, self.apply_config, self.config3) 3062 3063 def test_config4_ok(self): 3064 # A config specifying a custom formatter class. 3065 with support.captured_stdout() as output: 3066 self.apply_config(self.config4) 3067 #logger = logging.getLogger() 3068 try: 3069 raise RuntimeError() 3070 except RuntimeError: 3071 logging.exception("just testing") 3072 sys.stdout.seek(0) 3073 self.assertEqual(output.getvalue(), 3074 "ERROR:root:just testing\nGot a [RuntimeError]\n") 3075 # Original logger output is empty 3076 self.assert_log_lines([]) 3077 3078 def test_config4a_ok(self): 3079 # A config specifying a custom formatter class. 3080 with support.captured_stdout() as output: 3081 self.apply_config(self.config4a) 3082 #logger = logging.getLogger() 3083 try: 3084 raise RuntimeError() 3085 except RuntimeError: 3086 logging.exception("just testing") 3087 sys.stdout.seek(0) 3088 self.assertEqual(output.getvalue(), 3089 "ERROR:root:just testing\nGot a [RuntimeError]\n") 3090 # Original logger output is empty 3091 self.assert_log_lines([]) 3092 3093 def test_config5_ok(self): 3094 self.test_config1_ok(config=self.config5) 3095 3096 def test_config6_failure(self): 3097 self.assertRaises(Exception, self.apply_config, self.config6) 3098 3099 def test_config7_ok(self): 3100 with support.captured_stdout() as output: 3101 self.apply_config(self.config1) 3102 logger = logging.getLogger("compiler.parser") 3103 # Both will output a message 3104 logger.info(self.next_message()) 3105 logger.error(self.next_message()) 3106 self.assert_log_lines([ 3107 ('INFO', '1'), 3108 ('ERROR', '2'), 3109 ], stream=output) 3110 # Original logger output is empty. 3111 self.assert_log_lines([]) 3112 with support.captured_stdout() as output: 3113 self.apply_config(self.config7) 3114 logger = logging.getLogger("compiler.parser") 3115 self.assertTrue(logger.disabled) 3116 logger = logging.getLogger("compiler.lexer") 3117 # Both will output a message 3118 logger.info(self.next_message()) 3119 logger.error(self.next_message()) 3120 self.assert_log_lines([ 3121 ('INFO', '3'), 3122 ('ERROR', '4'), 3123 ], stream=output) 3124 # Original logger output is empty. 3125 self.assert_log_lines([]) 3126 3127 # Same as test_config_7_ok but don't disable old loggers. 3128 def test_config_8_ok(self): 3129 with support.captured_stdout() as output: 3130 self.apply_config(self.config1) 3131 logger = logging.getLogger("compiler.parser") 3132 # All will output a message 3133 logger.info(self.next_message()) 3134 logger.error(self.next_message()) 3135 self.assert_log_lines([ 3136 ('INFO', '1'), 3137 ('ERROR', '2'), 3138 ], stream=output) 3139 # Original logger output is empty. 3140 self.assert_log_lines([]) 3141 with support.captured_stdout() as output: 3142 self.apply_config(self.config8) 3143 logger = logging.getLogger("compiler.parser") 3144 self.assertFalse(logger.disabled) 3145 # Both will output a message 3146 logger.info(self.next_message()) 3147 logger.error(self.next_message()) 3148 logger = logging.getLogger("compiler.lexer") 3149 # Both will output a message 3150 logger.info(self.next_message()) 3151 logger.error(self.next_message()) 3152 self.assert_log_lines([ 3153 ('INFO', '3'), 3154 ('ERROR', '4'), 3155 ('INFO', '5'), 3156 ('ERROR', '6'), 3157 ], stream=output) 3158 # Original logger output is empty. 3159 self.assert_log_lines([]) 3160 3161 def test_config_8a_ok(self): 3162 with support.captured_stdout() as output: 3163 self.apply_config(self.config1a) 3164 logger = logging.getLogger("compiler.parser") 3165 # See issue #11424. compiler-hyphenated sorts 3166 # between compiler and compiler.xyz and this 3167 # was preventing compiler.xyz from being included 3168 # in the child loggers of compiler because of an 3169 # overzealous loop termination condition. 3170 hyphenated = logging.getLogger('compiler-hyphenated') 3171 # All will output a message 3172 logger.info(self.next_message()) 3173 logger.error(self.next_message()) 3174 hyphenated.critical(self.next_message()) 3175 self.assert_log_lines([ 3176 ('INFO', '1'), 3177 ('ERROR', '2'), 3178 ('CRITICAL', '3'), 3179 ], stream=output) 3180 # Original logger output is empty. 3181 self.assert_log_lines([]) 3182 with support.captured_stdout() as output: 3183 self.apply_config(self.config8a) 3184 logger = logging.getLogger("compiler.parser") 3185 self.assertFalse(logger.disabled) 3186 # Both will output a message 3187 logger.info(self.next_message()) 3188 logger.error(self.next_message()) 3189 logger = logging.getLogger("compiler.lexer") 3190 # Both will output a message 3191 logger.info(self.next_message()) 3192 logger.error(self.next_message()) 3193 # Will not appear 3194 hyphenated.critical(self.next_message()) 3195 self.assert_log_lines([ 3196 ('INFO', '4'), 3197 ('ERROR', '5'), 3198 ('INFO', '6'), 3199 ('ERROR', '7'), 3200 ], stream=output) 3201 # Original logger output is empty. 3202 self.assert_log_lines([]) 3203 3204 def test_config_9_ok(self): 3205 with support.captured_stdout() as output: 3206 self.apply_config(self.config9) 3207 logger = logging.getLogger("compiler.parser") 3208 # Nothing will be output since both handler and logger are set to WARNING 3209 logger.info(self.next_message()) 3210 self.assert_log_lines([], stream=output) 3211 self.apply_config(self.config9a) 3212 # Nothing will be output since handler is still set to WARNING 3213 logger.info(self.next_message()) 3214 self.assert_log_lines([], stream=output) 3215 self.apply_config(self.config9b) 3216 # Message should now be output 3217 logger.info(self.next_message()) 3218 self.assert_log_lines([ 3219 ('INFO', '3'), 3220 ], stream=output) 3221 3222 def test_config_10_ok(self): 3223 with support.captured_stdout() as output: 3224 self.apply_config(self.config10) 3225 logger = logging.getLogger("compiler.parser") 3226 logger.warning(self.next_message()) 3227 logger = logging.getLogger('compiler') 3228 # Not output, because filtered 3229 logger.warning(self.next_message()) 3230 logger = logging.getLogger('compiler.lexer') 3231 # Not output, because filtered 3232 logger.warning(self.next_message()) 3233 logger = logging.getLogger("compiler.parser.codegen") 3234 # Output, as not filtered 3235 logger.error(self.next_message()) 3236 self.assert_log_lines([ 3237 ('WARNING', '1'), 3238 ('ERROR', '4'), 3239 ], stream=output) 3240 3241 def test_config11_ok(self): 3242 self.test_config1_ok(self.config11) 3243 3244 def test_config12_failure(self): 3245 self.assertRaises(Exception, self.apply_config, self.config12) 3246 3247 def test_config13_failure(self): 3248 self.assertRaises(Exception, self.apply_config, self.config13) 3249 3250 def test_config14_ok(self): 3251 with support.captured_stdout() as output: 3252 self.apply_config(self.config14) 3253 h = logging._handlers['hand1'] 3254 self.assertEqual(h.foo, 'bar') 3255 self.assertEqual(h.terminator, '!\n') 3256 logging.warning('Exclamation') 3257 self.assertTrue(output.getvalue().endswith('Exclamation!\n')) 3258 3259 def test_config15_ok(self): 3260 3261 def cleanup(h1, fn): 3262 h1.close() 3263 os.remove(fn) 3264 3265 with self.check_no_resource_warning(): 3266 fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 3267 os.close(fd) 3268 3269 config = { 3270 "version": 1, 3271 "handlers": { 3272 "file": { 3273 "class": "logging.FileHandler", 3274 "filename": fn, 3275 "encoding": "utf-8", 3276 } 3277 }, 3278 "root": { 3279 "handlers": ["file"] 3280 } 3281 } 3282 3283 self.apply_config(config) 3284 self.apply_config(config) 3285 3286 handler = logging.root.handlers[0] 3287 self.addCleanup(cleanup, handler, fn) 3288 3289 def setup_via_listener(self, text, verify=None): 3290 text = text.encode("utf-8") 3291 # Ask for a randomly assigned port (by using port 0) 3292 t = logging.config.listen(0, verify) 3293 t.start() 3294 t.ready.wait() 3295 # Now get the port allocated 3296 port = t.port 3297 t.ready.clear() 3298 try: 3299 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 3300 sock.settimeout(2.0) 3301 sock.connect(('localhost', port)) 3302 3303 slen = struct.pack('>L', len(text)) 3304 s = slen + text 3305 sentsofar = 0 3306 left = len(s) 3307 while left > 0: 3308 sent = sock.send(s[sentsofar:]) 3309 sentsofar += sent 3310 left -= sent 3311 sock.close() 3312 finally: 3313 t.ready.wait(2.0) 3314 logging.config.stopListening() 3315 threading_helper.join_thread(t) 3316 3317 @support.requires_working_socket() 3318 def test_listen_config_10_ok(self): 3319 with support.captured_stdout() as output: 3320 self.setup_via_listener(json.dumps(self.config10)) 3321 logger = logging.getLogger("compiler.parser") 3322 logger.warning(self.next_message()) 3323 logger = logging.getLogger('compiler') 3324 # Not output, because filtered 3325 logger.warning(self.next_message()) 3326 logger = logging.getLogger('compiler.lexer') 3327 # Not output, because filtered 3328 logger.warning(self.next_message()) 3329 logger = logging.getLogger("compiler.parser.codegen") 3330 # Output, as not filtered 3331 logger.error(self.next_message()) 3332 self.assert_log_lines([ 3333 ('WARNING', '1'), 3334 ('ERROR', '4'), 3335 ], stream=output) 3336 3337 @support.requires_working_socket() 3338 def test_listen_config_1_ok(self): 3339 with support.captured_stdout() as output: 3340 self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) 3341 logger = logging.getLogger("compiler.parser") 3342 # Both will output a message 3343 logger.info(self.next_message()) 3344 logger.error(self.next_message()) 3345 self.assert_log_lines([ 3346 ('INFO', '1'), 3347 ('ERROR', '2'), 3348 ], stream=output) 3349 # Original logger output is empty. 3350 self.assert_log_lines([]) 3351 3352 @support.requires_working_socket() 3353 def test_listen_verify(self): 3354 3355 def verify_fail(stuff): 3356 return None 3357 3358 def verify_reverse(stuff): 3359 return stuff[::-1] 3360 3361 logger = logging.getLogger("compiler.parser") 3362 to_send = textwrap.dedent(ConfigFileTest.config1) 3363 # First, specify a verification function that will fail. 3364 # We expect to see no output, since our configuration 3365 # never took effect. 3366 with support.captured_stdout() as output: 3367 self.setup_via_listener(to_send, verify_fail) 3368 # Both will output a message 3369 logger.info(self.next_message()) 3370 logger.error(self.next_message()) 3371 self.assert_log_lines([], stream=output) 3372 # Original logger output has the stuff we logged. 3373 self.assert_log_lines([ 3374 ('INFO', '1'), 3375 ('ERROR', '2'), 3376 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3377 3378 # Now, perform no verification. Our configuration 3379 # should take effect. 3380 3381 with support.captured_stdout() as output: 3382 self.setup_via_listener(to_send) # no verify callable specified 3383 logger = logging.getLogger("compiler.parser") 3384 # Both will output a message 3385 logger.info(self.next_message()) 3386 logger.error(self.next_message()) 3387 self.assert_log_lines([ 3388 ('INFO', '3'), 3389 ('ERROR', '4'), 3390 ], stream=output) 3391 # Original logger output still has the stuff we logged before. 3392 self.assert_log_lines([ 3393 ('INFO', '1'), 3394 ('ERROR', '2'), 3395 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3396 3397 # Now, perform verification which transforms the bytes. 3398 3399 with support.captured_stdout() as output: 3400 self.setup_via_listener(to_send[::-1], verify_reverse) 3401 logger = logging.getLogger("compiler.parser") 3402 # Both will output a message 3403 logger.info(self.next_message()) 3404 logger.error(self.next_message()) 3405 self.assert_log_lines([ 3406 ('INFO', '5'), 3407 ('ERROR', '6'), 3408 ], stream=output) 3409 # Original logger output still has the stuff we logged before. 3410 self.assert_log_lines([ 3411 ('INFO', '1'), 3412 ('ERROR', '2'), 3413 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3414 3415 def test_out_of_order(self): 3416 self.assertRaises(ValueError, self.apply_config, self.out_of_order) 3417 3418 def test_out_of_order_with_dollar_style(self): 3419 config = copy.deepcopy(self.out_of_order) 3420 config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}" 3421 3422 self.apply_config(config) 3423 handler = logging.getLogger('mymodule').handlers[0] 3424 self.assertIsInstance(handler.target, logging.Handler) 3425 self.assertIsInstance(handler.formatter._style, 3426 logging.StringTemplateStyle) 3427 3428 def test_custom_formatter_class_with_validate(self): 3429 self.apply_config(self.custom_formatter_class_validate) 3430 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3431 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3432 3433 def test_custom_formatter_class_with_validate2(self): 3434 self.apply_config(self.custom_formatter_class_validate2) 3435 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3436 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3437 3438 def test_custom_formatter_class_with_validate2_with_wrong_fmt(self): 3439 config = self.custom_formatter_class_validate.copy() 3440 config['formatters']['form1']['style'] = "$" 3441 3442 # Exception should not be raise as we have configured 'validate' to False 3443 self.apply_config(config) 3444 handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0] 3445 self.assertIsInstance(handler.formatter, ExceptionFormatter) 3446 3447 def test_custom_formatter_class_with_validate3(self): 3448 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3) 3449 3450 def test_custom_formatter_function_with_validate(self): 3451 self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function) 3452 3453 def test_baseconfig(self): 3454 d = { 3455 'atuple': (1, 2, 3), 3456 'alist': ['a', 'b', 'c'], 3457 'adict': {'d': 'e', 'f': 3 }, 3458 'nest1': ('g', ('h', 'i'), 'j'), 3459 'nest2': ['k', ['l', 'm'], 'n'], 3460 'nest3': ['o', 'cfg://alist', 'p'], 3461 } 3462 bc = logging.config.BaseConfigurator(d) 3463 self.assertEqual(bc.convert('cfg://atuple[1]'), 2) 3464 self.assertEqual(bc.convert('cfg://alist[1]'), 'b') 3465 self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') 3466 self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 3467 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 3468 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 3469 v = bc.convert('cfg://nest3') 3470 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 3471 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 3472 self.assertRaises(ValueError, bc.convert, 'cfg://!') 3473 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 3474 3475 def test_namedtuple(self): 3476 # see bpo-39142 3477 from collections import namedtuple 3478 3479 class MyHandler(logging.StreamHandler): 3480 def __init__(self, resource, *args, **kwargs): 3481 super().__init__(*args, **kwargs) 3482 self.resource: namedtuple = resource 3483 3484 def emit(self, record): 3485 record.msg += f' {self.resource.type}' 3486 return super().emit(record) 3487 3488 Resource = namedtuple('Resource', ['type', 'labels']) 3489 resource = Resource(type='my_type', labels=['a']) 3490 3491 config = { 3492 'version': 1, 3493 'handlers': { 3494 'myhandler': { 3495 '()': MyHandler, 3496 'resource': resource 3497 } 3498 }, 3499 'root': {'level': 'INFO', 'handlers': ['myhandler']}, 3500 } 3501 with support.captured_stderr() as stderr: 3502 self.apply_config(config) 3503 logging.info('some log') 3504 self.assertEqual(stderr.getvalue(), 'some log my_type\n') 3505 3506 def test_config_callable_filter_works(self): 3507 def filter_(_): 3508 return 1 3509 self.apply_config({ 3510 "version": 1, "root": {"level": "DEBUG", "filters": [filter_]} 3511 }) 3512 assert logging.getLogger().filters[0] is filter_ 3513 logging.getLogger().filters = [] 3514 3515 def test_config_filter_works(self): 3516 filter_ = logging.Filter("spam.eggs") 3517 self.apply_config({ 3518 "version": 1, "root": {"level": "DEBUG", "filters": [filter_]} 3519 }) 3520 assert logging.getLogger().filters[0] is filter_ 3521 logging.getLogger().filters = [] 3522 3523 def test_config_filter_method_works(self): 3524 class FakeFilter: 3525 def filter(self, _): 3526 return 1 3527 filter_ = FakeFilter() 3528 self.apply_config({ 3529 "version": 1, "root": {"level": "DEBUG", "filters": [filter_]} 3530 }) 3531 assert logging.getLogger().filters[0] is filter_ 3532 logging.getLogger().filters = [] 3533 3534 def test_invalid_type_raises(self): 3535 class NotAFilter: pass 3536 for filter_ in [None, 1, NotAFilter()]: 3537 self.assertRaises( 3538 ValueError, 3539 self.apply_config, 3540 {"version": 1, "root": {"level": "DEBUG", "filters": [filter_]}} 3541 ) 3542 3543 def test_90195(self): 3544 # See gh-90195 3545 config = { 3546 'version': 1, 3547 'disable_existing_loggers': False, 3548 'handlers': { 3549 'console': { 3550 'level': 'DEBUG', 3551 'class': 'logging.StreamHandler', 3552 }, 3553 }, 3554 'loggers': { 3555 'a': { 3556 'level': 'DEBUG', 3557 'handlers': ['console'] 3558 } 3559 } 3560 } 3561 logger = logging.getLogger('a') 3562 self.assertFalse(logger.disabled) 3563 self.apply_config(config) 3564 self.assertFalse(logger.disabled) 3565 # Should disable all loggers ... 3566 self.apply_config({'version': 1}) 3567 self.assertTrue(logger.disabled) 3568 del config['disable_existing_loggers'] 3569 self.apply_config(config) 3570 # Logger should be enabled, since explicitly mentioned 3571 self.assertFalse(logger.disabled) 3572 3573class ManagerTest(BaseTest): 3574 def test_manager_loggerclass(self): 3575 logged = [] 3576 3577 class MyLogger(logging.Logger): 3578 def _log(self, level, msg, args, exc_info=None, extra=None): 3579 logged.append(msg) 3580 3581 man = logging.Manager(None) 3582 self.assertRaises(TypeError, man.setLoggerClass, int) 3583 man.setLoggerClass(MyLogger) 3584 logger = man.getLogger('test') 3585 logger.warning('should appear in logged') 3586 logging.warning('should not appear in logged') 3587 3588 self.assertEqual(logged, ['should appear in logged']) 3589 3590 def test_set_log_record_factory(self): 3591 man = logging.Manager(None) 3592 expected = object() 3593 man.setLogRecordFactory(expected) 3594 self.assertEqual(man.logRecordFactory, expected) 3595 3596class ChildLoggerTest(BaseTest): 3597 def test_child_loggers(self): 3598 r = logging.getLogger() 3599 l1 = logging.getLogger('abc') 3600 l2 = logging.getLogger('def.ghi') 3601 c1 = r.getChild('xyz') 3602 c2 = r.getChild('uvw.xyz') 3603 self.assertIs(c1, logging.getLogger('xyz')) 3604 self.assertIs(c2, logging.getLogger('uvw.xyz')) 3605 c1 = l1.getChild('def') 3606 c2 = c1.getChild('ghi') 3607 c3 = l1.getChild('def.ghi') 3608 self.assertIs(c1, logging.getLogger('abc.def')) 3609 self.assertIs(c2, logging.getLogger('abc.def.ghi')) 3610 self.assertIs(c2, c3) 3611 3612 3613class DerivedLogRecord(logging.LogRecord): 3614 pass 3615 3616class LogRecordFactoryTest(BaseTest): 3617 3618 def setUp(self): 3619 class CheckingFilter(logging.Filter): 3620 def __init__(self, cls): 3621 self.cls = cls 3622 3623 def filter(self, record): 3624 t = type(record) 3625 if t is not self.cls: 3626 msg = 'Unexpected LogRecord type %s, expected %s' % (t, 3627 self.cls) 3628 raise TypeError(msg) 3629 return True 3630 3631 BaseTest.setUp(self) 3632 self.filter = CheckingFilter(DerivedLogRecord) 3633 self.root_logger.addFilter(self.filter) 3634 self.orig_factory = logging.getLogRecordFactory() 3635 3636 def tearDown(self): 3637 self.root_logger.removeFilter(self.filter) 3638 BaseTest.tearDown(self) 3639 logging.setLogRecordFactory(self.orig_factory) 3640 3641 def test_logrecord_class(self): 3642 self.assertRaises(TypeError, self.root_logger.warning, 3643 self.next_message()) 3644 logging.setLogRecordFactory(DerivedLogRecord) 3645 self.root_logger.error(self.next_message()) 3646 self.assert_log_lines([ 3647 ('root', 'ERROR', '2'), 3648 ]) 3649 3650 3651@threading_helper.requires_working_threading() 3652class QueueHandlerTest(BaseTest): 3653 # Do not bother with a logger name group. 3654 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 3655 3656 def setUp(self): 3657 BaseTest.setUp(self) 3658 self.queue = queue.Queue(-1) 3659 self.que_hdlr = logging.handlers.QueueHandler(self.queue) 3660 self.name = 'que' 3661 self.que_logger = logging.getLogger('que') 3662 self.que_logger.propagate = False 3663 self.que_logger.setLevel(logging.WARNING) 3664 self.que_logger.addHandler(self.que_hdlr) 3665 3666 def tearDown(self): 3667 self.que_hdlr.close() 3668 BaseTest.tearDown(self) 3669 3670 def test_queue_handler(self): 3671 self.que_logger.debug(self.next_message()) 3672 self.assertRaises(queue.Empty, self.queue.get_nowait) 3673 self.que_logger.info(self.next_message()) 3674 self.assertRaises(queue.Empty, self.queue.get_nowait) 3675 msg = self.next_message() 3676 self.que_logger.warning(msg) 3677 data = self.queue.get_nowait() 3678 self.assertTrue(isinstance(data, logging.LogRecord)) 3679 self.assertEqual(data.name, self.que_logger.name) 3680 self.assertEqual((data.msg, data.args), (msg, None)) 3681 3682 def test_formatting(self): 3683 msg = self.next_message() 3684 levelname = logging.getLevelName(logging.WARNING) 3685 log_format_str = '{name} -> {levelname}: {message}' 3686 formatted_msg = log_format_str.format(name=self.name, 3687 levelname=levelname, message=msg) 3688 formatter = logging.Formatter(self.log_format) 3689 self.que_hdlr.setFormatter(formatter) 3690 self.que_logger.warning(msg) 3691 log_record = self.queue.get_nowait() 3692 self.assertEqual(formatted_msg, log_record.msg) 3693 self.assertEqual(formatted_msg, log_record.message) 3694 3695 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3696 'logging.handlers.QueueListener required for this test') 3697 def test_queue_listener(self): 3698 handler = TestHandler(support.Matcher()) 3699 listener = logging.handlers.QueueListener(self.queue, handler) 3700 listener.start() 3701 try: 3702 self.que_logger.warning(self.next_message()) 3703 self.que_logger.error(self.next_message()) 3704 self.que_logger.critical(self.next_message()) 3705 finally: 3706 listener.stop() 3707 self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) 3708 self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) 3709 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) 3710 handler.close() 3711 3712 # Now test with respect_handler_level set 3713 3714 handler = TestHandler(support.Matcher()) 3715 handler.setLevel(logging.CRITICAL) 3716 listener = logging.handlers.QueueListener(self.queue, handler, 3717 respect_handler_level=True) 3718 listener.start() 3719 try: 3720 self.que_logger.warning(self.next_message()) 3721 self.que_logger.error(self.next_message()) 3722 self.que_logger.critical(self.next_message()) 3723 finally: 3724 listener.stop() 3725 self.assertFalse(handler.matches(levelno=logging.WARNING, message='4')) 3726 self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) 3727 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) 3728 handler.close() 3729 3730 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3731 'logging.handlers.QueueListener required for this test') 3732 def test_queue_listener_with_StreamHandler(self): 3733 # Test that traceback and stack-info only appends once (bpo-34334, bpo-46755). 3734 listener = logging.handlers.QueueListener(self.queue, self.root_hdlr) 3735 listener.start() 3736 try: 3737 1 / 0 3738 except ZeroDivisionError as e: 3739 exc = e 3740 self.que_logger.exception(self.next_message(), exc_info=exc) 3741 self.que_logger.error(self.next_message(), stack_info=True) 3742 listener.stop() 3743 self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1) 3744 self.assertEqual(self.stream.getvalue().strip().count('Stack'), 1) 3745 3746 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3747 'logging.handlers.QueueListener required for this test') 3748 def test_queue_listener_with_multiple_handlers(self): 3749 # Test that queue handler format doesn't affect other handler formats (bpo-35726). 3750 self.que_hdlr.setFormatter(self.root_formatter) 3751 self.que_logger.addHandler(self.root_hdlr) 3752 3753 listener = logging.handlers.QueueListener(self.queue, self.que_hdlr) 3754 listener.start() 3755 self.que_logger.error("error") 3756 listener.stop() 3757 self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error") 3758 3759if hasattr(logging.handlers, 'QueueListener'): 3760 import multiprocessing 3761 from unittest.mock import patch 3762 3763 @threading_helper.requires_working_threading() 3764 class QueueListenerTest(BaseTest): 3765 """ 3766 Tests based on patch submitted for issue #27930. Ensure that 3767 QueueListener handles all log messages. 3768 """ 3769 3770 repeat = 20 3771 3772 @staticmethod 3773 def setup_and_log(log_queue, ident): 3774 """ 3775 Creates a logger with a QueueHandler that logs to a queue read by a 3776 QueueListener. Starts the listener, logs five messages, and stops 3777 the listener. 3778 """ 3779 logger = logging.getLogger('test_logger_with_id_%s' % ident) 3780 logger.setLevel(logging.DEBUG) 3781 handler = logging.handlers.QueueHandler(log_queue) 3782 logger.addHandler(handler) 3783 listener = logging.handlers.QueueListener(log_queue) 3784 listener.start() 3785 3786 logger.info('one') 3787 logger.info('two') 3788 logger.info('three') 3789 logger.info('four') 3790 logger.info('five') 3791 3792 listener.stop() 3793 logger.removeHandler(handler) 3794 handler.close() 3795 3796 @patch.object(logging.handlers.QueueListener, 'handle') 3797 def test_handle_called_with_queue_queue(self, mock_handle): 3798 for i in range(self.repeat): 3799 log_queue = queue.Queue() 3800 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3801 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3802 'correct number of handled log messages') 3803 3804 @patch.object(logging.handlers.QueueListener, 'handle') 3805 def test_handle_called_with_mp_queue(self, mock_handle): 3806 # bpo-28668: The multiprocessing (mp) module is not functional 3807 # when the mp.synchronize module cannot be imported. 3808 support.skip_if_broken_multiprocessing_synchronize() 3809 for i in range(self.repeat): 3810 log_queue = multiprocessing.Queue() 3811 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3812 log_queue.close() 3813 log_queue.join_thread() 3814 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3815 'correct number of handled log messages') 3816 3817 @staticmethod 3818 def get_all_from_queue(log_queue): 3819 try: 3820 while True: 3821 yield log_queue.get_nowait() 3822 except queue.Empty: 3823 return [] 3824 3825 def test_no_messages_in_queue_after_stop(self): 3826 """ 3827 Five messages are logged then the QueueListener is stopped. This 3828 test then gets everything off the queue. Failure of this test 3829 indicates that messages were not registered on the queue until 3830 _after_ the QueueListener stopped. 3831 """ 3832 # bpo-28668: The multiprocessing (mp) module is not functional 3833 # when the mp.synchronize module cannot be imported. 3834 support.skip_if_broken_multiprocessing_synchronize() 3835 for i in range(self.repeat): 3836 queue = multiprocessing.Queue() 3837 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 3838 # time.sleep(1) 3839 items = list(self.get_all_from_queue(queue)) 3840 queue.close() 3841 queue.join_thread() 3842 3843 expected = [[], [logging.handlers.QueueListener._sentinel]] 3844 self.assertIn(items, expected, 3845 'Found unexpected messages in queue: %s' % ( 3846 [m.msg if isinstance(m, logging.LogRecord) 3847 else m for m in items])) 3848 3849 def test_calls_task_done_after_stop(self): 3850 # Issue 36813: Make sure queue.join does not deadlock. 3851 log_queue = queue.Queue() 3852 listener = logging.handlers.QueueListener(log_queue) 3853 listener.start() 3854 listener.stop() 3855 with self.assertRaises(ValueError): 3856 # Make sure all tasks are done and .join won't block. 3857 log_queue.task_done() 3858 3859 3860ZERO = datetime.timedelta(0) 3861 3862class UTC(datetime.tzinfo): 3863 def utcoffset(self, dt): 3864 return ZERO 3865 3866 dst = utcoffset 3867 3868 def tzname(self, dt): 3869 return 'UTC' 3870 3871utc = UTC() 3872 3873class AssertErrorMessage: 3874 3875 def assert_error_message(self, exception, message, *args, **kwargs): 3876 try: 3877 self.assertRaises((), *args, **kwargs) 3878 except exception as e: 3879 self.assertEqual(message, str(e)) 3880 3881class FormatterTest(unittest.TestCase, AssertErrorMessage): 3882 def setUp(self): 3883 self.common = { 3884 'name': 'formatter.test', 3885 'level': logging.DEBUG, 3886 'pathname': os.path.join('path', 'to', 'dummy.ext'), 3887 'lineno': 42, 3888 'exc_info': None, 3889 'func': None, 3890 'msg': 'Message with %d %s', 3891 'args': (2, 'placeholders'), 3892 } 3893 self.variants = { 3894 'custom': { 3895 'custom': 1234 3896 } 3897 } 3898 3899 def get_record(self, name=None): 3900 result = dict(self.common) 3901 if name is not None: 3902 result.update(self.variants[name]) 3903 return logging.makeLogRecord(result) 3904 3905 def test_percent(self): 3906 # Test %-formatting 3907 r = self.get_record() 3908 f = logging.Formatter('${%(message)s}') 3909 self.assertEqual(f.format(r), '${Message with 2 placeholders}') 3910 f = logging.Formatter('%(random)s') 3911 self.assertRaises(ValueError, f.format, r) 3912 self.assertFalse(f.usesTime()) 3913 f = logging.Formatter('%(asctime)s') 3914 self.assertTrue(f.usesTime()) 3915 f = logging.Formatter('%(asctime)-15s') 3916 self.assertTrue(f.usesTime()) 3917 f = logging.Formatter('%(asctime)#15s') 3918 self.assertTrue(f.usesTime()) 3919 3920 def test_braces(self): 3921 # Test {}-formatting 3922 r = self.get_record() 3923 f = logging.Formatter('$%{message}%$', style='{') 3924 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3925 f = logging.Formatter('{random}', style='{') 3926 self.assertRaises(ValueError, f.format, r) 3927 f = logging.Formatter("{message}", style='{') 3928 self.assertFalse(f.usesTime()) 3929 f = logging.Formatter('{asctime}', style='{') 3930 self.assertTrue(f.usesTime()) 3931 f = logging.Formatter('{asctime!s:15}', style='{') 3932 self.assertTrue(f.usesTime()) 3933 f = logging.Formatter('{asctime:15}', style='{') 3934 self.assertTrue(f.usesTime()) 3935 3936 def test_dollars(self): 3937 # Test $-formatting 3938 r = self.get_record() 3939 f = logging.Formatter('${message}', style='$') 3940 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3941 f = logging.Formatter('$message', style='$') 3942 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3943 f = logging.Formatter('$$%${message}%$$', style='$') 3944 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3945 f = logging.Formatter('${random}', style='$') 3946 self.assertRaises(ValueError, f.format, r) 3947 self.assertFalse(f.usesTime()) 3948 f = logging.Formatter('${asctime}', style='$') 3949 self.assertTrue(f.usesTime()) 3950 f = logging.Formatter('$asctime', style='$') 3951 self.assertTrue(f.usesTime()) 3952 f = logging.Formatter('${message}', style='$') 3953 self.assertFalse(f.usesTime()) 3954 f = logging.Formatter('${asctime}--', style='$') 3955 self.assertTrue(f.usesTime()) 3956 3957 def test_format_validate(self): 3958 # Check correct formatting 3959 # Percentage style 3960 f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3961 self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s") 3962 f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3963 self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o") 3964 f = logging.Formatter("%(process)#+027.23X") 3965 self.assertEqual(f._fmt, "%(process)#+027.23X") 3966 f = logging.Formatter("%(foo)#.*g") 3967 self.assertEqual(f._fmt, "%(foo)#.*g") 3968 3969 # StrFormat Style 3970 f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{") 3971 self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}") 3972 f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{") 3973 self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}") 3974 f = logging.Formatter("{customfield!s:#<30}", style="{") 3975 self.assertEqual(f._fmt, "{customfield!s:#<30}") 3976 f = logging.Formatter("{message!r}", style="{") 3977 self.assertEqual(f._fmt, "{message!r}") 3978 f = logging.Formatter("{message!s}", style="{") 3979 self.assertEqual(f._fmt, "{message!s}") 3980 f = logging.Formatter("{message!a}", style="{") 3981 self.assertEqual(f._fmt, "{message!a}") 3982 f = logging.Formatter("{process!r:4.2}", style="{") 3983 self.assertEqual(f._fmt, "{process!r:4.2}") 3984 f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{") 3985 self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}") 3986 f = logging.Formatter("{process!s:{w},.{p}}", style="{") 3987 self.assertEqual(f._fmt, "{process!s:{w},.{p}}") 3988 f = logging.Formatter("{foo:12.{p}}", style="{") 3989 self.assertEqual(f._fmt, "{foo:12.{p}}") 3990 f = logging.Formatter("{foo:{w}.6}", style="{") 3991 self.assertEqual(f._fmt, "{foo:{w}.6}") 3992 f = logging.Formatter("{foo[0].bar[1].baz}", style="{") 3993 self.assertEqual(f._fmt, "{foo[0].bar[1].baz}") 3994 f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{") 3995 self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}") 3996 f = logging.Formatter("{12[k1].bar[k2].baz}", style="{") 3997 self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}") 3998 3999 # Dollar style 4000 f = logging.Formatter("${asctime} - $message", style="$") 4001 self.assertEqual(f._fmt, "${asctime} - $message") 4002 f = logging.Formatter("$bar $$", style="$") 4003 self.assertEqual(f._fmt, "$bar $$") 4004 f = logging.Formatter("$bar $$$$", style="$") 4005 self.assertEqual(f._fmt, "$bar $$$$") # this would print two $($$) 4006 4007 # Testing when ValueError being raised from incorrect format 4008 # Percentage Style 4009 self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z") 4010 self.assertRaises(ValueError, logging.Formatter, "%(asctime)b") 4011 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*") 4012 self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s") 4013 self.assertRaises(ValueError, logging.Formatter, "%(asctime)_") 4014 self.assertRaises(ValueError, logging.Formatter, '{asctime}') 4015 self.assertRaises(ValueError, logging.Formatter, '${message}') 4016 self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f') # with both * and decimal number as precision 4017 self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f') 4018 4019 # StrFormat Style 4020 # Testing failure for '-' in field name 4021 self.assert_error_message( 4022 ValueError, 4023 "invalid format: invalid field name/expression: 'name-thing'", 4024 logging.Formatter, "{name-thing}", style="{" 4025 ) 4026 # Testing failure for style mismatch 4027 self.assert_error_message( 4028 ValueError, 4029 "invalid format: no fields", 4030 logging.Formatter, '%(asctime)s', style='{' 4031 ) 4032 # Testing failure for invalid conversion 4033 self.assert_error_message( 4034 ValueError, 4035 "invalid conversion: 'Z'" 4036 ) 4037 self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{') 4038 self.assert_error_message( 4039 ValueError, 4040 "invalid format: expected ':' after conversion specifier", 4041 logging.Formatter, '{asctime!aa:15}', style='{' 4042 ) 4043 # Testing failure for invalid spec 4044 self.assert_error_message( 4045 ValueError, 4046 "invalid format: bad specifier: '.2ff'", 4047 logging.Formatter, '{process:.2ff}', style='{' 4048 ) 4049 self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{') 4050 self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{') 4051 self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{') 4052 self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{') 4053 # Testing failure for mismatch braces 4054 self.assert_error_message( 4055 ValueError, 4056 "invalid format: expected '}' before end of string", 4057 logging.Formatter, '{process', style='{' 4058 ) 4059 self.assert_error_message( 4060 ValueError, 4061 "invalid format: Single '}' encountered in format string", 4062 logging.Formatter, 'process}', style='{' 4063 ) 4064 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{') 4065 self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{') 4066 self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{') 4067 self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{') 4068 self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{') 4069 self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{') 4070 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{') 4071 self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{') 4072 self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{') 4073 4074 # Dollar style 4075 # Testing failure for mismatch bare $ 4076 self.assert_error_message( 4077 ValueError, 4078 "invalid format: bare \'$\' not allowed", 4079 logging.Formatter, '$bar $$$', style='$' 4080 ) 4081 self.assert_error_message( 4082 ValueError, 4083 "invalid format: bare \'$\' not allowed", 4084 logging.Formatter, 'bar $', style='$' 4085 ) 4086 self.assert_error_message( 4087 ValueError, 4088 "invalid format: bare \'$\' not allowed", 4089 logging.Formatter, 'foo $.', style='$' 4090 ) 4091 # Testing failure for mismatch style 4092 self.assert_error_message( 4093 ValueError, 4094 "invalid format: no fields", 4095 logging.Formatter, '{asctime}', style='$' 4096 ) 4097 self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$') 4098 4099 # Testing failure for incorrect fields 4100 self.assert_error_message( 4101 ValueError, 4102 "invalid format: no fields", 4103 logging.Formatter, 'foo', style='$' 4104 ) 4105 self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$') 4106 4107 def test_defaults_parameter(self): 4108 fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message'] 4109 styles = ['%', '{', '$'] 4110 for fmt, style in zip(fmts, styles): 4111 f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'}) 4112 r = self.get_record() 4113 self.assertEqual(f.format(r), 'Default Message with 2 placeholders') 4114 r = self.get_record("custom") 4115 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 4116 4117 # Without default 4118 f = logging.Formatter(fmt, style=style) 4119 r = self.get_record() 4120 self.assertRaises(ValueError, f.format, r) 4121 4122 # Non-existing default is ignored 4123 f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'}) 4124 r = self.get_record("custom") 4125 self.assertEqual(f.format(r), '1234 Message with 2 placeholders') 4126 4127 def test_invalid_style(self): 4128 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 4129 4130 def test_time(self): 4131 r = self.get_record() 4132 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 4133 # We use None to indicate we want the local timezone 4134 # We're essentially converting a UTC time to local time 4135 r.created = time.mktime(dt.astimezone(None).timetuple()) 4136 r.msecs = 123 4137 f = logging.Formatter('%(asctime)s %(message)s') 4138 f.converter = time.gmtime 4139 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 4140 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 4141 f.format(r) 4142 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 4143 4144 def test_default_msec_format_none(self): 4145 class NoMsecFormatter(logging.Formatter): 4146 default_msec_format = None 4147 default_time_format = '%d/%m/%Y %H:%M:%S' 4148 4149 r = self.get_record() 4150 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 123, utc) 4151 r.created = time.mktime(dt.astimezone(None).timetuple()) 4152 f = NoMsecFormatter() 4153 f.converter = time.gmtime 4154 self.assertEqual(f.formatTime(r), '21/04/1993 08:03:00') 4155 4156 def test_issue_89047(self): 4157 f = logging.Formatter(fmt='{asctime}.{msecs:03.0f} {message}', style='{', datefmt="%Y-%m-%d %H:%M:%S") 4158 for i in range(2500): 4159 time.sleep(0.0004) 4160 r = logging.makeLogRecord({'msg': 'Message %d' % (i + 1)}) 4161 s = f.format(r) 4162 self.assertNotIn('.1000', s) 4163 4164 4165class TestBufferingFormatter(logging.BufferingFormatter): 4166 def formatHeader(self, records): 4167 return '[(%d)' % len(records) 4168 4169 def formatFooter(self, records): 4170 return '(%d)]' % len(records) 4171 4172class BufferingFormatterTest(unittest.TestCase): 4173 def setUp(self): 4174 self.records = [ 4175 logging.makeLogRecord({'msg': 'one'}), 4176 logging.makeLogRecord({'msg': 'two'}), 4177 ] 4178 4179 def test_default(self): 4180 f = logging.BufferingFormatter() 4181 self.assertEqual('', f.format([])) 4182 self.assertEqual('onetwo', f.format(self.records)) 4183 4184 def test_custom(self): 4185 f = TestBufferingFormatter() 4186 self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) 4187 lf = logging.Formatter('<%(message)s>') 4188 f = TestBufferingFormatter(lf) 4189 self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) 4190 4191class ExceptionTest(BaseTest): 4192 def test_formatting(self): 4193 r = self.root_logger 4194 h = RecordingHandler() 4195 r.addHandler(h) 4196 try: 4197 raise RuntimeError('deliberate mistake') 4198 except: 4199 logging.exception('failed', stack_info=True) 4200 r.removeHandler(h) 4201 h.close() 4202 r = h.records[0] 4203 self.assertTrue(r.exc_text.startswith('Traceback (most recent ' 4204 'call last):\n')) 4205 self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' 4206 'deliberate mistake')) 4207 self.assertTrue(r.stack_info.startswith('Stack (most recent ' 4208 'call last):\n')) 4209 self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' 4210 'stack_info=True)')) 4211 4212 4213class LastResortTest(BaseTest): 4214 def test_last_resort(self): 4215 # Test the last resort handler 4216 root = self.root_logger 4217 root.removeHandler(self.root_hdlr) 4218 old_lastresort = logging.lastResort 4219 old_raise_exceptions = logging.raiseExceptions 4220 4221 try: 4222 with support.captured_stderr() as stderr: 4223 root.debug('This should not appear') 4224 self.assertEqual(stderr.getvalue(), '') 4225 root.warning('Final chance!') 4226 self.assertEqual(stderr.getvalue(), 'Final chance!\n') 4227 4228 # No handlers and no last resort, so 'No handlers' message 4229 logging.lastResort = None 4230 with support.captured_stderr() as stderr: 4231 root.warning('Final chance!') 4232 msg = 'No handlers could be found for logger "root"\n' 4233 self.assertEqual(stderr.getvalue(), msg) 4234 4235 # 'No handlers' message only printed once 4236 with support.captured_stderr() as stderr: 4237 root.warning('Final chance!') 4238 self.assertEqual(stderr.getvalue(), '') 4239 4240 # If raiseExceptions is False, no message is printed 4241 root.manager.emittedNoHandlerWarning = False 4242 logging.raiseExceptions = False 4243 with support.captured_stderr() as stderr: 4244 root.warning('Final chance!') 4245 self.assertEqual(stderr.getvalue(), '') 4246 finally: 4247 root.addHandler(self.root_hdlr) 4248 logging.lastResort = old_lastresort 4249 logging.raiseExceptions = old_raise_exceptions 4250 4251 4252class FakeHandler: 4253 4254 def __init__(self, identifier, called): 4255 for method in ('acquire', 'flush', 'close', 'release'): 4256 setattr(self, method, self.record_call(identifier, method, called)) 4257 4258 def record_call(self, identifier, method_name, called): 4259 def inner(): 4260 called.append('{} - {}'.format(identifier, method_name)) 4261 return inner 4262 4263 4264class RecordingHandler(logging.NullHandler): 4265 4266 def __init__(self, *args, **kwargs): 4267 super(RecordingHandler, self).__init__(*args, **kwargs) 4268 self.records = [] 4269 4270 def handle(self, record): 4271 """Keep track of all the emitted records.""" 4272 self.records.append(record) 4273 4274 4275class ShutdownTest(BaseTest): 4276 4277 """Test suite for the shutdown method.""" 4278 4279 def setUp(self): 4280 super(ShutdownTest, self).setUp() 4281 self.called = [] 4282 4283 raise_exceptions = logging.raiseExceptions 4284 self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) 4285 4286 def raise_error(self, error): 4287 def inner(): 4288 raise error() 4289 return inner 4290 4291 def test_no_failure(self): 4292 # create some fake handlers 4293 handler0 = FakeHandler(0, self.called) 4294 handler1 = FakeHandler(1, self.called) 4295 handler2 = FakeHandler(2, self.called) 4296 4297 # create live weakref to those handlers 4298 handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) 4299 4300 logging.shutdown(handlerList=list(handlers)) 4301 4302 expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', 4303 '1 - acquire', '1 - flush', '1 - close', '1 - release', 4304 '0 - acquire', '0 - flush', '0 - close', '0 - release'] 4305 self.assertEqual(expected, self.called) 4306 4307 def _test_with_failure_in_method(self, method, error): 4308 handler = FakeHandler(0, self.called) 4309 setattr(handler, method, self.raise_error(error)) 4310 handlers = [logging.weakref.ref(handler)] 4311 4312 logging.shutdown(handlerList=list(handlers)) 4313 4314 self.assertEqual('0 - release', self.called[-1]) 4315 4316 def test_with_ioerror_in_acquire(self): 4317 self._test_with_failure_in_method('acquire', OSError) 4318 4319 def test_with_ioerror_in_flush(self): 4320 self._test_with_failure_in_method('flush', OSError) 4321 4322 def test_with_ioerror_in_close(self): 4323 self._test_with_failure_in_method('close', OSError) 4324 4325 def test_with_valueerror_in_acquire(self): 4326 self._test_with_failure_in_method('acquire', ValueError) 4327 4328 def test_with_valueerror_in_flush(self): 4329 self._test_with_failure_in_method('flush', ValueError) 4330 4331 def test_with_valueerror_in_close(self): 4332 self._test_with_failure_in_method('close', ValueError) 4333 4334 def test_with_other_error_in_acquire_without_raise(self): 4335 logging.raiseExceptions = False 4336 self._test_with_failure_in_method('acquire', IndexError) 4337 4338 def test_with_other_error_in_flush_without_raise(self): 4339 logging.raiseExceptions = False 4340 self._test_with_failure_in_method('flush', IndexError) 4341 4342 def test_with_other_error_in_close_without_raise(self): 4343 logging.raiseExceptions = False 4344 self._test_with_failure_in_method('close', IndexError) 4345 4346 def test_with_other_error_in_acquire_with_raise(self): 4347 logging.raiseExceptions = True 4348 self.assertRaises(IndexError, self._test_with_failure_in_method, 4349 'acquire', IndexError) 4350 4351 def test_with_other_error_in_flush_with_raise(self): 4352 logging.raiseExceptions = True 4353 self.assertRaises(IndexError, self._test_with_failure_in_method, 4354 'flush', IndexError) 4355 4356 def test_with_other_error_in_close_with_raise(self): 4357 logging.raiseExceptions = True 4358 self.assertRaises(IndexError, self._test_with_failure_in_method, 4359 'close', IndexError) 4360 4361 4362class ModuleLevelMiscTest(BaseTest): 4363 4364 """Test suite for some module level methods.""" 4365 4366 def test_disable(self): 4367 old_disable = logging.root.manager.disable 4368 # confirm our assumptions are correct 4369 self.assertEqual(old_disable, 0) 4370 self.addCleanup(logging.disable, old_disable) 4371 4372 logging.disable(83) 4373 self.assertEqual(logging.root.manager.disable, 83) 4374 4375 self.assertRaises(ValueError, logging.disable, "doesnotexists") 4376 4377 class _NotAnIntOrString: 4378 pass 4379 4380 self.assertRaises(TypeError, logging.disable, _NotAnIntOrString()) 4381 4382 logging.disable("WARN") 4383 4384 # test the default value introduced in 3.7 4385 # (Issue #28524) 4386 logging.disable() 4387 self.assertEqual(logging.root.manager.disable, logging.CRITICAL) 4388 4389 def _test_log(self, method, level=None): 4390 called = [] 4391 support.patch(self, logging, 'basicConfig', 4392 lambda *a, **kw: called.append((a, kw))) 4393 4394 recording = RecordingHandler() 4395 logging.root.addHandler(recording) 4396 4397 log_method = getattr(logging, method) 4398 if level is not None: 4399 log_method(level, "test me: %r", recording) 4400 else: 4401 log_method("test me: %r", recording) 4402 4403 self.assertEqual(len(recording.records), 1) 4404 record = recording.records[0] 4405 self.assertEqual(record.getMessage(), "test me: %r" % recording) 4406 4407 expected_level = level if level is not None else getattr(logging, method.upper()) 4408 self.assertEqual(record.levelno, expected_level) 4409 4410 # basicConfig was not called! 4411 self.assertEqual(called, []) 4412 4413 def test_log(self): 4414 self._test_log('log', logging.ERROR) 4415 4416 def test_debug(self): 4417 self._test_log('debug') 4418 4419 def test_info(self): 4420 self._test_log('info') 4421 4422 def test_warning(self): 4423 self._test_log('warning') 4424 4425 def test_error(self): 4426 self._test_log('error') 4427 4428 def test_critical(self): 4429 self._test_log('critical') 4430 4431 def test_set_logger_class(self): 4432 self.assertRaises(TypeError, logging.setLoggerClass, object) 4433 4434 class MyLogger(logging.Logger): 4435 pass 4436 4437 logging.setLoggerClass(MyLogger) 4438 self.assertEqual(logging.getLoggerClass(), MyLogger) 4439 4440 logging.setLoggerClass(logging.Logger) 4441 self.assertEqual(logging.getLoggerClass(), logging.Logger) 4442 4443 def test_subclass_logger_cache(self): 4444 # bpo-37258 4445 message = [] 4446 4447 class MyLogger(logging.getLoggerClass()): 4448 def __init__(self, name='MyLogger', level=logging.NOTSET): 4449 super().__init__(name, level) 4450 message.append('initialized') 4451 4452 logging.setLoggerClass(MyLogger) 4453 logger = logging.getLogger('just_some_logger') 4454 self.assertEqual(message, ['initialized']) 4455 stream = io.StringIO() 4456 h = logging.StreamHandler(stream) 4457 logger.addHandler(h) 4458 try: 4459 logger.setLevel(logging.DEBUG) 4460 logger.debug("hello") 4461 self.assertEqual(stream.getvalue().strip(), "hello") 4462 4463 stream.truncate(0) 4464 stream.seek(0) 4465 4466 logger.setLevel(logging.INFO) 4467 logger.debug("hello") 4468 self.assertEqual(stream.getvalue(), "") 4469 finally: 4470 logger.removeHandler(h) 4471 h.close() 4472 logging.setLoggerClass(logging.Logger) 4473 4474 def test_logging_at_shutdown(self): 4475 # bpo-20037: Doing text I/O late at interpreter shutdown must not crash 4476 code = textwrap.dedent(""" 4477 import logging 4478 4479 class A: 4480 def __del__(self): 4481 try: 4482 raise ValueError("some error") 4483 except Exception: 4484 logging.exception("exception in __del__") 4485 4486 a = A() 4487 """) 4488 rc, out, err = assert_python_ok("-c", code) 4489 err = err.decode() 4490 self.assertIn("exception in __del__", err) 4491 self.assertIn("ValueError: some error", err) 4492 4493 def test_logging_at_shutdown_open(self): 4494 # bpo-26789: FileHandler keeps a reference to the builtin open() 4495 # function to be able to open or reopen the file during Python 4496 # finalization. 4497 filename = os_helper.TESTFN 4498 self.addCleanup(os_helper.unlink, filename) 4499 4500 code = textwrap.dedent(f""" 4501 import builtins 4502 import logging 4503 4504 class A: 4505 def __del__(self): 4506 logging.error("log in __del__") 4507 4508 # basicConfig() opens the file, but logging.shutdown() closes 4509 # it at Python exit. When A.__del__() is called, 4510 # FileHandler._open() must be called again to re-open the file. 4511 logging.basicConfig(filename={filename!r}, encoding="utf-8") 4512 4513 a = A() 4514 4515 # Simulate the Python finalization which removes the builtin 4516 # open() function. 4517 del builtins.open 4518 """) 4519 assert_python_ok("-c", code) 4520 4521 with open(filename, encoding="utf-8") as fp: 4522 self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__") 4523 4524 def test_recursion_error(self): 4525 # Issue 36272 4526 code = textwrap.dedent(""" 4527 import logging 4528 4529 def rec(): 4530 logging.error("foo") 4531 rec() 4532 4533 rec() 4534 """) 4535 rc, out, err = assert_python_failure("-c", code) 4536 err = err.decode() 4537 self.assertNotIn("Cannot recover from stack overflow.", err) 4538 self.assertEqual(rc, 1) 4539 4540 def test_get_level_names_mapping(self): 4541 mapping = logging.getLevelNamesMapping() 4542 self.assertEqual(logging._nameToLevel, mapping) # value is equivalent 4543 self.assertIsNot(logging._nameToLevel, mapping) # but not the internal data 4544 new_mapping = logging.getLevelNamesMapping() # another call -> another copy 4545 self.assertIsNot(mapping, new_mapping) # verify not the same object as before 4546 self.assertEqual(mapping, new_mapping) # but equivalent in value 4547 4548 4549class LogRecordTest(BaseTest): 4550 def test_str_rep(self): 4551 r = logging.makeLogRecord({}) 4552 s = str(r) 4553 self.assertTrue(s.startswith('<LogRecord: ')) 4554 self.assertTrue(s.endswith('>')) 4555 4556 def test_dict_arg(self): 4557 h = RecordingHandler() 4558 r = logging.getLogger() 4559 r.addHandler(h) 4560 d = {'less' : 'more' } 4561 logging.warning('less is %(less)s', d) 4562 self.assertIs(h.records[0].args, d) 4563 self.assertEqual(h.records[0].message, 'less is more') 4564 r.removeHandler(h) 4565 h.close() 4566 4567 @staticmethod # pickled as target of child process in the following test 4568 def _extract_logrecord_process_name(key, logMultiprocessing, conn=None): 4569 prev_logMultiprocessing = logging.logMultiprocessing 4570 logging.logMultiprocessing = logMultiprocessing 4571 try: 4572 import multiprocessing as mp 4573 name = mp.current_process().name 4574 4575 r1 = logging.makeLogRecord({'msg': f'msg1_{key}'}) 4576 4577 # https://bugs.python.org/issue45128 4578 with support.swap_item(sys.modules, 'multiprocessing', None): 4579 r2 = logging.makeLogRecord({'msg': f'msg2_{key}'}) 4580 4581 results = {'processName' : name, 4582 'r1.processName': r1.processName, 4583 'r2.processName': r2.processName, 4584 } 4585 finally: 4586 logging.logMultiprocessing = prev_logMultiprocessing 4587 if conn: 4588 conn.send(results) 4589 else: 4590 return results 4591 4592 def test_multiprocessing(self): 4593 support.skip_if_broken_multiprocessing_synchronize() 4594 multiprocessing_imported = 'multiprocessing' in sys.modules 4595 try: 4596 # logMultiprocessing is True by default 4597 self.assertEqual(logging.logMultiprocessing, True) 4598 4599 LOG_MULTI_PROCESSING = True 4600 # When logMultiprocessing == True: 4601 # In the main process processName = 'MainProcess' 4602 r = logging.makeLogRecord({}) 4603 self.assertEqual(r.processName, 'MainProcess') 4604 4605 results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING) 4606 self.assertEqual('MainProcess', results['processName']) 4607 self.assertEqual('MainProcess', results['r1.processName']) 4608 self.assertEqual('MainProcess', results['r2.processName']) 4609 4610 # In other processes, processName is correct when multiprocessing in imported, 4611 # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762). 4612 import multiprocessing 4613 parent_conn, child_conn = multiprocessing.Pipe() 4614 p = multiprocessing.Process( 4615 target=self._extract_logrecord_process_name, 4616 args=(2, LOG_MULTI_PROCESSING, child_conn,) 4617 ) 4618 p.start() 4619 results = parent_conn.recv() 4620 self.assertNotEqual('MainProcess', results['processName']) 4621 self.assertEqual(results['processName'], results['r1.processName']) 4622 self.assertEqual('MainProcess', results['r2.processName']) 4623 p.join() 4624 4625 finally: 4626 if multiprocessing_imported: 4627 import multiprocessing 4628 4629 def test_optional(self): 4630 r = logging.makeLogRecord({}) 4631 NOT_NONE = self.assertIsNotNone 4632 NOT_NONE(r.thread) 4633 NOT_NONE(r.threadName) 4634 NOT_NONE(r.process) 4635 NOT_NONE(r.processName) 4636 log_threads = logging.logThreads 4637 log_processes = logging.logProcesses 4638 log_multiprocessing = logging.logMultiprocessing 4639 try: 4640 logging.logThreads = False 4641 logging.logProcesses = False 4642 logging.logMultiprocessing = False 4643 r = logging.makeLogRecord({}) 4644 NONE = self.assertIsNone 4645 NONE(r.thread) 4646 NONE(r.threadName) 4647 NONE(r.process) 4648 NONE(r.processName) 4649 finally: 4650 logging.logThreads = log_threads 4651 logging.logProcesses = log_processes 4652 logging.logMultiprocessing = log_multiprocessing 4653 4654class BasicConfigTest(unittest.TestCase): 4655 4656 """Test suite for logging.basicConfig.""" 4657 4658 def setUp(self): 4659 super(BasicConfigTest, self).setUp() 4660 self.handlers = logging.root.handlers 4661 self.saved_handlers = logging._handlers.copy() 4662 self.saved_handler_list = logging._handlerList[:] 4663 self.original_logging_level = logging.root.level 4664 self.addCleanup(self.cleanup) 4665 logging.root.handlers = [] 4666 4667 def tearDown(self): 4668 for h in logging.root.handlers[:]: 4669 logging.root.removeHandler(h) 4670 h.close() 4671 super(BasicConfigTest, self).tearDown() 4672 4673 def cleanup(self): 4674 setattr(logging.root, 'handlers', self.handlers) 4675 logging._handlers.clear() 4676 logging._handlers.update(self.saved_handlers) 4677 logging._handlerList[:] = self.saved_handler_list 4678 logging.root.setLevel(self.original_logging_level) 4679 4680 def test_no_kwargs(self): 4681 logging.basicConfig() 4682 4683 # handler defaults to a StreamHandler to sys.stderr 4684 self.assertEqual(len(logging.root.handlers), 1) 4685 handler = logging.root.handlers[0] 4686 self.assertIsInstance(handler, logging.StreamHandler) 4687 self.assertEqual(handler.stream, sys.stderr) 4688 4689 formatter = handler.formatter 4690 # format defaults to logging.BASIC_FORMAT 4691 self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) 4692 # datefmt defaults to None 4693 self.assertIsNone(formatter.datefmt) 4694 # style defaults to % 4695 self.assertIsInstance(formatter._style, logging.PercentStyle) 4696 4697 # level is not explicitly set 4698 self.assertEqual(logging.root.level, self.original_logging_level) 4699 4700 def test_strformatstyle(self): 4701 with support.captured_stdout() as output: 4702 logging.basicConfig(stream=sys.stdout, style="{") 4703 logging.error("Log an error") 4704 sys.stdout.seek(0) 4705 self.assertEqual(output.getvalue().strip(), 4706 "ERROR:root:Log an error") 4707 4708 def test_stringtemplatestyle(self): 4709 with support.captured_stdout() as output: 4710 logging.basicConfig(stream=sys.stdout, style="$") 4711 logging.error("Log an error") 4712 sys.stdout.seek(0) 4713 self.assertEqual(output.getvalue().strip(), 4714 "ERROR:root:Log an error") 4715 4716 def test_filename(self): 4717 4718 def cleanup(h1, h2, fn): 4719 h1.close() 4720 h2.close() 4721 os.remove(fn) 4722 4723 logging.basicConfig(filename='test.log', encoding='utf-8') 4724 4725 self.assertEqual(len(logging.root.handlers), 1) 4726 handler = logging.root.handlers[0] 4727 self.assertIsInstance(handler, logging.FileHandler) 4728 4729 expected = logging.FileHandler('test.log', 'a', encoding='utf-8') 4730 self.assertEqual(handler.stream.mode, expected.stream.mode) 4731 self.assertEqual(handler.stream.name, expected.stream.name) 4732 self.addCleanup(cleanup, handler, expected, 'test.log') 4733 4734 def test_filemode(self): 4735 4736 def cleanup(h1, h2, fn): 4737 h1.close() 4738 h2.close() 4739 os.remove(fn) 4740 4741 logging.basicConfig(filename='test.log', filemode='wb') 4742 4743 handler = logging.root.handlers[0] 4744 expected = logging.FileHandler('test.log', 'wb') 4745 self.assertEqual(handler.stream.mode, expected.stream.mode) 4746 self.addCleanup(cleanup, handler, expected, 'test.log') 4747 4748 def test_stream(self): 4749 stream = io.StringIO() 4750 self.addCleanup(stream.close) 4751 logging.basicConfig(stream=stream) 4752 4753 self.assertEqual(len(logging.root.handlers), 1) 4754 handler = logging.root.handlers[0] 4755 self.assertIsInstance(handler, logging.StreamHandler) 4756 self.assertEqual(handler.stream, stream) 4757 4758 def test_format(self): 4759 logging.basicConfig(format='%(asctime)s - %(message)s') 4760 4761 formatter = logging.root.handlers[0].formatter 4762 self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s') 4763 4764 def test_datefmt(self): 4765 logging.basicConfig(datefmt='bar') 4766 4767 formatter = logging.root.handlers[0].formatter 4768 self.assertEqual(formatter.datefmt, 'bar') 4769 4770 def test_style(self): 4771 logging.basicConfig(style='$') 4772 4773 formatter = logging.root.handlers[0].formatter 4774 self.assertIsInstance(formatter._style, logging.StringTemplateStyle) 4775 4776 def test_level(self): 4777 old_level = logging.root.level 4778 self.addCleanup(logging.root.setLevel, old_level) 4779 4780 logging.basicConfig(level=57) 4781 self.assertEqual(logging.root.level, 57) 4782 # Test that second call has no effect 4783 logging.basicConfig(level=58) 4784 self.assertEqual(logging.root.level, 57) 4785 4786 def test_incompatible(self): 4787 assertRaises = self.assertRaises 4788 handlers = [logging.StreamHandler()] 4789 stream = sys.stderr 4790 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4791 stream=stream) 4792 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4793 handlers=handlers) 4794 assertRaises(ValueError, logging.basicConfig, stream=stream, 4795 handlers=handlers) 4796 # Issue 23207: test for invalid kwargs 4797 assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) 4798 # Should pop both filename and filemode even if filename is None 4799 logging.basicConfig(filename=None, filemode='a') 4800 4801 def test_handlers(self): 4802 handlers = [ 4803 logging.StreamHandler(), 4804 logging.StreamHandler(sys.stdout), 4805 logging.StreamHandler(), 4806 ] 4807 f = logging.Formatter() 4808 handlers[2].setFormatter(f) 4809 logging.basicConfig(handlers=handlers) 4810 self.assertIs(handlers[0], logging.root.handlers[0]) 4811 self.assertIs(handlers[1], logging.root.handlers[1]) 4812 self.assertIs(handlers[2], logging.root.handlers[2]) 4813 self.assertIsNotNone(handlers[0].formatter) 4814 self.assertIsNotNone(handlers[1].formatter) 4815 self.assertIs(handlers[2].formatter, f) 4816 self.assertIs(handlers[0].formatter, handlers[1].formatter) 4817 4818 def test_force(self): 4819 old_string_io = io.StringIO() 4820 new_string_io = io.StringIO() 4821 old_handlers = [logging.StreamHandler(old_string_io)] 4822 new_handlers = [logging.StreamHandler(new_string_io)] 4823 logging.basicConfig(level=logging.WARNING, handlers=old_handlers) 4824 logging.warning('warn') 4825 logging.info('info') 4826 logging.debug('debug') 4827 self.assertEqual(len(logging.root.handlers), 1) 4828 logging.basicConfig(level=logging.INFO, handlers=new_handlers, 4829 force=True) 4830 logging.warning('warn') 4831 logging.info('info') 4832 logging.debug('debug') 4833 self.assertEqual(len(logging.root.handlers), 1) 4834 self.assertEqual(old_string_io.getvalue().strip(), 4835 'WARNING:root:warn') 4836 self.assertEqual(new_string_io.getvalue().strip(), 4837 'WARNING:root:warn\nINFO:root:info') 4838 4839 def test_encoding(self): 4840 try: 4841 encoding = 'utf-8' 4842 logging.basicConfig(filename='test.log', encoding=encoding, 4843 errors='strict', 4844 format='%(message)s', level=logging.DEBUG) 4845 4846 self.assertEqual(len(logging.root.handlers), 1) 4847 handler = logging.root.handlers[0] 4848 self.assertIsInstance(handler, logging.FileHandler) 4849 self.assertEqual(handler.encoding, encoding) 4850 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4851 finally: 4852 handler.close() 4853 with open('test.log', encoding='utf-8') as f: 4854 data = f.read().strip() 4855 os.remove('test.log') 4856 self.assertEqual(data, 4857 'The Øresund Bridge joins Copenhagen to Malmö') 4858 4859 def test_encoding_errors(self): 4860 try: 4861 encoding = 'ascii' 4862 logging.basicConfig(filename='test.log', encoding=encoding, 4863 errors='ignore', 4864 format='%(message)s', level=logging.DEBUG) 4865 4866 self.assertEqual(len(logging.root.handlers), 1) 4867 handler = logging.root.handlers[0] 4868 self.assertIsInstance(handler, logging.FileHandler) 4869 self.assertEqual(handler.encoding, encoding) 4870 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4871 finally: 4872 handler.close() 4873 with open('test.log', encoding='utf-8') as f: 4874 data = f.read().strip() 4875 os.remove('test.log') 4876 self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm') 4877 4878 def test_encoding_errors_default(self): 4879 try: 4880 encoding = 'ascii' 4881 logging.basicConfig(filename='test.log', encoding=encoding, 4882 format='%(message)s', level=logging.DEBUG) 4883 4884 self.assertEqual(len(logging.root.handlers), 1) 4885 handler = logging.root.handlers[0] 4886 self.assertIsInstance(handler, logging.FileHandler) 4887 self.assertEqual(handler.encoding, encoding) 4888 self.assertEqual(handler.errors, 'backslashreplace') 4889 logging.debug(': ☃️: The Øresund Bridge joins Copenhagen to Malmö') 4890 finally: 4891 handler.close() 4892 with open('test.log', encoding='utf-8') as f: 4893 data = f.read().strip() 4894 os.remove('test.log') 4895 self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund ' 4896 r'Bridge joins Copenhagen to Malm\xf6') 4897 4898 def test_encoding_errors_none(self): 4899 # Specifying None should behave as 'strict' 4900 try: 4901 encoding = 'ascii' 4902 logging.basicConfig(filename='test.log', encoding=encoding, 4903 errors=None, 4904 format='%(message)s', level=logging.DEBUG) 4905 4906 self.assertEqual(len(logging.root.handlers), 1) 4907 handler = logging.root.handlers[0] 4908 self.assertIsInstance(handler, logging.FileHandler) 4909 self.assertEqual(handler.encoding, encoding) 4910 self.assertIsNone(handler.errors) 4911 4912 message = [] 4913 4914 def dummy_handle_error(record): 4915 _, v, _ = sys.exc_info() 4916 message.append(str(v)) 4917 4918 handler.handleError = dummy_handle_error 4919 logging.debug('The Øresund Bridge joins Copenhagen to Malmö') 4920 self.assertTrue(message) 4921 self.assertIn("'ascii' codec can't encode " 4922 "character '\\xd8' in position 4:", message[0]) 4923 finally: 4924 handler.close() 4925 with open('test.log', encoding='utf-8') as f: 4926 data = f.read().strip() 4927 os.remove('test.log') 4928 # didn't write anything due to the encoding error 4929 self.assertEqual(data, r'') 4930 4931 4932 def _test_log(self, method, level=None): 4933 # logging.root has no handlers so basicConfig should be called 4934 called = [] 4935 4936 old_basic_config = logging.basicConfig 4937 def my_basic_config(*a, **kw): 4938 old_basic_config() 4939 old_level = logging.root.level 4940 logging.root.setLevel(100) # avoid having messages in stderr 4941 self.addCleanup(logging.root.setLevel, old_level) 4942 called.append((a, kw)) 4943 4944 support.patch(self, logging, 'basicConfig', my_basic_config) 4945 4946 log_method = getattr(logging, method) 4947 if level is not None: 4948 log_method(level, "test me") 4949 else: 4950 log_method("test me") 4951 4952 # basicConfig was called with no arguments 4953 self.assertEqual(called, [((), {})]) 4954 4955 def test_log(self): 4956 self._test_log('log', logging.WARNING) 4957 4958 def test_debug(self): 4959 self._test_log('debug') 4960 4961 def test_info(self): 4962 self._test_log('info') 4963 4964 def test_warning(self): 4965 self._test_log('warning') 4966 4967 def test_error(self): 4968 self._test_log('error') 4969 4970 def test_critical(self): 4971 self._test_log('critical') 4972 4973 4974class LoggerAdapterTest(unittest.TestCase): 4975 def setUp(self): 4976 super(LoggerAdapterTest, self).setUp() 4977 old_handler_list = logging._handlerList[:] 4978 4979 self.recording = RecordingHandler() 4980 self.logger = logging.root 4981 self.logger.addHandler(self.recording) 4982 self.addCleanup(self.logger.removeHandler, self.recording) 4983 self.addCleanup(self.recording.close) 4984 4985 def cleanup(): 4986 logging._handlerList[:] = old_handler_list 4987 4988 self.addCleanup(cleanup) 4989 self.addCleanup(logging.shutdown) 4990 self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) 4991 4992 def test_exception(self): 4993 msg = 'testing exception: %r' 4994 exc = None 4995 try: 4996 1 / 0 4997 except ZeroDivisionError as e: 4998 exc = e 4999 self.adapter.exception(msg, self.recording) 5000 5001 self.assertEqual(len(self.recording.records), 1) 5002 record = self.recording.records[0] 5003 self.assertEqual(record.levelno, logging.ERROR) 5004 self.assertEqual(record.msg, msg) 5005 self.assertEqual(record.args, (self.recording,)) 5006 self.assertEqual(record.exc_info, 5007 (exc.__class__, exc, exc.__traceback__)) 5008 5009 def test_exception_excinfo(self): 5010 try: 5011 1 / 0 5012 except ZeroDivisionError as e: 5013 exc = e 5014 5015 self.adapter.exception('exc_info test', exc_info=exc) 5016 5017 self.assertEqual(len(self.recording.records), 1) 5018 record = self.recording.records[0] 5019 self.assertEqual(record.exc_info, 5020 (exc.__class__, exc, exc.__traceback__)) 5021 5022 def test_critical(self): 5023 msg = 'critical test! %r' 5024 self.adapter.critical(msg, self.recording) 5025 5026 self.assertEqual(len(self.recording.records), 1) 5027 record = self.recording.records[0] 5028 self.assertEqual(record.levelno, logging.CRITICAL) 5029 self.assertEqual(record.msg, msg) 5030 self.assertEqual(record.args, (self.recording,)) 5031 5032 def test_is_enabled_for(self): 5033 old_disable = self.adapter.logger.manager.disable 5034 self.adapter.logger.manager.disable = 33 5035 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 5036 old_disable) 5037 self.assertFalse(self.adapter.isEnabledFor(32)) 5038 5039 def test_has_handlers(self): 5040 self.assertTrue(self.adapter.hasHandlers()) 5041 5042 for handler in self.logger.handlers: 5043 self.logger.removeHandler(handler) 5044 5045 self.assertFalse(self.logger.hasHandlers()) 5046 self.assertFalse(self.adapter.hasHandlers()) 5047 5048 def test_nested(self): 5049 class Adapter(logging.LoggerAdapter): 5050 prefix = 'Adapter' 5051 5052 def process(self, msg, kwargs): 5053 return f"{self.prefix} {msg}", kwargs 5054 5055 msg = 'Adapters can be nested, yo.' 5056 adapter = Adapter(logger=self.logger, extra=None) 5057 adapter_adapter = Adapter(logger=adapter, extra=None) 5058 adapter_adapter.prefix = 'AdapterAdapter' 5059 self.assertEqual(repr(adapter), repr(adapter_adapter)) 5060 adapter_adapter.log(logging.CRITICAL, msg, self.recording) 5061 self.assertEqual(len(self.recording.records), 1) 5062 record = self.recording.records[0] 5063 self.assertEqual(record.levelno, logging.CRITICAL) 5064 self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}") 5065 self.assertEqual(record.args, (self.recording,)) 5066 orig_manager = adapter_adapter.manager 5067 self.assertIs(adapter.manager, orig_manager) 5068 self.assertIs(self.logger.manager, orig_manager) 5069 temp_manager = object() 5070 try: 5071 adapter_adapter.manager = temp_manager 5072 self.assertIs(adapter_adapter.manager, temp_manager) 5073 self.assertIs(adapter.manager, temp_manager) 5074 self.assertIs(self.logger.manager, temp_manager) 5075 finally: 5076 adapter_adapter.manager = orig_manager 5077 self.assertIs(adapter_adapter.manager, orig_manager) 5078 self.assertIs(adapter.manager, orig_manager) 5079 self.assertIs(self.logger.manager, orig_manager) 5080 5081 5082class LoggerTest(BaseTest, AssertErrorMessage): 5083 5084 def setUp(self): 5085 super(LoggerTest, self).setUp() 5086 self.recording = RecordingHandler() 5087 self.logger = logging.Logger(name='blah') 5088 self.logger.addHandler(self.recording) 5089 self.addCleanup(self.logger.removeHandler, self.recording) 5090 self.addCleanup(self.recording.close) 5091 self.addCleanup(logging.shutdown) 5092 5093 def test_set_invalid_level(self): 5094 self.assert_error_message( 5095 TypeError, 'Level not an integer or a valid string: None', 5096 self.logger.setLevel, None) 5097 self.assert_error_message( 5098 TypeError, 'Level not an integer or a valid string: (0, 0)', 5099 self.logger.setLevel, (0, 0)) 5100 5101 def test_exception(self): 5102 msg = 'testing exception: %r' 5103 exc = None 5104 try: 5105 1 / 0 5106 except ZeroDivisionError as e: 5107 exc = e 5108 self.logger.exception(msg, self.recording) 5109 5110 self.assertEqual(len(self.recording.records), 1) 5111 record = self.recording.records[0] 5112 self.assertEqual(record.levelno, logging.ERROR) 5113 self.assertEqual(record.msg, msg) 5114 self.assertEqual(record.args, (self.recording,)) 5115 self.assertEqual(record.exc_info, 5116 (exc.__class__, exc, exc.__traceback__)) 5117 5118 def test_log_invalid_level_with_raise(self): 5119 with support.swap_attr(logging, 'raiseExceptions', True): 5120 self.assertRaises(TypeError, self.logger.log, '10', 'test message') 5121 5122 def test_log_invalid_level_no_raise(self): 5123 with support.swap_attr(logging, 'raiseExceptions', False): 5124 self.logger.log('10', 'test message') # no exception happens 5125 5126 def test_find_caller_with_stack_info(self): 5127 called = [] 5128 support.patch(self, logging.traceback, 'print_stack', 5129 lambda f, file: called.append(file.getvalue())) 5130 5131 self.logger.findCaller(stack_info=True) 5132 5133 self.assertEqual(len(called), 1) 5134 self.assertEqual('Stack (most recent call last):\n', called[0]) 5135 5136 def test_find_caller_with_stacklevel(self): 5137 the_level = 1 5138 trigger = self.logger.warning 5139 5140 def innermost(): 5141 trigger('test', stacklevel=the_level) 5142 5143 def inner(): 5144 innermost() 5145 5146 def outer(): 5147 inner() 5148 5149 records = self.recording.records 5150 outer() 5151 self.assertEqual(records[-1].funcName, 'innermost') 5152 lineno = records[-1].lineno 5153 the_level += 1 5154 outer() 5155 self.assertEqual(records[-1].funcName, 'inner') 5156 self.assertGreater(records[-1].lineno, lineno) 5157 lineno = records[-1].lineno 5158 the_level += 1 5159 outer() 5160 self.assertEqual(records[-1].funcName, 'outer') 5161 self.assertGreater(records[-1].lineno, lineno) 5162 lineno = records[-1].lineno 5163 root_logger = logging.getLogger() 5164 root_logger.addHandler(self.recording) 5165 trigger = logging.warning 5166 outer() 5167 self.assertEqual(records[-1].funcName, 'outer') 5168 root_logger.removeHandler(self.recording) 5169 trigger = self.logger.warning 5170 the_level += 1 5171 outer() 5172 self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel') 5173 self.assertGreater(records[-1].lineno, lineno) 5174 5175 def test_make_record_with_extra_overwrite(self): 5176 name = 'my record' 5177 level = 13 5178 fn = lno = msg = args = exc_info = func = sinfo = None 5179 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 5180 exc_info, func, sinfo) 5181 5182 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 5183 extra = {key: 'some value'} 5184 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 5185 fn, lno, msg, args, exc_info, 5186 extra=extra, sinfo=sinfo) 5187 5188 def test_make_record_with_extra_no_overwrite(self): 5189 name = 'my record' 5190 level = 13 5191 fn = lno = msg = args = exc_info = func = sinfo = None 5192 extra = {'valid_key': 'some value'} 5193 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 5194 exc_info, extra=extra, sinfo=sinfo) 5195 self.assertIn('valid_key', result.__dict__) 5196 5197 def test_has_handlers(self): 5198 self.assertTrue(self.logger.hasHandlers()) 5199 5200 for handler in self.logger.handlers: 5201 self.logger.removeHandler(handler) 5202 self.assertFalse(self.logger.hasHandlers()) 5203 5204 def test_has_handlers_no_propagate(self): 5205 child_logger = logging.getLogger('blah.child') 5206 child_logger.propagate = False 5207 self.assertFalse(child_logger.hasHandlers()) 5208 5209 def test_is_enabled_for(self): 5210 old_disable = self.logger.manager.disable 5211 self.logger.manager.disable = 23 5212 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 5213 self.assertFalse(self.logger.isEnabledFor(22)) 5214 5215 def test_is_enabled_for_disabled_logger(self): 5216 old_disabled = self.logger.disabled 5217 old_disable = self.logger.manager.disable 5218 5219 self.logger.disabled = True 5220 self.logger.manager.disable = 21 5221 5222 self.addCleanup(setattr, self.logger, 'disabled', old_disabled) 5223 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 5224 5225 self.assertFalse(self.logger.isEnabledFor(22)) 5226 5227 def test_root_logger_aliases(self): 5228 root = logging.getLogger() 5229 self.assertIs(root, logging.root) 5230 self.assertIs(root, logging.getLogger(None)) 5231 self.assertIs(root, logging.getLogger('')) 5232 self.assertIs(root, logging.getLogger('root')) 5233 self.assertIs(root, logging.getLogger('foo').root) 5234 self.assertIs(root, logging.getLogger('foo.bar').root) 5235 self.assertIs(root, logging.getLogger('foo').parent) 5236 5237 self.assertIsNot(root, logging.getLogger('\0')) 5238 self.assertIsNot(root, logging.getLogger('foo.bar').parent) 5239 5240 def test_invalid_names(self): 5241 self.assertRaises(TypeError, logging.getLogger, any) 5242 self.assertRaises(TypeError, logging.getLogger, b'foo') 5243 5244 def test_pickling(self): 5245 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 5246 for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'): 5247 logger = logging.getLogger(name) 5248 s = pickle.dumps(logger, proto) 5249 unpickled = pickle.loads(s) 5250 self.assertIs(unpickled, logger) 5251 5252 def test_caching(self): 5253 root = self.root_logger 5254 logger1 = logging.getLogger("abc") 5255 logger2 = logging.getLogger("abc.def") 5256 5257 # Set root logger level and ensure cache is empty 5258 root.setLevel(logging.ERROR) 5259 self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR) 5260 self.assertEqual(logger2._cache, {}) 5261 5262 # Ensure cache is populated and calls are consistent 5263 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 5264 self.assertFalse(logger2.isEnabledFor(logging.DEBUG)) 5265 self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False}) 5266 self.assertEqual(root._cache, {}) 5267 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 5268 5269 # Ensure root cache gets populated 5270 self.assertEqual(root._cache, {}) 5271 self.assertTrue(root.isEnabledFor(logging.ERROR)) 5272 self.assertEqual(root._cache, {logging.ERROR: True}) 5273 5274 # Set parent logger level and ensure caches are emptied 5275 logger1.setLevel(logging.CRITICAL) 5276 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 5277 self.assertEqual(logger2._cache, {}) 5278 5279 # Ensure logger2 uses parent logger's effective level 5280 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 5281 5282 # Set level to NOTSET and ensure caches are empty 5283 logger2.setLevel(logging.NOTSET) 5284 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 5285 self.assertEqual(logger2._cache, {}) 5286 self.assertEqual(logger1._cache, {}) 5287 self.assertEqual(root._cache, {}) 5288 5289 # Verify logger2 follows parent and not root 5290 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 5291 self.assertTrue(logger2.isEnabledFor(logging.CRITICAL)) 5292 self.assertFalse(logger1.isEnabledFor(logging.ERROR)) 5293 self.assertTrue(logger1.isEnabledFor(logging.CRITICAL)) 5294 self.assertTrue(root.isEnabledFor(logging.ERROR)) 5295 5296 # Disable logging in manager and ensure caches are clear 5297 logging.disable() 5298 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 5299 self.assertEqual(logger2._cache, {}) 5300 self.assertEqual(logger1._cache, {}) 5301 self.assertEqual(root._cache, {}) 5302 5303 # Ensure no loggers are enabled 5304 self.assertFalse(logger1.isEnabledFor(logging.CRITICAL)) 5305 self.assertFalse(logger2.isEnabledFor(logging.CRITICAL)) 5306 self.assertFalse(root.isEnabledFor(logging.CRITICAL)) 5307 5308 5309class BaseFileTest(BaseTest): 5310 "Base class for handler tests that write log files" 5311 5312 def setUp(self): 5313 BaseTest.setUp(self) 5314 fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") 5315 os.close(fd) 5316 self.rmfiles = [] 5317 5318 def tearDown(self): 5319 for fn in self.rmfiles: 5320 os.unlink(fn) 5321 if os.path.exists(self.fn): 5322 os.unlink(self.fn) 5323 BaseTest.tearDown(self) 5324 5325 def assertLogFile(self, filename): 5326 "Assert a log file is there and register it for deletion" 5327 self.assertTrue(os.path.exists(filename), 5328 msg="Log file %r does not exist" % filename) 5329 self.rmfiles.append(filename) 5330 5331 def next_rec(self): 5332 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 5333 self.next_message(), None, None, None) 5334 5335class FileHandlerTest(BaseFileTest): 5336 def test_delay(self): 5337 os.unlink(self.fn) 5338 fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True) 5339 self.assertIsNone(fh.stream) 5340 self.assertFalse(os.path.exists(self.fn)) 5341 fh.handle(logging.makeLogRecord({})) 5342 self.assertIsNotNone(fh.stream) 5343 self.assertTrue(os.path.exists(self.fn)) 5344 fh.close() 5345 5346 def test_emit_after_closing_in_write_mode(self): 5347 # Issue #42378 5348 os.unlink(self.fn) 5349 fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w') 5350 fh.setFormatter(logging.Formatter('%(message)s')) 5351 fh.emit(self.next_rec()) # '1' 5352 fh.close() 5353 fh.emit(self.next_rec()) # '2' 5354 with open(self.fn) as fp: 5355 self.assertEqual(fp.read().strip(), '1') 5356 5357class RotatingFileHandlerTest(BaseFileTest): 5358 @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.") 5359 def test_should_not_rollover(self): 5360 # If maxbytes is zero rollover never occurs 5361 rh = logging.handlers.RotatingFileHandler( 5362 self.fn, encoding="utf-8", maxBytes=0) 5363 self.assertFalse(rh.shouldRollover(None)) 5364 rh.close() 5365 # bpo-45401 - test with special file 5366 # We set maxBytes to 1 so that rollover would normally happen, except 5367 # for the check for regular files 5368 rh = logging.handlers.RotatingFileHandler( 5369 os.devnull, encoding="utf-8", maxBytes=1) 5370 self.assertFalse(rh.shouldRollover(self.next_rec())) 5371 rh.close() 5372 5373 def test_should_rollover(self): 5374 rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=1) 5375 self.assertTrue(rh.shouldRollover(self.next_rec())) 5376 rh.close() 5377 5378 def test_file_created(self): 5379 # checks that the file is created and assumes it was created 5380 # by us 5381 rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8") 5382 rh.emit(self.next_rec()) 5383 self.assertLogFile(self.fn) 5384 rh.close() 5385 5386 def test_rollover_filenames(self): 5387 def namer(name): 5388 return name + ".test" 5389 rh = logging.handlers.RotatingFileHandler( 5390 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5391 rh.namer = namer 5392 rh.emit(self.next_rec()) 5393 self.assertLogFile(self.fn) 5394 rh.emit(self.next_rec()) 5395 self.assertLogFile(namer(self.fn + ".1")) 5396 rh.emit(self.next_rec()) 5397 self.assertLogFile(namer(self.fn + ".2")) 5398 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 5399 rh.close() 5400 5401 def test_namer_rotator_inheritance(self): 5402 class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler): 5403 def namer(self, name): 5404 return name + ".test" 5405 5406 def rotator(self, source, dest): 5407 if os.path.exists(source): 5408 os.replace(source, dest + ".rotated") 5409 5410 rh = HandlerWithNamerAndRotator( 5411 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5412 self.assertEqual(rh.namer(self.fn), self.fn + ".test") 5413 rh.emit(self.next_rec()) 5414 self.assertLogFile(self.fn) 5415 rh.emit(self.next_rec()) 5416 self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated") 5417 self.assertFalse(os.path.exists(rh.namer(self.fn + ".1"))) 5418 rh.close() 5419 5420 @support.requires_zlib() 5421 def test_rotator(self): 5422 def namer(name): 5423 return name + ".gz" 5424 5425 def rotator(source, dest): 5426 with open(source, "rb") as sf: 5427 data = sf.read() 5428 compressed = zlib.compress(data, 9) 5429 with open(dest, "wb") as df: 5430 df.write(compressed) 5431 os.remove(source) 5432 5433 rh = logging.handlers.RotatingFileHandler( 5434 self.fn, encoding="utf-8", backupCount=2, maxBytes=1) 5435 rh.rotator = rotator 5436 rh.namer = namer 5437 m1 = self.next_rec() 5438 rh.emit(m1) 5439 self.assertLogFile(self.fn) 5440 m2 = self.next_rec() 5441 rh.emit(m2) 5442 fn = namer(self.fn + ".1") 5443 self.assertLogFile(fn) 5444 newline = os.linesep 5445 with open(fn, "rb") as f: 5446 compressed = f.read() 5447 data = zlib.decompress(compressed) 5448 self.assertEqual(data.decode("ascii"), m1.msg + newline) 5449 rh.emit(self.next_rec()) 5450 fn = namer(self.fn + ".2") 5451 self.assertLogFile(fn) 5452 with open(fn, "rb") as f: 5453 compressed = f.read() 5454 data = zlib.decompress(compressed) 5455 self.assertEqual(data.decode("ascii"), m1.msg + newline) 5456 rh.emit(self.next_rec()) 5457 fn = namer(self.fn + ".2") 5458 with open(fn, "rb") as f: 5459 compressed = f.read() 5460 data = zlib.decompress(compressed) 5461 self.assertEqual(data.decode("ascii"), m2.msg + newline) 5462 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 5463 rh.close() 5464 5465class TimedRotatingFileHandlerTest(BaseFileTest): 5466 @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.") 5467 def test_should_not_rollover(self): 5468 # See bpo-45401. Should only ever rollover regular files 5469 fh = logging.handlers.TimedRotatingFileHandler( 5470 os.devnull, 'S', encoding="utf-8", backupCount=1) 5471 time.sleep(1.1) # a little over a second ... 5472 r = logging.makeLogRecord({'msg': 'testing - device file'}) 5473 self.assertFalse(fh.shouldRollover(r)) 5474 fh.close() 5475 5476 # other test methods added below 5477 def test_rollover(self): 5478 fh = logging.handlers.TimedRotatingFileHandler( 5479 self.fn, 'S', encoding="utf-8", backupCount=1) 5480 fmt = logging.Formatter('%(asctime)s %(message)s') 5481 fh.setFormatter(fmt) 5482 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 5483 fh.emit(r1) 5484 self.assertLogFile(self.fn) 5485 time.sleep(1.1) # a little over a second ... 5486 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 5487 fh.emit(r2) 5488 fh.close() 5489 # At this point, we should have a recent rotated file which we 5490 # can test for the existence of. However, in practice, on some 5491 # machines which run really slowly, we don't know how far back 5492 # in time to go to look for the log file. So, we go back a fair 5493 # bit, and stop as soon as we see a rotated file. In theory this 5494 # could of course still fail, but the chances are lower. 5495 found = False 5496 now = datetime.datetime.now() 5497 GO_BACK = 5 * 60 # seconds 5498 for secs in range(GO_BACK): 5499 prev = now - datetime.timedelta(seconds=secs) 5500 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 5501 found = os.path.exists(fn) 5502 if found: 5503 self.rmfiles.append(fn) 5504 break 5505 msg = 'No rotated files found, went back %d seconds' % GO_BACK 5506 if not found: 5507 # print additional diagnostics 5508 dn, fn = os.path.split(self.fn) 5509 files = [f for f in os.listdir(dn) if f.startswith(fn)] 5510 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 5511 print('The only matching files are: %s' % files, file=sys.stderr) 5512 for f in files: 5513 print('Contents of %s:' % f) 5514 path = os.path.join(dn, f) 5515 with open(path, 'r') as tf: 5516 print(tf.read()) 5517 self.assertTrue(found, msg=msg) 5518 5519 def test_invalid(self): 5520 assertRaises = self.assertRaises 5521 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5522 self.fn, 'X', encoding="utf-8", delay=True) 5523 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5524 self.fn, 'W', encoding="utf-8", delay=True) 5525 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 5526 self.fn, 'W7', encoding="utf-8", delay=True) 5527 5528 def test_compute_rollover_daily_attime(self): 5529 currentTime = 0 5530 atTime = datetime.time(12, 0, 0) 5531 rh = logging.handlers.TimedRotatingFileHandler( 5532 self.fn, encoding="utf-8", when='MIDNIGHT', interval=1, backupCount=0, 5533 utc=True, atTime=atTime) 5534 try: 5535 actual = rh.computeRollover(currentTime) 5536 self.assertEqual(actual, currentTime + 12 * 60 * 60) 5537 5538 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 5539 self.assertEqual(actual, currentTime + 36 * 60 * 60) 5540 finally: 5541 rh.close() 5542 5543 #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') 5544 def test_compute_rollover_weekly_attime(self): 5545 currentTime = int(time.time()) 5546 today = currentTime - currentTime % 86400 5547 5548 atTime = datetime.time(12, 0, 0) 5549 5550 wday = time.gmtime(today).tm_wday 5551 for day in range(7): 5552 rh = logging.handlers.TimedRotatingFileHandler( 5553 self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0, 5554 utc=True, atTime=atTime) 5555 try: 5556 if wday > day: 5557 # The rollover day has already passed this week, so we 5558 # go over into next week 5559 expected = (7 - wday + day) 5560 else: 5561 expected = (day - wday) 5562 # At this point expected is in days from now, convert to seconds 5563 expected *= 24 * 60 * 60 5564 # Add in the rollover time 5565 expected += 12 * 60 * 60 5566 # Add in adjustment for today 5567 expected += today 5568 actual = rh.computeRollover(today) 5569 if actual != expected: 5570 print('failed in timezone: %d' % time.timezone) 5571 print('local vars: %s' % locals()) 5572 self.assertEqual(actual, expected) 5573 if day == wday: 5574 # goes into following week 5575 expected += 7 * 24 * 60 * 60 5576 actual = rh.computeRollover(today + 13 * 60 * 60) 5577 if actual != expected: 5578 print('failed in timezone: %d' % time.timezone) 5579 print('local vars: %s' % locals()) 5580 self.assertEqual(actual, expected) 5581 finally: 5582 rh.close() 5583 5584 def test_compute_files_to_delete(self): 5585 # See bpo-46063 for background 5586 wd = tempfile.mkdtemp(prefix='test_logging_') 5587 self.addCleanup(shutil.rmtree, wd) 5588 times = [] 5589 dt = datetime.datetime.now() 5590 for i in range(10): 5591 times.append(dt.strftime('%Y-%m-%d_%H-%M-%S')) 5592 dt += datetime.timedelta(seconds=5) 5593 prefixes = ('a.b', 'a.b.c', 'd.e', 'd.e.f') 5594 files = [] 5595 rotators = [] 5596 for prefix in prefixes: 5597 p = os.path.join(wd, '%s.log' % prefix) 5598 rotator = logging.handlers.TimedRotatingFileHandler(p, when='s', 5599 interval=5, 5600 backupCount=7, 5601 delay=True) 5602 rotators.append(rotator) 5603 if prefix.startswith('a.b'): 5604 for t in times: 5605 files.append('%s.log.%s' % (prefix, t)) 5606 else: 5607 rotator.namer = lambda name: name.replace('.log', '') + '.log' 5608 for t in times: 5609 files.append('%s.%s.log' % (prefix, t)) 5610 # Create empty files 5611 for fn in files: 5612 p = os.path.join(wd, fn) 5613 with open(p, 'wb') as f: 5614 pass 5615 # Now the checks that only the correct files are offered up for deletion 5616 for i, prefix in enumerate(prefixes): 5617 rotator = rotators[i] 5618 candidates = rotator.getFilesToDelete() 5619 self.assertEqual(len(candidates), 3) 5620 if prefix.startswith('a.b'): 5621 p = '%s.log.' % prefix 5622 for c in candidates: 5623 d, fn = os.path.split(c) 5624 self.assertTrue(fn.startswith(p)) 5625 else: 5626 for c in candidates: 5627 d, fn = os.path.split(c) 5628 self.assertTrue(fn.endswith('.log')) 5629 self.assertTrue(fn.startswith(prefix + '.') and 5630 fn[len(prefix) + 2].isdigit()) 5631 5632 5633def secs(**kw): 5634 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 5635 5636for when, exp in (('S', 1), 5637 ('M', 60), 5638 ('H', 60 * 60), 5639 ('D', 60 * 60 * 24), 5640 ('MIDNIGHT', 60 * 60 * 24), 5641 # current time (epoch start) is a Thursday, W0 means Monday 5642 ('W0', secs(days=4, hours=24)), 5643 ): 5644 def test_compute_rollover(self, when=when, exp=exp): 5645 rh = logging.handlers.TimedRotatingFileHandler( 5646 self.fn, encoding="utf-8", when=when, interval=1, backupCount=0, utc=True) 5647 currentTime = 0.0 5648 actual = rh.computeRollover(currentTime) 5649 if exp != actual: 5650 # Failures occur on some systems for MIDNIGHT and W0. 5651 # Print detailed calculation for MIDNIGHT so we can try to see 5652 # what's going on 5653 if when == 'MIDNIGHT': 5654 try: 5655 if rh.utc: 5656 t = time.gmtime(currentTime) 5657 else: 5658 t = time.localtime(currentTime) 5659 currentHour = t[3] 5660 currentMinute = t[4] 5661 currentSecond = t[5] 5662 # r is the number of seconds left between now and midnight 5663 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 5664 currentMinute) * 60 + 5665 currentSecond) 5666 result = currentTime + r 5667 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 5668 print('currentHour: %s' % currentHour, file=sys.stderr) 5669 print('currentMinute: %s' % currentMinute, file=sys.stderr) 5670 print('currentSecond: %s' % currentSecond, file=sys.stderr) 5671 print('r: %s' % r, file=sys.stderr) 5672 print('result: %s' % result, file=sys.stderr) 5673 except Exception as e: 5674 print('exception in diagnostic code: %s' % e, file=sys.stderr) 5675 self.assertEqual(exp, actual) 5676 rh.close() 5677 setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) 5678 5679 5680@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 5681class NTEventLogHandlerTest(BaseTest): 5682 def test_basic(self): 5683 logtype = 'Application' 5684 elh = win32evtlog.OpenEventLog(None, logtype) 5685 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 5686 5687 try: 5688 h = logging.handlers.NTEventLogHandler('test_logging') 5689 except pywintypes.error as e: 5690 if e.winerror == 5: # access denied 5691 raise unittest.SkipTest('Insufficient privileges to run test') 5692 raise 5693 5694 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 5695 h.handle(r) 5696 h.close() 5697 # Now see if the event is recorded 5698 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 5699 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 5700 win32evtlog.EVENTLOG_SEQUENTIAL_READ 5701 found = False 5702 GO_BACK = 100 5703 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 5704 for e in events: 5705 if e.SourceName != 'test_logging': 5706 continue 5707 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 5708 if msg != 'Test Log Message\r\n': 5709 continue 5710 found = True 5711 break 5712 msg = 'Record not found in event log, went back %d records' % GO_BACK 5713 self.assertTrue(found, msg=msg) 5714 5715 5716class MiscTestCase(unittest.TestCase): 5717 def test__all__(self): 5718 not_exported = { 5719 'logThreads', 'logMultiprocessing', 'logProcesses', 'currentframe', 5720 'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle', 5721 'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root', 5722 'threading'} 5723 support.check__all__(self, logging, not_exported=not_exported) 5724 5725 5726# Set the locale to the platform-dependent default. I have no idea 5727# why the test does this, but in any case we save the current locale 5728# first and restore it at the end. 5729def setUpModule(): 5730 unittest.enterModuleContext(support.run_with_locale('LC_ALL', '')) 5731 5732 5733if __name__ == "__main__": 5734 unittest.main() 5735