1# Copyright 2001-2022 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2022 Vinay Sajip. All Rights Reserved.
20"""
21
22import logging
23import logging.handlers
24import logging.config
25
26import codecs
27import configparser
28import copy
29import datetime
30import pathlib
31import pickle
32import io
33import gc
34import json
35import os
36import queue
37import random
38import re
39import shutil
40import socket
41import struct
42import sys
43import tempfile
44from test.support.script_helper import assert_python_ok, assert_python_failure
45from test import support
46from test.support import os_helper
47from test.support import socket_helper
48from test.support import threading_helper
49from test.support import warnings_helper
50from test.support.logging_helper import TestHandler
51import textwrap
52import threading
53import time
54import unittest
55import warnings
56import weakref
57
58from http.server import HTTPServer, BaseHTTPRequestHandler
59from urllib.parse import urlparse, parse_qs
60from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
61                          ThreadingTCPServer, StreamRequestHandler)
62
63
64asyncore = warnings_helper.import_deprecated('asyncore')
65smtpd = warnings_helper.import_deprecated('smtpd')
66
67
68try:
69    import win32evtlog, win32evtlogutil, pywintypes
70except ImportError:
71    win32evtlog = win32evtlogutil = pywintypes = None
72
73try:
74    import zlib
75except ImportError:
76    pass
77
78
79class BaseTest(unittest.TestCase):
80
81    """Base class for logging tests."""
82
83    log_format = "%(name)s -> %(levelname)s: %(message)s"
84    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
85    message_num = 0
86
87    def setUp(self):
88        """Setup the default logging stream to an internal StringIO instance,
89        so that we can examine log output as we want."""
90        self._threading_key = threading_helper.threading_setup()
91
92        logger_dict = logging.getLogger().manager.loggerDict
93        logging._acquireLock()
94        try:
95            self.saved_handlers = logging._handlers.copy()
96            self.saved_handler_list = logging._handlerList[:]
97            self.saved_loggers = saved_loggers = logger_dict.copy()
98            self.saved_name_to_level = logging._nameToLevel.copy()
99            self.saved_level_to_name = logging._levelToName.copy()
100            self.logger_states = logger_states = {}
101            for name in saved_loggers:
102                logger_states[name] = getattr(saved_loggers[name],
103                                              'disabled', None)
104        finally:
105            logging._releaseLock()
106
107        # Set two unused loggers
108        self.logger1 = logging.getLogger("\xab\xd7\xbb")
109        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
110
111        self.root_logger = logging.getLogger("")
112        self.original_logging_level = self.root_logger.getEffectiveLevel()
113
114        self.stream = io.StringIO()
115        self.root_logger.setLevel(logging.DEBUG)
116        self.root_hdlr = logging.StreamHandler(self.stream)
117        self.root_formatter = logging.Formatter(self.log_format)
118        self.root_hdlr.setFormatter(self.root_formatter)
119        if self.logger1.hasHandlers():
120            hlist = self.logger1.handlers + self.root_logger.handlers
121            raise AssertionError('Unexpected handlers: %s' % hlist)
122        if self.logger2.hasHandlers():
123            hlist = self.logger2.handlers + self.root_logger.handlers
124            raise AssertionError('Unexpected handlers: %s' % hlist)
125        self.root_logger.addHandler(self.root_hdlr)
126        self.assertTrue(self.logger1.hasHandlers())
127        self.assertTrue(self.logger2.hasHandlers())
128
129    def tearDown(self):
130        """Remove our logging stream, and restore the original logging
131        level."""
132        self.stream.close()
133        self.root_logger.removeHandler(self.root_hdlr)
134        while self.root_logger.handlers:
135            h = self.root_logger.handlers[0]
136            self.root_logger.removeHandler(h)
137            h.close()
138        self.root_logger.setLevel(self.original_logging_level)
139        logging._acquireLock()
140        try:
141            logging._levelToName.clear()
142            logging._levelToName.update(self.saved_level_to_name)
143            logging._nameToLevel.clear()
144            logging._nameToLevel.update(self.saved_name_to_level)
145            logging._handlers.clear()
146            logging._handlers.update(self.saved_handlers)
147            logging._handlerList[:] = self.saved_handler_list
148            manager = logging.getLogger().manager
149            manager.disable = 0
150            loggerDict = manager.loggerDict
151            loggerDict.clear()
152            loggerDict.update(self.saved_loggers)
153            logger_states = self.logger_states
154            for name in self.logger_states:
155                if logger_states[name] is not None:
156                    self.saved_loggers[name].disabled = logger_states[name]
157        finally:
158            logging._releaseLock()
159
160        self.doCleanups()
161        threading_helper.threading_cleanup(*self._threading_key)
162
163    def assert_log_lines(self, expected_values, stream=None, pat=None):
164        """Match the collected log lines against the regular expression
165        self.expected_log_pat, and compare the extracted group values to
166        the expected_values list of tuples."""
167        stream = stream or self.stream
168        pat = re.compile(pat or self.expected_log_pat)
169        actual_lines = stream.getvalue().splitlines()
170        self.assertEqual(len(actual_lines), len(expected_values))
171        for actual, expected in zip(actual_lines, expected_values):
172            match = pat.search(actual)
173            if not match:
174                self.fail("Log line does not match expected pattern:\n" +
175                            actual)
176            self.assertEqual(tuple(match.groups()), expected)
177        s = stream.read()
178        if s:
179            self.fail("Remaining output at end of log stream:\n" + s)
180
181    def next_message(self):
182        """Generate a message consisting solely of an auto-incrementing
183        integer."""
184        self.message_num += 1
185        return "%d" % self.message_num
186
187
188class BuiltinLevelsTest(BaseTest):
189    """Test builtin levels and their inheritance."""
190
191    def test_flat(self):
192        # Logging levels in a flat logger namespace.
193        m = self.next_message
194
195        ERR = logging.getLogger("ERR")
196        ERR.setLevel(logging.ERROR)
197        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
198        INF.setLevel(logging.INFO)
199        DEB = logging.getLogger("DEB")
200        DEB.setLevel(logging.DEBUG)
201
202        # These should log.
203        ERR.log(logging.CRITICAL, m())
204        ERR.error(m())
205
206        INF.log(logging.CRITICAL, m())
207        INF.error(m())
208        INF.warning(m())
209        INF.info(m())
210
211        DEB.log(logging.CRITICAL, m())
212        DEB.error(m())
213        DEB.warning(m())
214        DEB.info(m())
215        DEB.debug(m())
216
217        # These should not log.
218        ERR.warning(m())
219        ERR.info(m())
220        ERR.debug(m())
221
222        INF.debug(m())
223
224        self.assert_log_lines([
225            ('ERR', 'CRITICAL', '1'),
226            ('ERR', 'ERROR', '2'),
227            ('INF', 'CRITICAL', '3'),
228            ('INF', 'ERROR', '4'),
229            ('INF', 'WARNING', '5'),
230            ('INF', 'INFO', '6'),
231            ('DEB', 'CRITICAL', '7'),
232            ('DEB', 'ERROR', '8'),
233            ('DEB', 'WARNING', '9'),
234            ('DEB', 'INFO', '10'),
235            ('DEB', 'DEBUG', '11'),
236        ])
237
238    def test_nested_explicit(self):
239        # Logging levels in a nested namespace, all explicitly set.
240        m = self.next_message
241
242        INF = logging.getLogger("INF")
243        INF.setLevel(logging.INFO)
244        INF_ERR  = logging.getLogger("INF.ERR")
245        INF_ERR.setLevel(logging.ERROR)
246
247        # These should log.
248        INF_ERR.log(logging.CRITICAL, m())
249        INF_ERR.error(m())
250
251        # These should not log.
252        INF_ERR.warning(m())
253        INF_ERR.info(m())
254        INF_ERR.debug(m())
255
256        self.assert_log_lines([
257            ('INF.ERR', 'CRITICAL', '1'),
258            ('INF.ERR', 'ERROR', '2'),
259        ])
260
261    def test_nested_inherited(self):
262        # Logging levels in a nested namespace, inherited from parent loggers.
263        m = self.next_message
264
265        INF = logging.getLogger("INF")
266        INF.setLevel(logging.INFO)
267        INF_ERR  = logging.getLogger("INF.ERR")
268        INF_ERR.setLevel(logging.ERROR)
269        INF_UNDEF = logging.getLogger("INF.UNDEF")
270        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
271        UNDEF = logging.getLogger("UNDEF")
272
273        # These should log.
274        INF_UNDEF.log(logging.CRITICAL, m())
275        INF_UNDEF.error(m())
276        INF_UNDEF.warning(m())
277        INF_UNDEF.info(m())
278        INF_ERR_UNDEF.log(logging.CRITICAL, m())
279        INF_ERR_UNDEF.error(m())
280
281        # These should not log.
282        INF_UNDEF.debug(m())
283        INF_ERR_UNDEF.warning(m())
284        INF_ERR_UNDEF.info(m())
285        INF_ERR_UNDEF.debug(m())
286
287        self.assert_log_lines([
288            ('INF.UNDEF', 'CRITICAL', '1'),
289            ('INF.UNDEF', 'ERROR', '2'),
290            ('INF.UNDEF', 'WARNING', '3'),
291            ('INF.UNDEF', 'INFO', '4'),
292            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
293            ('INF.ERR.UNDEF', 'ERROR', '6'),
294        ])
295
296    def test_nested_with_virtual_parent(self):
297        # Logging levels when some parent does not exist yet.
298        m = self.next_message
299
300        INF = logging.getLogger("INF")
301        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
302        CHILD = logging.getLogger("INF.BADPARENT")
303        INF.setLevel(logging.INFO)
304
305        # These should log.
306        GRANDCHILD.log(logging.FATAL, m())
307        GRANDCHILD.info(m())
308        CHILD.log(logging.FATAL, m())
309        CHILD.info(m())
310
311        # These should not log.
312        GRANDCHILD.debug(m())
313        CHILD.debug(m())
314
315        self.assert_log_lines([
316            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
317            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
318            ('INF.BADPARENT', 'CRITICAL', '3'),
319            ('INF.BADPARENT', 'INFO', '4'),
320        ])
321
322    def test_regression_22386(self):
323        """See issue #22386 for more information."""
324        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
325        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')
326
327    def test_issue27935(self):
328        fatal = logging.getLevelName('FATAL')
329        self.assertEqual(fatal, logging.FATAL)
330
331    def test_regression_29220(self):
332        """See issue #29220 for more information."""
333        logging.addLevelName(logging.INFO, '')
334        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
335        self.assertEqual(logging.getLevelName(logging.INFO), '')
336        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
337        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
338
339class BasicFilterTest(BaseTest):
340
341    """Test the bundled Filter class."""
342
343    def test_filter(self):
344        # Only messages satisfying the specified criteria pass through the
345        #  filter.
346        filter_ = logging.Filter("spam.eggs")
347        handler = self.root_logger.handlers[0]
348        try:
349            handler.addFilter(filter_)
350            spam = logging.getLogger("spam")
351            spam_eggs = logging.getLogger("spam.eggs")
352            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
353            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
354
355            spam.info(self.next_message())
356            spam_eggs.info(self.next_message())  # Good.
357            spam_eggs_fish.info(self.next_message())  # Good.
358            spam_bakedbeans.info(self.next_message())
359
360            self.assert_log_lines([
361                ('spam.eggs', 'INFO', '2'),
362                ('spam.eggs.fish', 'INFO', '3'),
363            ])
364        finally:
365            handler.removeFilter(filter_)
366
367    def test_callable_filter(self):
368        # Only messages satisfying the specified criteria pass through the
369        #  filter.
370
371        def filterfunc(record):
372            parts = record.name.split('.')
373            prefix = '.'.join(parts[:2])
374            return prefix == 'spam.eggs'
375
376        handler = self.root_logger.handlers[0]
377        try:
378            handler.addFilter(filterfunc)
379            spam = logging.getLogger("spam")
380            spam_eggs = logging.getLogger("spam.eggs")
381            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
382            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
383
384            spam.info(self.next_message())
385            spam_eggs.info(self.next_message())  # Good.
386            spam_eggs_fish.info(self.next_message())  # Good.
387            spam_bakedbeans.info(self.next_message())
388
389            self.assert_log_lines([
390                ('spam.eggs', 'INFO', '2'),
391                ('spam.eggs.fish', 'INFO', '3'),
392            ])
393        finally:
394            handler.removeFilter(filterfunc)
395
396    def test_empty_filter(self):
397        f = logging.Filter()
398        r = logging.makeLogRecord({'name': 'spam.eggs'})
399        self.assertTrue(f.filter(r))
400
401#
402#   First, we define our levels. There can be as many as you want - the only
403#     limitations are that they should be integers, the lowest should be > 0 and
404#   larger values mean less information being logged. If you need specific
405#   level values which do not fit into these limitations, you can use a
406#   mapping dictionary to convert between your application levels and the
407#   logging system.
408#
409SILENT      = 120
410TACITURN    = 119
411TERSE       = 118
412EFFUSIVE    = 117
413SOCIABLE    = 116
414VERBOSE     = 115
415TALKATIVE   = 114
416GARRULOUS   = 113
417CHATTERBOX  = 112
418BORING      = 111
419
420LEVEL_RANGE = range(BORING, SILENT + 1)
421
422#
423#   Next, we define names for our levels. You don't need to do this - in which
424#   case the system will use "Level n" to denote the text for the level.
425#
426my_logging_levels = {
427    SILENT      : 'Silent',
428    TACITURN    : 'Taciturn',
429    TERSE       : 'Terse',
430    EFFUSIVE    : 'Effusive',
431    SOCIABLE    : 'Sociable',
432    VERBOSE     : 'Verbose',
433    TALKATIVE   : 'Talkative',
434    GARRULOUS   : 'Garrulous',
435    CHATTERBOX  : 'Chatterbox',
436    BORING      : 'Boring',
437}
438
439class GarrulousFilter(logging.Filter):
440
441    """A filter which blocks garrulous messages."""
442
443    def filter(self, record):
444        return record.levelno != GARRULOUS
445
446class VerySpecificFilter(logging.Filter):
447
448    """A filter which blocks sociable and taciturn messages."""
449
450    def filter(self, record):
451        return record.levelno not in [SOCIABLE, TACITURN]
452
453
454class CustomLevelsAndFiltersTest(BaseTest):
455
456    """Test various filtering possibilities with custom logging levels."""
457
458    # Skip the logger name group.
459    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
460
461    def setUp(self):
462        BaseTest.setUp(self)
463        for k, v in my_logging_levels.items():
464            logging.addLevelName(k, v)
465
466    def log_at_all_levels(self, logger):
467        for lvl in LEVEL_RANGE:
468            logger.log(lvl, self.next_message())
469
470    def test_logger_filter(self):
471        # Filter at logger level.
472        self.root_logger.setLevel(VERBOSE)
473        # Levels >= 'Verbose' are good.
474        self.log_at_all_levels(self.root_logger)
475        self.assert_log_lines([
476            ('Verbose', '5'),
477            ('Sociable', '6'),
478            ('Effusive', '7'),
479            ('Terse', '8'),
480            ('Taciturn', '9'),
481            ('Silent', '10'),
482        ])
483
484    def test_handler_filter(self):
485        # Filter at handler level.
486        self.root_logger.handlers[0].setLevel(SOCIABLE)
487        try:
488            # Levels >= 'Sociable' are good.
489            self.log_at_all_levels(self.root_logger)
490            self.assert_log_lines([
491                ('Sociable', '6'),
492                ('Effusive', '7'),
493                ('Terse', '8'),
494                ('Taciturn', '9'),
495                ('Silent', '10'),
496            ])
497        finally:
498            self.root_logger.handlers[0].setLevel(logging.NOTSET)
499
500    def test_specific_filters(self):
501        # Set a specific filter object on the handler, and then add another
502        #  filter object on the logger itself.
503        handler = self.root_logger.handlers[0]
504        specific_filter = None
505        garr = GarrulousFilter()
506        handler.addFilter(garr)
507        try:
508            self.log_at_all_levels(self.root_logger)
509            first_lines = [
510                # Notice how 'Garrulous' is missing
511                ('Boring', '1'),
512                ('Chatterbox', '2'),
513                ('Talkative', '4'),
514                ('Verbose', '5'),
515                ('Sociable', '6'),
516                ('Effusive', '7'),
517                ('Terse', '8'),
518                ('Taciturn', '9'),
519                ('Silent', '10'),
520            ]
521            self.assert_log_lines(first_lines)
522
523            specific_filter = VerySpecificFilter()
524            self.root_logger.addFilter(specific_filter)
525            self.log_at_all_levels(self.root_logger)
526            self.assert_log_lines(first_lines + [
527                # Not only 'Garrulous' is still missing, but also 'Sociable'
528                # and 'Taciturn'
529                ('Boring', '11'),
530                ('Chatterbox', '12'),
531                ('Talkative', '14'),
532                ('Verbose', '15'),
533                ('Effusive', '17'),
534                ('Terse', '18'),
535                ('Silent', '20'),
536        ])
537        finally:
538            if specific_filter:
539                self.root_logger.removeFilter(specific_filter)
540            handler.removeFilter(garr)
541
542
543class HandlerTest(BaseTest):
544    def test_name(self):
545        h = logging.Handler()
546        h.name = 'generic'
547        self.assertEqual(h.name, 'generic')
548        h.name = 'anothergeneric'
549        self.assertEqual(h.name, 'anothergeneric')
550        self.assertRaises(NotImplementedError, h.emit, None)
551
552    def test_builtin_handlers(self):
553        # We can't actually *use* too many handlers in the tests,
554        # but we can try instantiating them with various options
555        if sys.platform in ('linux', 'darwin'):
556            for existing in (True, False):
557                fd, fn = tempfile.mkstemp()
558                os.close(fd)
559                if not existing:
560                    os.unlink(fn)
561                h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=True)
562                if existing:
563                    dev, ino = h.dev, h.ino
564                    self.assertEqual(dev, -1)
565                    self.assertEqual(ino, -1)
566                    r = logging.makeLogRecord({'msg': 'Test'})
567                    h.handle(r)
568                    # Now remove the file.
569                    os.unlink(fn)
570                    self.assertFalse(os.path.exists(fn))
571                    # The next call should recreate the file.
572                    h.handle(r)
573                    self.assertTrue(os.path.exists(fn))
574                else:
575                    self.assertEqual(h.dev, -1)
576                    self.assertEqual(h.ino, -1)
577                h.close()
578                if existing:
579                    os.unlink(fn)
580            if sys.platform == 'darwin':
581                sockname = '/var/run/syslog'
582            else:
583                sockname = '/dev/log'
584            try:
585                h = logging.handlers.SysLogHandler(sockname)
586                self.assertEqual(h.facility, h.LOG_USER)
587                self.assertTrue(h.unixsocket)
588                h.close()
589            except OSError: # syslogd might not be available
590                pass
591        for method in ('GET', 'POST', 'PUT'):
592            if method == 'PUT':
593                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
594                                  'localhost', '/log', method)
595            else:
596                h = logging.handlers.HTTPHandler('localhost', '/log', method)
597                h.close()
598        h = logging.handlers.BufferingHandler(0)
599        r = logging.makeLogRecord({})
600        self.assertTrue(h.shouldFlush(r))
601        h.close()
602        h = logging.handlers.BufferingHandler(1)
603        self.assertFalse(h.shouldFlush(r))
604        h.close()
605
606    def test_path_objects(self):
607        """
608        Test that Path objects are accepted as filename arguments to handlers.
609
610        See Issue #27493.
611        """
612        fd, fn = tempfile.mkstemp()
613        os.close(fd)
614        os.unlink(fn)
615        pfn = pathlib.Path(fn)
616        cases = (
617                    (logging.FileHandler, (pfn, 'w')),
618                    (logging.handlers.RotatingFileHandler, (pfn, 'a')),
619                    (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
620                )
621        if sys.platform in ('linux', 'darwin'):
622            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
623        for cls, args in cases:
624            h = cls(*args, encoding="utf-8")
625            self.assertTrue(os.path.exists(fn))
626            h.close()
627            os.unlink(fn)
628
629    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
630    @unittest.skipIf(
631        support.is_emscripten, "Emscripten cannot fstat unlinked files."
632    )
633    @threading_helper.requires_working_threading()
634    def test_race(self):
635        # Issue #14632 refers.
636        def remove_loop(fname, tries):
637            for _ in range(tries):
638                try:
639                    os.unlink(fname)
640                    self.deletion_time = time.time()
641                except OSError:
642                    pass
643                time.sleep(0.004 * random.randint(0, 4))
644
645        del_count = 500
646        log_count = 500
647
648        self.handle_time = None
649        self.deletion_time = None
650
651        for delay in (False, True):
652            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
653            os.close(fd)
654            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
655            remover.daemon = True
656            remover.start()
657            h = logging.handlers.WatchedFileHandler(fn, encoding='utf-8', delay=delay)
658            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
659            h.setFormatter(f)
660            try:
661                for _ in range(log_count):
662                    time.sleep(0.005)
663                    r = logging.makeLogRecord({'msg': 'testing' })
664                    try:
665                        self.handle_time = time.time()
666                        h.handle(r)
667                    except Exception:
668                        print('Deleted at %s, '
669                              'opened at %s' % (self.deletion_time,
670                                                self.handle_time))
671                        raise
672            finally:
673                remover.join()
674                h.close()
675                if os.path.exists(fn):
676                    os.unlink(fn)
677
678    # The implementation relies on os.register_at_fork existing, but we test
679    # based on os.fork existing because that is what users and this test use.
680    # This helps ensure that when fork exists (the important concept) that the
681    # register_at_fork mechanism is also present and used.
682    @support.requires_fork()
683    @threading_helper.requires_working_threading()
684    def test_post_fork_child_no_deadlock(self):
685        """Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
686        class _OurHandler(logging.Handler):
687            def __init__(self):
688                super().__init__()
689                self.sub_handler = logging.StreamHandler(
690                    stream=open('/dev/null', 'wt', encoding='utf-8'))
691
692            def emit(self, record):
693                self.sub_handler.acquire()
694                try:
695                    self.sub_handler.emit(record)
696                finally:
697                    self.sub_handler.release()
698
699        self.assertEqual(len(logging._handlers), 0)
700        refed_h = _OurHandler()
701        self.addCleanup(refed_h.sub_handler.stream.close)
702        refed_h.name = 'because we need at least one for this test'
703        self.assertGreater(len(logging._handlers), 0)
704        self.assertGreater(len(logging._at_fork_reinit_lock_weakset), 1)
705        test_logger = logging.getLogger('test_post_fork_child_no_deadlock')
706        test_logger.addHandler(refed_h)
707        test_logger.setLevel(logging.DEBUG)
708
709        locks_held__ready_to_fork = threading.Event()
710        fork_happened__release_locks_and_end_thread = threading.Event()
711
712        def lock_holder_thread_fn():
713            logging._acquireLock()
714            try:
715                refed_h.acquire()
716                try:
717                    # Tell the main thread to do the fork.
718                    locks_held__ready_to_fork.set()
719
720                    # If the deadlock bug exists, the fork will happen
721                    # without dealing with the locks we hold, deadlocking
722                    # the child.
723
724                    # Wait for a successful fork or an unreasonable amount of
725                    # time before releasing our locks.  To avoid a timing based
726                    # test we'd need communication from os.fork() as to when it
727                    # has actually happened.  Given this is a regression test
728                    # for a fixed issue, potentially less reliably detecting
729                    # regression via timing is acceptable for simplicity.
730                    # The test will always take at least this long. :(
731                    fork_happened__release_locks_and_end_thread.wait(0.5)
732                finally:
733                    refed_h.release()
734            finally:
735                logging._releaseLock()
736
737        lock_holder_thread = threading.Thread(
738                target=lock_holder_thread_fn,
739                name='test_post_fork_child_no_deadlock lock holder')
740        lock_holder_thread.start()
741
742        locks_held__ready_to_fork.wait()
743        pid = os.fork()
744        if pid == 0:
745            # Child process
746            try:
747                test_logger.info(r'Child process did not deadlock. \o/')
748            finally:
749                os._exit(0)
750        else:
751            # Parent process
752            test_logger.info(r'Parent process returned from fork. \o/')
753            fork_happened__release_locks_and_end_thread.set()
754            lock_holder_thread.join()
755
756            support.wait_process(pid, exitcode=0)
757
758
759class BadStream(object):
760    def write(self, data):
761        raise RuntimeError('deliberate mistake')
762
763class TestStreamHandler(logging.StreamHandler):
764    def handleError(self, record):
765        self.error_record = record
766
767class StreamWithIntName(object):
768    level = logging.NOTSET
769    name = 2
770
771class StreamHandlerTest(BaseTest):
772    def test_error_handling(self):
773        h = TestStreamHandler(BadStream())
774        r = logging.makeLogRecord({})
775        old_raise = logging.raiseExceptions
776
777        try:
778            h.handle(r)
779            self.assertIs(h.error_record, r)
780
781            h = logging.StreamHandler(BadStream())
782            with support.captured_stderr() as stderr:
783                h.handle(r)
784                msg = '\nRuntimeError: deliberate mistake\n'
785                self.assertIn(msg, stderr.getvalue())
786
787            logging.raiseExceptions = False
788            with support.captured_stderr() as stderr:
789                h.handle(r)
790                self.assertEqual('', stderr.getvalue())
791        finally:
792            logging.raiseExceptions = old_raise
793
794    def test_stream_setting(self):
795        """
796        Test setting the handler's stream
797        """
798        h = logging.StreamHandler()
799        stream = io.StringIO()
800        old = h.setStream(stream)
801        self.assertIs(old, sys.stderr)
802        actual = h.setStream(old)
803        self.assertIs(actual, stream)
804        # test that setting to existing value returns None
805        actual = h.setStream(old)
806        self.assertIsNone(actual)
807
808    def test_can_represent_stream_with_int_name(self):
809        h = logging.StreamHandler(StreamWithIntName())
810        self.assertEqual(repr(h), '<StreamHandler 2 (NOTSET)>')
811
812# -- The following section could be moved into a server_helper.py module
813# -- if it proves to be of wider utility than just test_logging
814
815class TestSMTPServer(smtpd.SMTPServer):
816    """
817    This class implements a test SMTP server.
818
819    :param addr: A (host, port) tuple which the server listens on.
820                 You can specify a port value of zero: the server's
821                 *port* attribute will hold the actual port number
822                 used, which can be used in client connections.
823    :param handler: A callable which will be called to process
824                    incoming messages. The handler will be passed
825                    the client address tuple, who the message is from,
826                    a list of recipients and the message data.
827    :param poll_interval: The interval, in seconds, used in the underlying
828                          :func:`select` or :func:`poll` call by
829                          :func:`asyncore.loop`.
830    :param sockmap: A dictionary which will be used to hold
831                    :class:`asyncore.dispatcher` instances used by
832                    :func:`asyncore.loop`. This avoids changing the
833                    :mod:`asyncore` module's global state.
834    """
835
836    def __init__(self, addr, handler, poll_interval, sockmap):
837        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
838                                  decode_data=True)
839        self.port = self.socket.getsockname()[1]
840        self._handler = handler
841        self._thread = None
842        self._quit = False
843        self.poll_interval = poll_interval
844
845    def process_message(self, peer, mailfrom, rcpttos, data):
846        """
847        Delegates to the handler passed in to the server's constructor.
848
849        Typically, this will be a test case method.
850        :param peer: The client (host, port) tuple.
851        :param mailfrom: The address of the sender.
852        :param rcpttos: The addresses of the recipients.
853        :param data: The message.
854        """
855        self._handler(peer, mailfrom, rcpttos, data)
856
857    def start(self):
858        """
859        Start the server running on a separate daemon thread.
860        """
861        self._thread = t = threading.Thread(target=self.serve_forever,
862                                            args=(self.poll_interval,))
863        t.daemon = True
864        t.start()
865
866    def serve_forever(self, poll_interval):
867        """
868        Run the :mod:`asyncore` loop until normal termination
869        conditions arise.
870        :param poll_interval: The interval, in seconds, used in the underlying
871                              :func:`select` or :func:`poll` call by
872                              :func:`asyncore.loop`.
873        """
874        while not self._quit:
875            asyncore.loop(poll_interval, map=self._map, count=1)
876
877    def stop(self):
878        """
879        Stop the thread by closing the server instance.
880        Wait for the server thread to terminate.
881        """
882        self._quit = True
883        threading_helper.join_thread(self._thread)
884        self._thread = None
885        self.close()
886        asyncore.close_all(map=self._map, ignore_all=True)
887
888
889class ControlMixin(object):
890    """
891    This mixin is used to start a server on a separate thread, and
892    shut it down programmatically. Request handling is simplified - instead
893    of needing to derive a suitable RequestHandler subclass, you just
894    provide a callable which will be passed each received request to be
895    processed.
896
897    :param handler: A handler callable which will be called with a
898                    single parameter - the request - in order to
899                    process the request. This handler is called on the
900                    server thread, effectively meaning that requests are
901                    processed serially. While not quite web scale ;-),
902                    this should be fine for testing applications.
903    :param poll_interval: The polling interval in seconds.
904    """
905    def __init__(self, handler, poll_interval):
906        self._thread = None
907        self.poll_interval = poll_interval
908        self._handler = handler
909        self.ready = threading.Event()
910
911    def start(self):
912        """
913        Create a daemon thread to run the server, and start it.
914        """
915        self._thread = t = threading.Thread(target=self.serve_forever,
916                                            args=(self.poll_interval,))
917        t.daemon = True
918        t.start()
919
920    def serve_forever(self, poll_interval):
921        """
922        Run the server. Set the ready flag before entering the
923        service loop.
924        """
925        self.ready.set()
926        super(ControlMixin, self).serve_forever(poll_interval)
927
928    def stop(self):
929        """
930        Tell the server thread to stop, and wait for it to do so.
931        """
932        self.shutdown()
933        if self._thread is not None:
934            threading_helper.join_thread(self._thread)
935            self._thread = None
936        self.server_close()
937        self.ready.clear()
938
939class TestHTTPServer(ControlMixin, HTTPServer):
940    """
941    An HTTP server which is controllable using :class:`ControlMixin`.
942
943    :param addr: A tuple with the IP address and port to listen on.
944    :param handler: A handler callable which will be called with a
945                    single parameter - the request - in order to
946                    process the request.
947    :param poll_interval: The polling interval in seconds.
948    :param log: Pass ``True`` to enable log messages.
949    """
950    def __init__(self, addr, handler, poll_interval=0.5,
951                 log=False, sslctx=None):
952        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
953            def __getattr__(self, name, default=None):
954                if name.startswith('do_'):
955                    return self.process_request
956                raise AttributeError(name)
957
958            def process_request(self):
959                self.server._handler(self)
960
961            def log_message(self, format, *args):
962                if log:
963                    super(DelegatingHTTPRequestHandler,
964                          self).log_message(format, *args)
965        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
966        ControlMixin.__init__(self, handler, poll_interval)
967        self.sslctx = sslctx
968
969    def get_request(self):
970        try:
971            sock, addr = self.socket.accept()
972            if self.sslctx:
973                sock = self.sslctx.wrap_socket(sock, server_side=True)
974        except OSError as e:
975            # socket errors are silenced by the caller, print them here
976            sys.stderr.write("Got an error:\n%s\n" % e)
977            raise
978        return sock, addr
979
980class TestTCPServer(ControlMixin, ThreadingTCPServer):
981    """
982    A TCP server which is controllable using :class:`ControlMixin`.
983
984    :param addr: A tuple with the IP address and port to listen on.
985    :param handler: A handler callable which will be called with a single
986                    parameter - the request - in order to process the request.
987    :param poll_interval: The polling interval in seconds.
988    :bind_and_activate: If True (the default), binds the server and starts it
989                        listening. If False, you need to call
990                        :meth:`server_bind` and :meth:`server_activate` at
991                        some later time before calling :meth:`start`, so that
992                        the server will set up the socket and listen on it.
993    """
994
995    allow_reuse_address = True
996
997    def __init__(self, addr, handler, poll_interval=0.5,
998                 bind_and_activate=True):
999        class DelegatingTCPRequestHandler(StreamRequestHandler):
1000
1001            def handle(self):
1002                self.server._handler(self)
1003        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
1004                                    bind_and_activate)
1005        ControlMixin.__init__(self, handler, poll_interval)
1006
1007    def server_bind(self):
1008        super(TestTCPServer, self).server_bind()
1009        self.port = self.socket.getsockname()[1]
1010
1011class TestUDPServer(ControlMixin, ThreadingUDPServer):
1012    """
1013    A UDP server which is controllable using :class:`ControlMixin`.
1014
1015    :param addr: A tuple with the IP address and port to listen on.
1016    :param handler: A handler callable which will be called with a
1017                    single parameter - the request - in order to
1018                    process the request.
1019    :param poll_interval: The polling interval for shutdown requests,
1020                          in seconds.
1021    :bind_and_activate: If True (the default), binds the server and
1022                        starts it listening. If False, you need to
1023                        call :meth:`server_bind` and
1024                        :meth:`server_activate` at some later time
1025                        before calling :meth:`start`, so that the server will
1026                        set up the socket and listen on it.
1027    """
1028    def __init__(self, addr, handler, poll_interval=0.5,
1029                 bind_and_activate=True):
1030        class DelegatingUDPRequestHandler(DatagramRequestHandler):
1031
1032            def handle(self):
1033                self.server._handler(self)
1034
1035            def finish(self):
1036                data = self.wfile.getvalue()
1037                if data:
1038                    try:
1039                        super(DelegatingUDPRequestHandler, self).finish()
1040                    except OSError:
1041                        if not self.server._closed:
1042                            raise
1043
1044        ThreadingUDPServer.__init__(self, addr,
1045                                    DelegatingUDPRequestHandler,
1046                                    bind_and_activate)
1047        ControlMixin.__init__(self, handler, poll_interval)
1048        self._closed = False
1049
1050    def server_bind(self):
1051        super(TestUDPServer, self).server_bind()
1052        self.port = self.socket.getsockname()[1]
1053
1054    def server_close(self):
1055        super(TestUDPServer, self).server_close()
1056        self._closed = True
1057
1058if hasattr(socket, "AF_UNIX"):
1059    class TestUnixStreamServer(TestTCPServer):
1060        address_family = socket.AF_UNIX
1061
1062    class TestUnixDatagramServer(TestUDPServer):
1063        address_family = socket.AF_UNIX
1064
1065# - end of server_helper section
1066
1067@support.requires_working_socket()
1068@threading_helper.requires_working_threading()
1069class SMTPHandlerTest(BaseTest):
1070    # bpo-14314, bpo-19665, bpo-34092: don't wait forever
1071    TIMEOUT = support.LONG_TIMEOUT
1072
1073    def test_basic(self):
1074        sockmap = {}
1075        server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001,
1076                                sockmap)
1077        server.start()
1078        addr = (socket_helper.HOST, server.port)
1079        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
1080                                         timeout=self.TIMEOUT)
1081        self.assertEqual(h.toaddrs, ['you'])
1082        self.messages = []
1083        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
1084        self.handled = threading.Event()
1085        h.handle(r)
1086        self.handled.wait(self.TIMEOUT)
1087        server.stop()
1088        self.assertTrue(self.handled.is_set())
1089        self.assertEqual(len(self.messages), 1)
1090        peer, mailfrom, rcpttos, data = self.messages[0]
1091        self.assertEqual(mailfrom, 'me')
1092        self.assertEqual(rcpttos, ['you'])
1093        self.assertIn('\nSubject: Log\n', data)
1094        self.assertTrue(data.endswith('\n\nHello \u2713'))
1095        h.close()
1096
1097    def process_message(self, *args):
1098        self.messages.append(args)
1099        self.handled.set()
1100
1101class MemoryHandlerTest(BaseTest):
1102
1103    """Tests for the MemoryHandler."""
1104
1105    # Do not bother with a logger name group.
1106    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
1107
1108    def setUp(self):
1109        BaseTest.setUp(self)
1110        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1111                                                       self.root_hdlr)
1112        self.mem_logger = logging.getLogger('mem')
1113        self.mem_logger.propagate = 0
1114        self.mem_logger.addHandler(self.mem_hdlr)
1115
1116    def tearDown(self):
1117        self.mem_hdlr.close()
1118        BaseTest.tearDown(self)
1119
1120    def test_flush(self):
1121        # The memory handler flushes to its target handler based on specific
1122        #  criteria (message count and message level).
1123        self.mem_logger.debug(self.next_message())
1124        self.assert_log_lines([])
1125        self.mem_logger.info(self.next_message())
1126        self.assert_log_lines([])
1127        # This will flush because the level is >= logging.WARNING
1128        self.mem_logger.warning(self.next_message())
1129        lines = [
1130            ('DEBUG', '1'),
1131            ('INFO', '2'),
1132            ('WARNING', '3'),
1133        ]
1134        self.assert_log_lines(lines)
1135        for n in (4, 14):
1136            for i in range(9):
1137                self.mem_logger.debug(self.next_message())
1138            self.assert_log_lines(lines)
1139            # This will flush because it's the 10th message since the last
1140            #  flush.
1141            self.mem_logger.debug(self.next_message())
1142            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
1143            self.assert_log_lines(lines)
1144
1145        self.mem_logger.debug(self.next_message())
1146        self.assert_log_lines(lines)
1147
1148    def test_flush_on_close(self):
1149        """
1150        Test that the flush-on-close configuration works as expected.
1151        """
1152        self.mem_logger.debug(self.next_message())
1153        self.assert_log_lines([])
1154        self.mem_logger.info(self.next_message())
1155        self.assert_log_lines([])
1156        self.mem_logger.removeHandler(self.mem_hdlr)
1157        # Default behaviour is to flush on close. Check that it happens.
1158        self.mem_hdlr.close()
1159        lines = [
1160            ('DEBUG', '1'),
1161            ('INFO', '2'),
1162        ]
1163        self.assert_log_lines(lines)
1164        # Now configure for flushing not to be done on close.
1165        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1166                                                       self.root_hdlr,
1167                                                       False)
1168        self.mem_logger.addHandler(self.mem_hdlr)
1169        self.mem_logger.debug(self.next_message())
1170        self.assert_log_lines(lines)  # no change
1171        self.mem_logger.info(self.next_message())
1172        self.assert_log_lines(lines)  # no change
1173        self.mem_logger.removeHandler(self.mem_hdlr)
1174        self.mem_hdlr.close()
1175        # assert that no new lines have been added
1176        self.assert_log_lines(lines)  # no change
1177
1178    @threading_helper.requires_working_threading()
1179    def test_race_between_set_target_and_flush(self):
1180        class MockRaceConditionHandler:
1181            def __init__(self, mem_hdlr):
1182                self.mem_hdlr = mem_hdlr
1183                self.threads = []
1184
1185            def removeTarget(self):
1186                self.mem_hdlr.setTarget(None)
1187
1188            def handle(self, msg):
1189                thread = threading.Thread(target=self.removeTarget)
1190                self.threads.append(thread)
1191                thread.start()
1192
1193        target = MockRaceConditionHandler(self.mem_hdlr)
1194        try:
1195            self.mem_hdlr.setTarget(target)
1196
1197            for _ in range(10):
1198                time.sleep(0.005)
1199                self.mem_logger.info("not flushed")
1200                self.mem_logger.warning("flushed")
1201        finally:
1202            for thread in target.threads:
1203                threading_helper.join_thread(thread)
1204
1205
1206class ExceptionFormatter(logging.Formatter):
1207    """A special exception formatter."""
1208    def formatException(self, ei):
1209        return "Got a [%s]" % ei[0].__name__
1210
1211
1212class ConfigFileTest(BaseTest):
1213
1214    """Reading logging config from a .ini-style config file."""
1215
1216    check_no_resource_warning = warnings_helper.check_no_resource_warning
1217    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1218
1219    # config0 is a standard configuration.
1220    config0 = """
1221    [loggers]
1222    keys=root
1223
1224    [handlers]
1225    keys=hand1
1226
1227    [formatters]
1228    keys=form1
1229
1230    [logger_root]
1231    level=WARNING
1232    handlers=hand1
1233
1234    [handler_hand1]
1235    class=StreamHandler
1236    level=NOTSET
1237    formatter=form1
1238    args=(sys.stdout,)
1239
1240    [formatter_form1]
1241    format=%(levelname)s ++ %(message)s
1242    datefmt=
1243    """
1244
1245    # config1 adds a little to the standard configuration.
1246    config1 = """
1247    [loggers]
1248    keys=root,parser
1249
1250    [handlers]
1251    keys=hand1
1252
1253    [formatters]
1254    keys=form1
1255
1256    [logger_root]
1257    level=WARNING
1258    handlers=
1259
1260    [logger_parser]
1261    level=DEBUG
1262    handlers=hand1
1263    propagate=1
1264    qualname=compiler.parser
1265
1266    [handler_hand1]
1267    class=StreamHandler
1268    level=NOTSET
1269    formatter=form1
1270    args=(sys.stdout,)
1271
1272    [formatter_form1]
1273    format=%(levelname)s ++ %(message)s
1274    datefmt=
1275    """
1276
1277    # config1a moves the handler to the root.
1278    config1a = """
1279    [loggers]
1280    keys=root,parser
1281
1282    [handlers]
1283    keys=hand1
1284
1285    [formatters]
1286    keys=form1
1287
1288    [logger_root]
1289    level=WARNING
1290    handlers=hand1
1291
1292    [logger_parser]
1293    level=DEBUG
1294    handlers=
1295    propagate=1
1296    qualname=compiler.parser
1297
1298    [handler_hand1]
1299    class=StreamHandler
1300    level=NOTSET
1301    formatter=form1
1302    args=(sys.stdout,)
1303
1304    [formatter_form1]
1305    format=%(levelname)s ++ %(message)s
1306    datefmt=
1307    """
1308
1309    # config2 has a subtle configuration error that should be reported
1310    config2 = config1.replace("sys.stdout", "sys.stbout")
1311
1312    # config3 has a less subtle configuration error
1313    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1314
1315    # config4 specifies a custom formatter class to be loaded
1316    config4 = """
1317    [loggers]
1318    keys=root
1319
1320    [handlers]
1321    keys=hand1
1322
1323    [formatters]
1324    keys=form1
1325
1326    [logger_root]
1327    level=NOTSET
1328    handlers=hand1
1329
1330    [handler_hand1]
1331    class=StreamHandler
1332    level=NOTSET
1333    formatter=form1
1334    args=(sys.stdout,)
1335
1336    [formatter_form1]
1337    class=""" + __name__ + """.ExceptionFormatter
1338    format=%(levelname)s:%(name)s:%(message)s
1339    datefmt=
1340    """
1341
1342    # config5 specifies a custom handler class to be loaded
1343    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1344
1345    # config6 uses ', ' delimiters in the handlers and formatters sections
1346    config6 = """
1347    [loggers]
1348    keys=root,parser
1349
1350    [handlers]
1351    keys=hand1, hand2
1352
1353    [formatters]
1354    keys=form1, form2
1355
1356    [logger_root]
1357    level=WARNING
1358    handlers=
1359
1360    [logger_parser]
1361    level=DEBUG
1362    handlers=hand1
1363    propagate=1
1364    qualname=compiler.parser
1365
1366    [handler_hand1]
1367    class=StreamHandler
1368    level=NOTSET
1369    formatter=form1
1370    args=(sys.stdout,)
1371
1372    [handler_hand2]
1373    class=StreamHandler
1374    level=NOTSET
1375    formatter=form1
1376    args=(sys.stderr,)
1377
1378    [formatter_form1]
1379    format=%(levelname)s ++ %(message)s
1380    datefmt=
1381
1382    [formatter_form2]
1383    format=%(message)s
1384    datefmt=
1385    """
1386
1387    # config7 adds a compiler logger, and uses kwargs instead of args.
1388    config7 = """
1389    [loggers]
1390    keys=root,parser,compiler
1391
1392    [handlers]
1393    keys=hand1
1394
1395    [formatters]
1396    keys=form1
1397
1398    [logger_root]
1399    level=WARNING
1400    handlers=hand1
1401
1402    [logger_compiler]
1403    level=DEBUG
1404    handlers=
1405    propagate=1
1406    qualname=compiler
1407
1408    [logger_parser]
1409    level=DEBUG
1410    handlers=
1411    propagate=1
1412    qualname=compiler.parser
1413
1414    [handler_hand1]
1415    class=StreamHandler
1416    level=NOTSET
1417    formatter=form1
1418    kwargs={'stream': sys.stdout,}
1419
1420    [formatter_form1]
1421    format=%(levelname)s ++ %(message)s
1422    datefmt=
1423    """
1424
1425    # config 8, check for resource warning
1426    config8 = r"""
1427    [loggers]
1428    keys=root
1429
1430    [handlers]
1431    keys=file
1432
1433    [formatters]
1434    keys=
1435
1436    [logger_root]
1437    level=DEBUG
1438    handlers=file
1439
1440    [handler_file]
1441    class=FileHandler
1442    level=DEBUG
1443    args=("{tempfile}",)
1444    kwargs={{"encoding": "utf-8"}}
1445    """
1446
1447    disable_test = """
1448    [loggers]
1449    keys=root
1450
1451    [handlers]
1452    keys=screen
1453
1454    [formatters]
1455    keys=
1456
1457    [logger_root]
1458    level=DEBUG
1459    handlers=screen
1460
1461    [handler_screen]
1462    level=DEBUG
1463    class=StreamHandler
1464    args=(sys.stdout,)
1465    formatter=
1466    """
1467
1468    def apply_config(self, conf, **kwargs):
1469        file = io.StringIO(textwrap.dedent(conf))
1470        logging.config.fileConfig(file, encoding="utf-8", **kwargs)
1471
1472    def test_config0_ok(self):
1473        # A simple config file which overrides the default settings.
1474        with support.captured_stdout() as output:
1475            self.apply_config(self.config0)
1476            logger = logging.getLogger()
1477            # Won't output anything
1478            logger.info(self.next_message())
1479            # Outputs a message
1480            logger.error(self.next_message())
1481            self.assert_log_lines([
1482                ('ERROR', '2'),
1483            ], stream=output)
1484            # Original logger output is empty.
1485            self.assert_log_lines([])
1486
1487    def test_config0_using_cp_ok(self):
1488        # A simple config file which overrides the default settings.
1489        with support.captured_stdout() as output:
1490            file = io.StringIO(textwrap.dedent(self.config0))
1491            cp = configparser.ConfigParser()
1492            cp.read_file(file)
1493            logging.config.fileConfig(cp)
1494            logger = logging.getLogger()
1495            # Won't output anything
1496            logger.info(self.next_message())
1497            # Outputs a message
1498            logger.error(self.next_message())
1499            self.assert_log_lines([
1500                ('ERROR', '2'),
1501            ], stream=output)
1502            # Original logger output is empty.
1503            self.assert_log_lines([])
1504
1505    def test_config1_ok(self, config=config1):
1506        # A config file defining a sub-parser as well.
1507        with support.captured_stdout() as output:
1508            self.apply_config(config)
1509            logger = logging.getLogger("compiler.parser")
1510            # Both will output a message
1511            logger.info(self.next_message())
1512            logger.error(self.next_message())
1513            self.assert_log_lines([
1514                ('INFO', '1'),
1515                ('ERROR', '2'),
1516            ], stream=output)
1517            # Original logger output is empty.
1518            self.assert_log_lines([])
1519
1520    def test_config2_failure(self):
1521        # A simple config file which overrides the default settings.
1522        self.assertRaises(Exception, self.apply_config, self.config2)
1523
1524    def test_config3_failure(self):
1525        # A simple config file which overrides the default settings.
1526        self.assertRaises(Exception, self.apply_config, self.config3)
1527
1528    def test_config4_ok(self):
1529        # A config file specifying a custom formatter class.
1530        with support.captured_stdout() as output:
1531            self.apply_config(self.config4)
1532            logger = logging.getLogger()
1533            try:
1534                raise RuntimeError()
1535            except RuntimeError:
1536                logging.exception("just testing")
1537            sys.stdout.seek(0)
1538            self.assertEqual(output.getvalue(),
1539                "ERROR:root:just testing\nGot a [RuntimeError]\n")
1540            # Original logger output is empty
1541            self.assert_log_lines([])
1542
1543    def test_config5_ok(self):
1544        self.test_config1_ok(config=self.config5)
1545
1546    def test_config6_ok(self):
1547        self.test_config1_ok(config=self.config6)
1548
1549    def test_config7_ok(self):
1550        with support.captured_stdout() as output:
1551            self.apply_config(self.config1a)
1552            logger = logging.getLogger("compiler.parser")
1553            # See issue #11424. compiler-hyphenated sorts
1554            # between compiler and compiler.xyz and this
1555            # was preventing compiler.xyz from being included
1556            # in the child loggers of compiler because of an
1557            # overzealous loop termination condition.
1558            hyphenated = logging.getLogger('compiler-hyphenated')
1559            # All will output a message
1560            logger.info(self.next_message())
1561            logger.error(self.next_message())
1562            hyphenated.critical(self.next_message())
1563            self.assert_log_lines([
1564                ('INFO', '1'),
1565                ('ERROR', '2'),
1566                ('CRITICAL', '3'),
1567            ], stream=output)
1568            # Original logger output is empty.
1569            self.assert_log_lines([])
1570        with support.captured_stdout() as output:
1571            self.apply_config(self.config7)
1572            logger = logging.getLogger("compiler.parser")
1573            self.assertFalse(logger.disabled)
1574            # Both will output a message
1575            logger.info(self.next_message())
1576            logger.error(self.next_message())
1577            logger = logging.getLogger("compiler.lexer")
1578            # Both will output a message
1579            logger.info(self.next_message())
1580            logger.error(self.next_message())
1581            # Will not appear
1582            hyphenated.critical(self.next_message())
1583            self.assert_log_lines([
1584                ('INFO', '4'),
1585                ('ERROR', '5'),
1586                ('INFO', '6'),
1587                ('ERROR', '7'),
1588            ], stream=output)
1589            # Original logger output is empty.
1590            self.assert_log_lines([])
1591
1592    def test_config8_ok(self):
1593
1594        def cleanup(h1, fn):
1595            h1.close()
1596            os.remove(fn)
1597
1598        with self.check_no_resource_warning():
1599            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
1600            os.close(fd)
1601
1602            # On Windows, double the backslashes in the path so they are not
1603            # misinterpreted as escape sequences when the config is processed
1604            if os.name == "nt":
1605                fn = fn.replace("\\", "\\\\")
1606
1607            config8 = self.config8.format(tempfile=fn)
1608
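            # config8 (defined on the class, not shown here) is assumed to open
            # a FileHandler on the temp file; applying it twice should close the
            # first handler's stream, otherwise check_no_resource_warning()
            # above would report the leaked file object.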
1609            self.apply_config(config8)
1610            self.apply_config(config8)
1611
1612        handler = logging.root.handlers[0]
1613        self.addCleanup(cleanup, handler, fn)
1614
1615    def test_logger_disabling(self):
1616        self.apply_config(self.disable_test)
1617        logger = logging.getLogger('some_pristine_logger')
1618        self.assertFalse(logger.disabled)
1619        self.apply_config(self.disable_test)
1620        self.assertTrue(logger.disabled)
1621        self.apply_config(self.disable_test, disable_existing_loggers=False)
1622        self.assertFalse(logger.disabled)
1623
1624    def test_config_set_handler_names(self):
1625        test_config = """
1626            [loggers]
1627            keys=root
1628
1629            [handlers]
1630            keys=hand1
1631
1632            [formatters]
1633            keys=form1
1634
1635            [logger_root]
1636            handlers=hand1
1637
1638            [handler_hand1]
1639            class=StreamHandler
1640            formatter=form1
1641
1642            [formatter_form1]
1643            format=%(levelname)s ++ %(message)s
1644            """
1645        self.apply_config(test_config)
1646        self.assertEqual(logging.getLogger().handlers[0].name, 'hand1')
1647
1648    def test_exception_if_config_file_is_invalid(self):
1649        test_config = """
1650            [loggers]
1651            keys=root
1652
1653            [handlers]
1654            keys=hand1
1655
1656            [formatters]
1657            keys=form1
1658
1659            [logger_root]
1660            handlers=hand1
1661
1662            [handler_hand1]
1663            class=StreamHandler
1664            formatter=form1
1665
1666            [formatter_form1]
1667            format=%(levelname)s ++ %(message)s
1668
1669            prince
1670            """
1671
1672        file = io.StringIO(textwrap.dedent(test_config))
1673        self.assertRaises(RuntimeError, logging.config.fileConfig, file)
1674
1675    def test_exception_if_config_file_is_empty(self):
1676        fd, fn = tempfile.mkstemp(prefix='test_empty_', suffix='.ini')
1677        os.close(fd)
1678        self.assertRaises(RuntimeError, logging.config.fileConfig, fn)
1679        os.remove(fn)
1680
1681    def test_exception_if_config_file_does_not_exist(self):
1682        self.assertRaises(FileNotFoundError, logging.config.fileConfig, 'filenotfound')
1683
1684    def test_defaults_do_no_interpolation(self):
1685        """bpo-33802 defaults should not get interpolated"""
1686        ini = textwrap.dedent("""
1687            [formatters]
1688            keys=default
1689
1690            [formatter_default]
1691
1692            [handlers]
1693            keys=console
1694
1695            [handler_console]
1696            class=logging.StreamHandler
1697            args=tuple()
1698
1699            [loggers]
1700            keys=root
1701
1702            [logger_root]
1703            formatter=default
1704            handlers=console
1705            """).strip()
1706        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
1707        try:
1708            os.write(fd, ini.encode('ascii'))
1709            os.close(fd)
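            # The defaults below contain %-style format strings; per bpo-33802,
            # fileConfig() must pass them through without configparser
            # interpolation, which would otherwise choke on the '%(' sequences.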
1710            logging.config.fileConfig(
1711                fn,
1712                encoding="utf-8",
1713                defaults=dict(
1714                    version=1,
1715                    disable_existing_loggers=False,
1716                    formatters={
1717                        "generic": {
1718                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
1719                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
1720                            "class": "logging.Formatter"
1721                        },
1722                    },
1723                )
1724            )
1725        finally:
1726            os.unlink(fn)
1727
1728
1729@support.requires_working_socket()
1730@threading_helper.requires_working_threading()
1731class SocketHandlerTest(BaseTest):
1732
1733    """Test for SocketHandler objects."""
1734
1735    server_class = TestTCPServer
1736    address = ('localhost', 0)
1737
1738    def setUp(self):
1739        """Set up a TCP server to receive log messages, and a SocketHandler
1740        pointing to that server's address and port."""
1741        BaseTest.setUp(self)
1742        # Issue #29177: deal with errors that happen during setup
1743        self.server = self.sock_hdlr = self.server_exception = None
1744        try:
1745            self.server = server = self.server_class(self.address,
1746                                                     self.handle_socket, 0.01)
1747            server.start()
1748            # Uncomment next line to test error recovery in setUp()
1749            # raise OSError('dummy error raised')
1750        except OSError as e:
1751            self.server_exception = e
1752            return
1753        server.ready.wait()
1754        hcls = logging.handlers.SocketHandler
1755        if isinstance(server.server_address, tuple):
1756            self.sock_hdlr = hcls('localhost', server.port)
1757        else:
1758            self.sock_hdlr = hcls(server.server_address, None)
1759        self.log_output = ''
1760        self.root_logger.removeHandler(self.root_logger.handlers[0])
1761        self.root_logger.addHandler(self.sock_hdlr)
1762        self.handled = threading.Semaphore(0)
1763
1764    def tearDown(self):
1765        """Shutdown the TCP server."""
1766        try:
1767            if self.sock_hdlr:
1768                self.root_logger.removeHandler(self.sock_hdlr)
1769                self.sock_hdlr.close()
1770            if self.server:
1771                self.server.stop()
1772        finally:
1773            BaseTest.tearDown(self)
1774
1775    def handle_socket(self, request):
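        # SocketHandler frames each record as a 4-byte big-endian length prefix
        # followed by a pickled dict of the LogRecord's attributes; the receiver
        # rebuilds a LogRecord from that dict via logging.makeLogRecord().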
1776        conn = request.connection
1777        while True:
1778            chunk = conn.recv(4)
1779            if len(chunk) < 4:
1780                break
1781            slen = struct.unpack(">L", chunk)[0]
1782            chunk = conn.recv(slen)
1783            while len(chunk) < slen:
1784                chunk = chunk + conn.recv(slen - len(chunk))
1785            obj = pickle.loads(chunk)
1786            record = logging.makeLogRecord(obj)
1787            self.log_output += record.msg + '\n'
1788            self.handled.release()
1789
1790    def test_output(self):
1791        # The log message sent to the SocketHandler is properly received.
1792        if self.server_exception:
1793            self.skipTest(self.server_exception)
1794        logger = logging.getLogger("tcp")
1795        logger.error("spam")
1796        self.handled.acquire()
1797        logger.debug("eggs")
1798        self.handled.acquire()
1799        self.assertEqual(self.log_output, "spam\neggs\n")
1800
1801    def test_noserver(self):
1802        if self.server_exception:
1803            self.skipTest(self.server_exception)
1804        # Avoid timing-related failures due to SocketHandler's own hard-wired
1805        # one-second timeout on socket.create_connection() (issue #16264).
1806        self.sock_hdlr.retryStart = 2.5
1807        # Kill the server
1808        self.server.stop()
1809        # The logging call should try to connect, which should fail
1810        try:
1811            raise RuntimeError('Deliberate mistake')
1812        except RuntimeError:
1813            self.root_logger.exception('Never sent')
1814        self.root_logger.error('Never sent, either')
1815        now = time.time()
1816        self.assertGreater(self.sock_hdlr.retryTime, now)
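        # Sleep just past the handler's scheduled retry time so that the next
        # emit attempts a fresh connection (which fails again, silently).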
1817        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
1818        self.root_logger.error('Nor this')
1819
1820def _get_temp_domain_socket():
1821    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
1822    os.close(fd)
1823    # just need a name - file can't be present, or we'll get an
1824    # 'address already in use' error.
1825    os.remove(fn)
1826    return fn
1827
1828@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1829class UnixSocketHandlerTest(SocketHandlerTest):
1830
1831    """Test for SocketHandler with unix sockets."""
1832
1833    if hasattr(socket, "AF_UNIX"):
1834        server_class = TestUnixStreamServer
1835
1836    def setUp(self):
1837        # override the definition in the base class
1838        self.address = _get_temp_domain_socket()
1839        SocketHandlerTest.setUp(self)
1840
1841    def tearDown(self):
1842        SocketHandlerTest.tearDown(self)
1843        os_helper.unlink(self.address)
1844
1845@support.requires_working_socket()
1846@threading_helper.requires_working_threading()
1847class DatagramHandlerTest(BaseTest):
1848
1849    """Test for DatagramHandler."""
1850
1851    server_class = TestUDPServer
1852    address = ('localhost', 0)
1853
1854    def setUp(self):
1855        """Set up a UDP server to receive log messages, and a DatagramHandler
1856        pointing to that server's address and port."""
1857        BaseTest.setUp(self)
1858        # Issue #29177: deal with errors that happen during setup
1859        self.server = self.sock_hdlr = self.server_exception = None
1860        try:
1861            self.server = server = self.server_class(self.address,
1862                                                     self.handle_datagram, 0.01)
1863            server.start()
1864            # Uncomment next line to test error recovery in setUp()
1865            # raise OSError('dummy error raised')
1866        except OSError as e:
1867            self.server_exception = e
1868            return
1869        server.ready.wait()
1870        hcls = logging.handlers.DatagramHandler
1871        if isinstance(server.server_address, tuple):
1872            self.sock_hdlr = hcls('localhost', server.port)
1873        else:
1874            self.sock_hdlr = hcls(server.server_address, None)
1875        self.log_output = ''
1876        self.root_logger.removeHandler(self.root_logger.handlers[0])
1877        self.root_logger.addHandler(self.sock_hdlr)
1878        self.handled = threading.Event()
1879
1880    def tearDown(self):
1881        """Shutdown the UDP server."""
1882        try:
1883            if self.server:
1884                self.server.stop()
1885            if self.sock_hdlr:
1886                self.root_logger.removeHandler(self.sock_hdlr)
1887                self.sock_hdlr.close()
1888        finally:
1889            BaseTest.tearDown(self)
1890
1891    def handle_datagram(self, request):
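        # DatagramHandler sends the same pickled payload as SocketHandler,
        # including the (redundant for UDP) 4-byte length prefix, so skip it.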
1892        slen = struct.pack('>L', 0) # used only to measure the 4-byte length prefix
1893        packet = request.packet[len(slen):]
1894        obj = pickle.loads(packet)
1895        record = logging.makeLogRecord(obj)
1896        self.log_output += record.msg + '\n'
1897        self.handled.set()
1898
1899    def test_output(self):
1900        # The log message sent to the DatagramHandler is properly received.
1901        if self.server_exception:
1902            self.skipTest(self.server_exception)
1903        logger = logging.getLogger("udp")
1904        logger.error("spam")
1905        self.handled.wait()
1906        self.handled.clear()
1907        logger.error("eggs")
1908        self.handled.wait()
1909        self.assertEqual(self.log_output, "spam\neggs\n")
1910
1911@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1912class UnixDatagramHandlerTest(DatagramHandlerTest):
1913
1914    """Test for DatagramHandler using Unix sockets."""
1915
1916    if hasattr(socket, "AF_UNIX"):
1917        server_class = TestUnixDatagramServer
1918
1919    def setUp(self):
1920        # override the definition in the base class
1921        self.address = _get_temp_domain_socket()
1922        DatagramHandlerTest.setUp(self)
1923
1924    def tearDown(self):
1925        DatagramHandlerTest.tearDown(self)
1926        os_helper.unlink(self.address)
1927
1928@support.requires_working_socket()
1929@threading_helper.requires_working_threading()
1930class SysLogHandlerTest(BaseTest):
1931
1932    """Test for SysLogHandler using UDP."""
1933
1934    server_class = TestUDPServer
1935    address = ('localhost', 0)
1936
1937    def setUp(self):
1938        """Set up a UDP server to receive log messages, and a SysLogHandler
1939        pointing to that server's address and port."""
1940        BaseTest.setUp(self)
1941        # Issue #29177: deal with errors that happen during setup
1942        self.server = self.sl_hdlr = self.server_exception = None
1943        try:
1944            self.server = server = self.server_class(self.address,
1945                                                     self.handle_datagram, 0.01)
1946            server.start()
1947            # Uncomment next line to test error recovery in setUp()
1948            # raise OSError('dummy error raised')
1949        except OSError as e:
1950            self.server_exception = e
1951            return
1952        server.ready.wait()
1953        hcls = logging.handlers.SysLogHandler
1954        if isinstance(server.server_address, tuple):
1955            self.sl_hdlr = hcls((server.server_address[0], server.port))
1956        else:
1957            self.sl_hdlr = hcls(server.server_address)
1958        self.log_output = b''
1959        self.root_logger.removeHandler(self.root_logger.handlers[0])
1960        self.root_logger.addHandler(self.sl_hdlr)
1961        self.handled = threading.Event()
1962
1963    def tearDown(self):
1964        """Shutdown the server."""
1965        try:
1966            if self.server:
1967                self.server.stop()
1968            if self.sl_hdlr:
1969                self.root_logger.removeHandler(self.sl_hdlr)
1970                self.sl_hdlr.close()
1971        finally:
1972            BaseTest.tearDown(self)
1973
1974    def handle_datagram(self, request):
1975        self.log_output = request.packet
1976        self.handled.set()
1977
1978    def test_output(self):
1979        if self.server_exception:
1980            self.skipTest(self.server_exception)
1981        # The log message sent to the SysLogHandler is properly received.
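        # Expected bytes: '<11>' is the syslog priority (facility LOG_USER=1 * 8
        # + severity 'error'=3), the message is UTF-8 encoded, and a trailing
        # NUL byte is appended because append_nul defaults to True.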
1982        logger = logging.getLogger("slh")
1983        logger.error("sp\xe4m")
1984        self.handled.wait()
1985        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
1986        self.handled.clear()
1987        self.sl_hdlr.append_nul = False
1988        logger.error("sp\xe4m")
1989        self.handled.wait()
1990        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
1991        self.handled.clear()
1992        self.sl_hdlr.ident = "h\xe4m-"
1993        logger.error("sp\xe4m")
1994        self.handled.wait()
1995        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
1996
1997    def test_udp_reconnection(self):
1998        logger = logging.getLogger("slh")
1999        self.sl_hdlr.close()
2000        self.handled.clear()
2001        logger.error("sp\xe4m")
2002        self.handled.wait(0.1)
2003        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
2004
2005@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
2006class UnixSysLogHandlerTest(SysLogHandlerTest):
2007
2008    """Test for SysLogHandler with Unix sockets."""
2009
2010    if hasattr(socket, "AF_UNIX"):
2011        server_class = TestUnixDatagramServer
2012
2013    def setUp(self):
2014        # override the definition in the base class
2015        self.address = _get_temp_domain_socket()
2016        SysLogHandlerTest.setUp(self)
2017
2018    def tearDown(self):
2019        SysLogHandlerTest.tearDown(self)
2020        os_helper.unlink(self.address)
2021
2022@unittest.skipUnless(socket_helper.IPV6_ENABLED,
2023                     'IPv6 support required for this test.')
2024class IPv6SysLogHandlerTest(SysLogHandlerTest):
2025
2026    """Test for SysLogHandler with IPv6 host."""
2027
2028    server_class = TestUDPServer
2029    address = ('::1', 0)
2030
2031    def setUp(self):
2032        self.server_class.address_family = socket.AF_INET6
2033        super(IPv6SysLogHandlerTest, self).setUp()
2034
2035    def tearDown(self):
2036        self.server_class.address_family = socket.AF_INET
2037        super(IPv6SysLogHandlerTest, self).tearDown()
2038
2039@support.requires_working_socket()
2040@threading_helper.requires_working_threading()
2041class HTTPHandlerTest(BaseTest):
2042    """Test for HTTPHandler."""
2043
2044    def setUp(self):
2045        """Set up an HTTP server to receive log messages, and a HTTPHandler
2046        pointing to that server's address and port."""
2047        BaseTest.setUp(self)
2048        self.handled = threading.Event()
2049
2050    def handle_request(self, request):
2051        self.command = request.command
2052        self.log_data = urlparse(request.path)
2053        if self.command == 'POST':
2054            try:
2055                rlen = int(request.headers['Content-Length'])
2056                self.post_data = request.rfile.read(rlen)
2057            except:
2058                self.post_data = None
2059        request.send_response(200)
2060        request.end_headers()
2061        self.handled.set()
2062
2063    def test_output(self):
2064        # The log message sent to the HTTPHandler is properly received.
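        # HTTPHandler sends the LogRecord's attribute dict as URL-encoded form
        # data: in the query string for GET, or in the request body for POST.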
2065        logger = logging.getLogger("http")
2066        root_logger = self.root_logger
2067        root_logger.removeHandler(self.root_logger.handlers[0])
2068        for secure in (False, True):
2069            addr = ('localhost', 0)
2070            if secure:
2071                try:
2072                    import ssl
2073                except ImportError:
2074                    sslctx = None
2075                else:
2076                    here = os.path.dirname(__file__)
2077                    localhost_cert = os.path.join(here, "keycert.pem")
2078                    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2079                    sslctx.load_cert_chain(localhost_cert)
2080
2081                    context = ssl.create_default_context(cafile=localhost_cert)
2082            else:
2083                sslctx = None
2084                context = None
2085            self.server = server = TestHTTPServer(addr, self.handle_request,
2086                                                    0.01, sslctx=sslctx)
2087            server.start()
2088            server.ready.wait()
2089            host = 'localhost:%d' % server.server_port
2090            secure_client = secure and sslctx
2091            self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
2092                                                       secure=secure_client,
2093                                                       context=context,
2094                                                       credentials=('foo', 'bar'))
2095            self.log_data = None
2096            root_logger.addHandler(self.h_hdlr)
2097
2098            for method in ('GET', 'POST'):
2099                self.h_hdlr.method = method
2100                self.handled.clear()
2101                msg = "sp\xe4m"
2102                logger.error(msg)
2103                self.handled.wait()
2104                self.assertEqual(self.log_data.path, '/frob')
2105                self.assertEqual(self.command, method)
2106                if method == 'GET':
2107                    d = parse_qs(self.log_data.query)
2108                else:
2109                    d = parse_qs(self.post_data.decode('utf-8'))
2110                self.assertEqual(d['name'], ['http'])
2111                self.assertEqual(d['funcName'], ['test_output'])
2112                self.assertEqual(d['msg'], [msg])
2113
2114            self.server.stop()
2115            self.root_logger.removeHandler(self.h_hdlr)
2116            self.h_hdlr.close()
2117
2118class MemoryTest(BaseTest):
2119
2120    """Test memory persistence of logger objects."""
2121
2122    def setUp(self):
2123        """Create a dict to remember potentially destroyed objects."""
2124        BaseTest.setUp(self)
2125        self._survivors = {}
2126
2127    def _watch_for_survival(self, *args):
2128        """Watch the given objects for survival, by creating weakrefs to
2129        them."""
2130        for obj in args:
2131            key = id(obj), repr(obj)
2132            self._survivors[key] = weakref.ref(obj)
2133
2134    def _assert_survival(self):
2135        """Assert that all objects watched for survival have survived."""
2136        # Trigger cycle breaking.
2137        gc.collect()
2138        dead = []
2139        for (id_, repr_), ref in self._survivors.items():
2140            if ref() is None:
2141                dead.append(repr_)
2142        if dead:
2143            self.fail("%d objects should have survived "
2144                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
2145
2146    def test_persistent_loggers(self):
2147        # Logger objects are persistent and retain their configuration, even
2148        #  if visible references are destroyed.
2149        self.root_logger.setLevel(logging.INFO)
2150        foo = logging.getLogger("foo")
2151        self._watch_for_survival(foo)
2152        foo.setLevel(logging.DEBUG)
2153        self.root_logger.debug(self.next_message())
2154        foo.debug(self.next_message())
2155        self.assert_log_lines([
2156            ('foo', 'DEBUG', '2'),
2157        ])
2158        del foo
2159        # foo has survived.
2160        self._assert_survival()
2161        # foo has retained its settings.
2162        bar = logging.getLogger("foo")
2163        bar.debug(self.next_message())
2164        self.assert_log_lines([
2165            ('foo', 'DEBUG', '2'),
2166            ('foo', 'DEBUG', '3'),
2167        ])
2168
2169
2170class EncodingTest(BaseTest):
2171    def test_encoding_plain_file(self):
2172        # A FileHandler opened with an explicit encoding should round-trip non-ASCII data.
2173        log = logging.getLogger("test")
2174        fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
2175        os.close(fd)
2176        # the non-ascii data we write to the log.
2177        data = "foo\x80"
2178        try:
2179            handler = logging.FileHandler(fn, encoding="utf-8")
2180            log.addHandler(handler)
2181            try:
2182                # write non-ascii data to the log.
2183                log.warning(data)
2184            finally:
2185                log.removeHandler(handler)
2186                handler.close()
2187            # check we wrote exactly those bytes, ignoring trailing \n etc
2188            f = open(fn, encoding="utf-8")
2189            try:
2190                self.assertEqual(f.read().rstrip(), data)
2191            finally:
2192                f.close()
2193        finally:
2194            if os.path.isfile(fn):
2195                os.remove(fn)
2196
2197    def test_encoding_cyrillic_unicode(self):
2198        log = logging.getLogger("test")
2199        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
2200        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
2201        # Ensure it's written in a Cyrillic encoding
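        # codecs.getwriter() returns a StreamWriter class; wrapping a BytesIO
        # with it lets the StreamHandler write str while the captured output is
        # the cp1251-encoded bytes.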
2202        writer_class = codecs.getwriter('cp1251')
2203        writer_class.encoding = 'cp1251'
2204        stream = io.BytesIO()
2205        writer = writer_class(stream, 'strict')
2206        handler = logging.StreamHandler(writer)
2207        log.addHandler(handler)
2208        try:
2209            log.warning(message)
2210        finally:
2211            log.removeHandler(handler)
2212            handler.close()
2213        # check we wrote exactly those bytes, ignoring trailing \n etc
2214        s = stream.getvalue()
2215        # Compare against what the data should be when encoded in CP-1251
2216        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
2217
2218
2219class WarningsTest(BaseTest):
2220
2221    def test_warnings(self):
2222        with warnings.catch_warnings():
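            # captureWarnings(True) redirects warnings issued via warnings.warn()
            # to the 'py.warnings' logger instead of writing them to stderr.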
2223            logging.captureWarnings(True)
2224            self.addCleanup(logging.captureWarnings, False)
2225            warnings.filterwarnings("always", category=UserWarning)
2226            stream = io.StringIO()
2227            h = logging.StreamHandler(stream)
2228            logger = logging.getLogger("py.warnings")
2229            logger.addHandler(h)
2230            warnings.warn("I'm warning you...")
2231            logger.removeHandler(h)
2232            s = stream.getvalue()
2233            h.close()
2234            self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
2235
2236            # See if an explicit file uses the original implementation
2237            a_file = io.StringIO()
2238            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
2239                                 a_file, "Dummy line")
2240            s = a_file.getvalue()
2241            a_file.close()
2242            self.assertEqual(s,
2243                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
2244
2245    def test_warnings_no_handlers(self):
2246        with warnings.catch_warnings():
2247            logging.captureWarnings(True)
2248            self.addCleanup(logging.captureWarnings, False)
2249
2250            # confirm our assumption: no loggers are set
2251            logger = logging.getLogger("py.warnings")
2252            self.assertEqual(logger.handlers, [])
2253
2254            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
2255            self.assertEqual(len(logger.handlers), 1)
2256            self.assertIsInstance(logger.handlers[0], logging.NullHandler)
2257
2258
2259def formatFunc(format, datefmt=None):
2260    return logging.Formatter(format, datefmt)
2261
2262class myCustomFormatter:
2263    def __init__(self, fmt, datefmt=None):
2264        pass
2265
2266def handlerFunc():
2267    return logging.StreamHandler()
2268
2269class CustomHandler(logging.StreamHandler):
2270    pass
2271
2272class ConfigDictTest(BaseTest):
2273
2274    """Reading logging config from a dictionary."""
2275
2276    check_no_resource_warning = warnings_helper.check_no_resource_warning
2277    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
2278
2279    # config0 is a standard configuration.
2280    config0 = {
2281        'version': 1,
2282        'formatters': {
2283            'form1' : {
2284                'format' : '%(levelname)s ++ %(message)s',
2285            },
2286        },
2287        'handlers' : {
2288            'hand1' : {
2289                'class' : 'logging.StreamHandler',
2290                'formatter' : 'form1',
2291                'level' : 'NOTSET',
2292                'stream'  : 'ext://sys.stdout',
2293            },
2294        },
2295        'root' : {
2296            'level' : 'WARNING',
2297            'handlers' : ['hand1'],
2298        },
2299    }
2300
2301    # config1 adds a little to the standard configuration.
2302    config1 = {
2303        'version': 1,
2304        'formatters': {
2305            'form1' : {
2306                'format' : '%(levelname)s ++ %(message)s',
2307            },
2308        },
2309        'handlers' : {
2310            'hand1' : {
2311                'class' : 'logging.StreamHandler',
2312                'formatter' : 'form1',
2313                'level' : 'NOTSET',
2314                'stream'  : 'ext://sys.stdout',
2315            },
2316        },
2317        'loggers' : {
2318            'compiler.parser' : {
2319                'level' : 'DEBUG',
2320                'handlers' : ['hand1'],
2321            },
2322        },
2323        'root' : {
2324            'level' : 'WARNING',
2325        },
2326    }
2327
2328    # config1a moves the handler to the root. Used with config8a
2329    config1a = {
2330        'version': 1,
2331        'formatters': {
2332            'form1' : {
2333                'format' : '%(levelname)s ++ %(message)s',
2334            },
2335        },
2336        'handlers' : {
2337            'hand1' : {
2338                'class' : 'logging.StreamHandler',
2339                'formatter' : 'form1',
2340                'level' : 'NOTSET',
2341                'stream'  : 'ext://sys.stdout',
2342            },
2343        },
2344        'loggers' : {
2345            'compiler.parser' : {
2346                'level' : 'DEBUG',
2347            },
2348        },
2349        'root' : {
2350            'level' : 'WARNING',
2351            'handlers' : ['hand1'],
2352        },
2353    }
2354
2355    # config2 has a subtle configuration error (the misspelt stream 'ext://sys.stdbout') that should be reported
2356    config2 = {
2357        'version': 1,
2358        'formatters': {
2359            'form1' : {
2360                'format' : '%(levelname)s ++ %(message)s',
2361            },
2362        },
2363        'handlers' : {
2364            'hand1' : {
2365                'class' : 'logging.StreamHandler',
2366                'formatter' : 'form1',
2367                'level' : 'NOTSET',
2368                'stream'  : 'ext://sys.stdbout',
2369            },
2370        },
2371        'loggers' : {
2372            'compiler.parser' : {
2373                'level' : 'DEBUG',
2374                'handlers' : ['hand1'],
2375            },
2376        },
2377        'root' : {
2378            'level' : 'WARNING',
2379        },
2380    }
2381
2382    # As config1 but with a misspelt level on a handler
2383    config2a = {
2384        'version': 1,
2385        'formatters': {
2386            'form1' : {
2387                'format' : '%(levelname)s ++ %(message)s',
2388            },
2389        },
2390        'handlers' : {
2391            'hand1' : {
2392                'class' : 'logging.StreamHandler',
2393                'formatter' : 'form1',
2394                'level' : 'NTOSET',
2395                'stream'  : 'ext://sys.stdout',
2396            },
2397        },
2398        'loggers' : {
2399            'compiler.parser' : {
2400                'level' : 'DEBUG',
2401                'handlers' : ['hand1'],
2402            },
2403        },
2404        'root' : {
2405            'level' : 'WARNING',
2406        },
2407    }
2408
2409
2410    # As config1 but with a misspelt level on a logger
2411    config2b = {
2412        'version': 1,
2413        'formatters': {
2414            'form1' : {
2415                'format' : '%(levelname)s ++ %(message)s',
2416            },
2417        },
2418        'handlers' : {
2419            'hand1' : {
2420                'class' : 'logging.StreamHandler',
2421                'formatter' : 'form1',
2422                'level' : 'NOTSET',
2423                'stream'  : 'ext://sys.stdout',
2424            },
2425        },
2426        'loggers' : {
2427            'compiler.parser' : {
2428                'level' : 'DEBUG',
2429                'handlers' : ['hand1'],
2430            },
2431        },
2432        'root' : {
2433            'level' : 'WRANING',
2434        },
2435    }
2436
2437    # config3 has a less subtle configuration error: its handler references a nonexistent formatter
2438    config3 = {
2439        'version': 1,
2440        'formatters': {
2441            'form1' : {
2442                'format' : '%(levelname)s ++ %(message)s',
2443            },
2444        },
2445        'handlers' : {
2446            'hand1' : {
2447                'class' : 'logging.StreamHandler',
2448                'formatter' : 'misspelled_name',
2449                'level' : 'NOTSET',
2450                'stream'  : 'ext://sys.stdout',
2451            },
2452        },
2453        'loggers' : {
2454            'compiler.parser' : {
2455                'level' : 'DEBUG',
2456                'handlers' : ['hand1'],
2457            },
2458        },
2459        'root' : {
2460            'level' : 'WARNING',
2461        },
2462    }
2463
2464    # config4 specifies a custom formatter class to be loaded
2465    config4 = {
2466        'version': 1,
2467        'formatters': {
2468            'form1' : {
2469                '()' : __name__ + '.ExceptionFormatter',
2470                'format' : '%(levelname)s:%(name)s:%(message)s',
2471            },
2472        },
2473        'handlers' : {
2474            'hand1' : {
2475                'class' : 'logging.StreamHandler',
2476                'formatter' : 'form1',
2477                'level' : 'NOTSET',
2478                'stream'  : 'ext://sys.stdout',
2479            },
2480        },
2481        'root' : {
2482            'level' : 'NOTSET',
2483            'handlers' : ['hand1'],
2484        },
2485    }
2486
2487    # As config4 but using an actual callable rather than a string
2488    config4a = {
2489        'version': 1,
2490        'formatters': {
2491            'form1' : {
2492                '()' : ExceptionFormatter,
2493                'format' : '%(levelname)s:%(name)s:%(message)s',
2494            },
2495            'form2' : {
2496                '()' : __name__ + '.formatFunc',
2497                'format' : '%(levelname)s:%(name)s:%(message)s',
2498            },
2499            'form3' : {
2500                '()' : formatFunc,
2501                'format' : '%(levelname)s:%(name)s:%(message)s',
2502            },
2503        },
2504        'handlers' : {
2505            'hand1' : {
2506                'class' : 'logging.StreamHandler',
2507                'formatter' : 'form1',
2508                'level' : 'NOTSET',
2509                'stream'  : 'ext://sys.stdout',
2510            },
2511            'hand2' : {
2512                '()' : handlerFunc,
2513            },
2514        },
2515        'root' : {
2516            'level' : 'NOTSET',
2517            'handlers' : ['hand1'],
2518        },
2519    }
2520
2521    # config5 specifies a custom handler class to be loaded
2522    config5 = {
2523        'version': 1,
2524        'formatters': {
2525            'form1' : {
2526                'format' : '%(levelname)s ++ %(message)s',
2527            },
2528        },
2529        'handlers' : {
2530            'hand1' : {
2531                'class' : __name__ + '.CustomHandler',
2532                'formatter' : 'form1',
2533                'level' : 'NOTSET',
2534                'stream'  : 'ext://sys.stdout',
2535            },
2536        },
2537        'loggers' : {
2538            'compiler.parser' : {
2539                'level' : 'DEBUG',
2540                'handlers' : ['hand1'],
2541            },
2542        },
2543        'root' : {
2544            'level' : 'WARNING',
2545        },
2546    }
2547
2548    # config6 specifies a custom handler class to be loaded
2549    # but has bad arguments
2550    config6 = {
2551        'version': 1,
2552        'formatters': {
2553            'form1' : {
2554                'format' : '%(levelname)s ++ %(message)s',
2555            },
2556        },
2557        'handlers' : {
2558            'hand1' : {
2559                'class' : __name__ + '.CustomHandler',
2560                'formatter' : 'form1',
2561                'level' : 'NOTSET',
2562                'stream'  : 'ext://sys.stdout',
2563                '9' : 'invalid parameter name',
2564            },
2565        },
2566        'loggers' : {
2567            'compiler.parser' : {
2568                'level' : 'DEBUG',
2569                'handlers' : ['hand1'],
2570            },
2571        },
2572        'root' : {
2573            'level' : 'WARNING',
2574        },
2575    }
2576
2577    # config7 does not define compiler.parser but defines compiler.lexer
2578    # so compiler.parser should be disabled after applying it
2579    config7 = {
2580        'version': 1,
2581        'formatters': {
2582            'form1' : {
2583                'format' : '%(levelname)s ++ %(message)s',
2584            },
2585        },
2586        'handlers' : {
2587            'hand1' : {
2588                'class' : 'logging.StreamHandler',
2589                'formatter' : 'form1',
2590                'level' : 'NOTSET',
2591                'stream'  : 'ext://sys.stdout',
2592            },
2593        },
2594        'loggers' : {
2595            'compiler.lexer' : {
2596                'level' : 'DEBUG',
2597                'handlers' : ['hand1'],
2598            },
2599        },
2600        'root' : {
2601            'level' : 'WARNING',
2602        },
2603    }
2604
2605    # config8 defines both compiler and compiler.lexer
2606    # so compiler.parser should not be disabled (since
2607    # compiler is defined)
2608    config8 = {
2609        'version': 1,
2610        'disable_existing_loggers' : False,
2611        'formatters': {
2612            'form1' : {
2613                'format' : '%(levelname)s ++ %(message)s',
2614            },
2615        },
2616        'handlers' : {
2617            'hand1' : {
2618                'class' : 'logging.StreamHandler',
2619                'formatter' : 'form1',
2620                'level' : 'NOTSET',
2621                'stream'  : 'ext://sys.stdout',
2622            },
2623        },
2624        'loggers' : {
2625            'compiler' : {
2626                'level' : 'DEBUG',
2627                'handlers' : ['hand1'],
2628            },
2629            'compiler.lexer' : {
2630            },
2631        },
2632        'root' : {
2633            'level' : 'WARNING',
2634        },
2635    }
2636
2637    # config8a disables existing loggers
2638    config8a = {
2639        'version': 1,
2640        'disable_existing_loggers' : True,
2641        'formatters': {
2642            'form1' : {
2643                'format' : '%(levelname)s ++ %(message)s',
2644            },
2645        },
2646        'handlers' : {
2647            'hand1' : {
2648                'class' : 'logging.StreamHandler',
2649                'formatter' : 'form1',
2650                'level' : 'NOTSET',
2651                'stream'  : 'ext://sys.stdout',
2652            },
2653        },
2654        'loggers' : {
2655            'compiler' : {
2656                'level' : 'DEBUG',
2657                'handlers' : ['hand1'],
2658            },
2659            'compiler.lexer' : {
2660            },
2661        },
2662        'root' : {
2663            'level' : 'WARNING',
2664        },
2665    }
2666
2667    config9 = {
2668        'version': 1,
2669        'formatters': {
2670            'form1' : {
2671                'format' : '%(levelname)s ++ %(message)s',
2672            },
2673        },
2674        'handlers' : {
2675            'hand1' : {
2676                'class' : 'logging.StreamHandler',
2677                'formatter' : 'form1',
2678                'level' : 'WARNING',
2679                'stream'  : 'ext://sys.stdout',
2680            },
2681        },
2682        'loggers' : {
2683            'compiler.parser' : {
2684                'level' : 'WARNING',
2685                'handlers' : ['hand1'],
2686            },
2687        },
2688        'root' : {
2689            'level' : 'NOTSET',
2690        },
2691    }
2692
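    # config9a and config9b are incremental configs: with 'incremental': True,
    # dictConfig() does not replace existing handlers or loggers, it only
    # adjusts the level of already-configured handlers (referenced by name)
    # and the level/propagate settings of existing loggers.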
2693    config9a = {
2694        'version': 1,
2695        'incremental' : True,
2696        'handlers' : {
2697            'hand1' : {
2698                'level' : 'WARNING',
2699            },
2700        },
2701        'loggers' : {
2702            'compiler.parser' : {
2703                'level' : 'INFO',
2704            },
2705        },
2706    }
2707
2708    config9b = {
2709        'version': 1,
2710        'incremental' : True,
2711        'handlers' : {
2712            'hand1' : {
2713                'level' : 'INFO',
2714            },
2715        },
2716        'loggers' : {
2717            'compiler.parser' : {
2718                'level' : 'INFO',
2719            },
2720        },
2721    }
2722
2723    # As config1 but with a filter added
2724    config10 = {
2725        'version': 1,
2726        'formatters': {
2727            'form1' : {
2728                'format' : '%(levelname)s ++ %(message)s',
2729            },
2730        },
2731        'filters' : {
2732            'filt1' : {
2733                'name' : 'compiler.parser',
2734            },
2735        },
2736        'handlers' : {
2737            'hand1' : {
2738                'class' : 'logging.StreamHandler',
2739                'formatter' : 'form1',
2740                'level' : 'NOTSET',
2741                'stream'  : 'ext://sys.stdout',
2742                'filters' : ['filt1'],
2743            },
2744        },
2745        'loggers' : {
2746            'compiler.parser' : {
2747                'level' : 'DEBUG',
2748                'filters' : ['filt1'],
2749            },
2750        },
2751        'root' : {
2752            'level' : 'WARNING',
2753            'handlers' : ['hand1'],
2754        },
2755    }
2756
2757    # As config1 but using cfg:// references
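    # A 'cfg://' value is resolved against the configuration dict itself:
    # 'cfg://true_formatters' yields the sub-dict stored under that key, and
    # 'cfg://handler_configs[hand1]' indexes into it, so the effective config
    # matches config1.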
2758    config11 = {
2759        'version': 1,
2760        'true_formatters': {
2761            'form1' : {
2762                'format' : '%(levelname)s ++ %(message)s',
2763            },
2764        },
2765        'handler_configs': {
2766            'hand1' : {
2767                'class' : 'logging.StreamHandler',
2768                'formatter' : 'form1',
2769                'level' : 'NOTSET',
2770                'stream'  : 'ext://sys.stdout',
2771            },
2772        },
2773        'formatters' : 'cfg://true_formatters',
2774        'handlers' : {
2775            'hand1' : 'cfg://handler_configs[hand1]',
2776        },
2777        'loggers' : {
2778            'compiler.parser' : {
2779                'level' : 'DEBUG',
2780                'handlers' : ['hand1'],
2781            },
2782        },
2783        'root' : {
2784            'level' : 'WARNING',
2785        },
2786    }
2787
2788    # As config11 but missing the version key
2789    config12 = {
2790        'true_formatters': {
2791            'form1' : {
2792                'format' : '%(levelname)s ++ %(message)s',
2793            },
2794        },
2795        'handler_configs': {
2796            'hand1' : {
2797                'class' : 'logging.StreamHandler',
2798                'formatter' : 'form1',
2799                'level' : 'NOTSET',
2800                'stream'  : 'ext://sys.stdout',
2801            },
2802        },
2803        'formatters' : 'cfg://true_formatters',
2804        'handlers' : {
2805            'hand1' : 'cfg://handler_configs[hand1]',
2806        },
2807        'loggers' : {
2808            'compiler.parser' : {
2809                'level' : 'DEBUG',
2810                'handlers' : ['hand1'],
2811            },
2812        },
2813        'root' : {
2814            'level' : 'WARNING',
2815        },
2816    }
2817
2818    # As config11 but using an unsupported version
2819    config13 = {
2820        'version': 2,
2821        'true_formatters': {
2822            'form1' : {
2823                'format' : '%(levelname)s ++ %(message)s',
2824            },
2825        },
2826        'handler_configs': {
2827            'hand1' : {
2828                'class' : 'logging.StreamHandler',
2829                'formatter' : 'form1',
2830                'level' : 'NOTSET',
2831                'stream'  : 'ext://sys.stdout',
2832            },
2833        },
2834        'formatters' : 'cfg://true_formatters',
2835        'handlers' : {
2836            'hand1' : 'cfg://handler_configs[hand1]',
2837        },
2838        'loggers' : {
2839            'compiler.parser' : {
2840                'level' : 'DEBUG',
2841                'handlers' : ['hand1'],
2842            },
2843        },
2844        'root' : {
2845            'level' : 'WARNING',
2846        },
2847    }
2848
2849    # As config0, but with properties
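    # Entries under the special '.' key are set as attributes on the handler
    # after it is constructed, so here handler.terminator becomes '!\n' and an
    # arbitrary handler.foo attribute is added.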
2850    config14 = {
2851        'version': 1,
2852        'formatters': {
2853            'form1' : {
2854                'format' : '%(levelname)s ++ %(message)s',
2855            },
2856        },
2857        'handlers' : {
2858            'hand1' : {
2859                'class' : 'logging.StreamHandler',
2860                'formatter' : 'form1',
2861                'level' : 'NOTSET',
2862                'stream'  : 'ext://sys.stdout',
2863                '.': {
2864                    'foo': 'bar',
2865                    'terminator': '!\n',
2866                }
2867            },
2868        },
2869        'root' : {
2870            'level' : 'WARNING',
2871            'handlers' : ['hand1'],
2872        },
2873    }
2874
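    # 'bufferGlobal' sorts before its target 'fileGlobal', so dictConfig() has
    # to defer resolving the MemoryHandler's target until 'fileGlobal' has been
    # configured; this dict is presumably named for that out-of-order reference.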
2875    out_of_order = {
2876        "version": 1,
2877        "formatters": {
2878            "mySimpleFormatter": {
2879                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
2880                "style": "$"
2881            }
2882        },
2883        "handlers": {
2884            "fileGlobal": {
2885                "class": "logging.StreamHandler",
2886                "level": "DEBUG",
2887                "formatter": "mySimpleFormatter"
2888            },
2889            "bufferGlobal": {
2890                "class": "logging.handlers.MemoryHandler",
2891                "capacity": 5,
2892                "formatter": "mySimpleFormatter",
2893                "target": "fileGlobal",
2894                "level": "DEBUG"
2895            }
2896        },
2897        "loggers": {
2898            "mymodule": {
2899                "level": "DEBUG",
2900                "handlers": ["bufferGlobal"],
2901                "propagate": "true"
2902            }
2903        }
2904    }
2905
2906    # Configuration with custom logging.Formatter subclass as '()' key and 'validate' set to False
2907    custom_formatter_class_validate = {
2908        'version': 1,
2909        'formatters': {
2910            'form1': {
2911                '()': __name__ + '.ExceptionFormatter',
2912                'format': '%(levelname)s:%(name)s:%(message)s',
2913                'validate': False,
2914            },
2915        },
2916        'handlers' : {
2917            'hand1' : {
2918                'class': 'logging.StreamHandler',
2919                'formatter': 'form1',
2920                'level': 'NOTSET',
2921                'stream': 'ext://sys.stdout',
2922            },
2923        },
2924        "loggers": {
2925            "my_test_logger_custom_formatter": {
2926                "level": "DEBUG",
2927                "handlers": ["hand1"],
2928                "propagate": "true"
2929            }
2930        }
2931    }
2932
2933    # Configuration with custom logging.Formatter subclass as 'class' key and 'validate' set to False
2934    custom_formatter_class_validate2 = {
2935        'version': 1,
2936        'formatters': {
2937            'form1': {
2938                'class': __name__ + '.ExceptionFormatter',
2939                'format': '%(levelname)s:%(name)s:%(message)s',
2940                'validate': False,
2941            },
2942        },
2943        'handlers' : {
2944            'hand1' : {
2945                'class': 'logging.StreamHandler',
2946                'formatter': 'form1',
2947                'level': 'NOTSET',
2948                'stream': 'ext://sys.stdout',
2949            },
2950        },
2951        "loggers": {
2952            "my_test_logger_custom_formatter": {
2953                "level": "DEBUG",
2954                "handlers": ["hand1"],
2955                "propagate": "true"
2956            }
2957        }
2958    }
2959
2960    # Configuration with a custom class that does not inherit from logging.Formatter
2961    custom_formatter_class_validate3 = {
2962        'version': 1,
2963        'formatters': {
2964            'form1': {
2965                'class': __name__ + '.myCustomFormatter',
2966                'format': '%(levelname)s:%(name)s:%(message)s',
2967                'validate': False,
2968            },
2969        },
2970        'handlers' : {
2971            'hand1' : {
2972                'class': 'logging.StreamHandler',
2973                'formatter': 'form1',
2974                'level': 'NOTSET',
2975                'stream': 'ext://sys.stdout',
2976            },
2977        },
2978        "loggers": {
2979            "my_test_logger_custom_formatter": {
2980                "level": "DEBUG",
2981                "handlers": ["hand1"],
2982                "propagate": "true"
2983            }
2984        }
2985    }
2986
2987    # Configuration with custom function and 'validate' set to False
2988    custom_formatter_with_function = {
2989        'version': 1,
2990        'formatters': {
2991            'form1': {
2992                '()': formatFunc,
2993                'format': '%(levelname)s:%(name)s:%(message)s',
2994                'validate': False,
2995            },
2996        },
2997        'handlers' : {
2998            'hand1' : {
2999                'class': 'logging.StreamHandler',
3000                'formatter': 'form1',
3001                'level': 'NOTSET',
3002                'stream': 'ext://sys.stdout',
3003            },
3004        },
3005        "loggers": {
3006            "my_test_logger_custom_formatter": {
3007                "level": "DEBUG",
3008                "handlers": ["hand1"],
3009                "propagate": "true"
3010            }
3011        }
3012    }
3013
3014    def apply_config(self, conf):
3015        logging.config.dictConfig(conf)
3016
3017    def test_config0_ok(self):
3018        # A simple config which overrides the default settings.
3019        with support.captured_stdout() as output:
3020            self.apply_config(self.config0)
3021            logger = logging.getLogger()
3022            # Won't output anything
3023            logger.info(self.next_message())
3024            # Outputs a message
3025            logger.error(self.next_message())
3026            self.assert_log_lines([
3027                ('ERROR', '2'),
3028            ], stream=output)
3029            # Original logger output is empty.
3030            self.assert_log_lines([])
3031
3032    def test_config1_ok(self, config=config1):
3033        # A config also defining a 'compiler.parser' sub-logger.
3034        with support.captured_stdout() as output:
3035            self.apply_config(config)
3036            logger = logging.getLogger("compiler.parser")
3037            # Both will output a message
3038            logger.info(self.next_message())
3039            logger.error(self.next_message())
3040            self.assert_log_lines([
3041                ('INFO', '1'),
3042                ('ERROR', '2'),
3043            ], stream=output)
3044            # Original logger output is empty.
3045            self.assert_log_lines([])
3046
3047    def test_config2_failure(self):
3048        # config2 contains an error and should raise when applied.
3049        self.assertRaises(Exception, self.apply_config, self.config2)
3050
3051    def test_config2a_failure(self):
3052        # config2a has a misspelt handler level and should raise when applied.
3053        self.assertRaises(Exception, self.apply_config, self.config2a)
3054
3055    def test_config2b_failure(self):
3056        # config2b has a misspelt logger level and should raise when applied.
3057        self.assertRaises(Exception, self.apply_config, self.config2b)
3058
3059    def test_config3_failure(self):
3060        # config3 references a nonexistent formatter and should raise when applied.
3061        self.assertRaises(Exception, self.apply_config, self.config3)
3062
3063    def test_config4_ok(self):
3064        # A config specifying a custom formatter class.
3065        with support.captured_stdout() as output:
3066            self.apply_config(self.config4)
3067            #logger = logging.getLogger()
3068            try:
3069                raise RuntimeError()
3070            except RuntimeError:
3071                logging.exception("just testing")
3072            sys.stdout.seek(0)
3073            self.assertEqual(output.getvalue(),
3074                "ERROR:root:just testing\nGot a [RuntimeError]\n")
3075            # Original logger output is empty
3076            self.assert_log_lines([])
3077
3078    def test_config4a_ok(self):
3079        # A config specifying a custom formatter class.
3080        with support.captured_stdout() as output:
3081            self.apply_config(self.config4a)
3082            #logger = logging.getLogger()
3083            try:
3084                raise RuntimeError()
3085            except RuntimeError:
3086                logging.exception("just testing")
3087            sys.stdout.seek(0)
3088            self.assertEqual(output.getvalue(),
3089                "ERROR:root:just testing\nGot a [RuntimeError]\n")
3090            # Original logger output is empty
3091            self.assert_log_lines([])
3092
3093    def test_config5_ok(self):
3094        self.test_config1_ok(config=self.config5)
3095
3096    def test_config6_failure(self):
3097        self.assertRaises(Exception, self.apply_config, self.config6)
3098
3099    def test_config7_ok(self):
3100        with support.captured_stdout() as output:
3101            self.apply_config(self.config1)
3102            logger = logging.getLogger("compiler.parser")
3103            # Both will output a message
3104            logger.info(self.next_message())
3105            logger.error(self.next_message())
3106            self.assert_log_lines([
3107                ('INFO', '1'),
3108                ('ERROR', '2'),
3109            ], stream=output)
3110            # Original logger output is empty.
3111            self.assert_log_lines([])
3112        with support.captured_stdout() as output:
3113            self.apply_config(self.config7)
3114            logger = logging.getLogger("compiler.parser")
3115            self.assertTrue(logger.disabled)
3116            logger = logging.getLogger("compiler.lexer")
3117            # Both will output a message
3118            logger.info(self.next_message())
3119            logger.error(self.next_message())
3120            self.assert_log_lines([
3121                ('INFO', '3'),
3122                ('ERROR', '4'),
3123            ], stream=output)
3124            # Original logger output is empty.
3125            self.assert_log_lines([])
3126
3127    # Same as test_config7_ok but don't disable old loggers.
3128    def test_config_8_ok(self):
3129        with support.captured_stdout() as output:
3130            self.apply_config(self.config1)
3131            logger = logging.getLogger("compiler.parser")
3132            # All will output a message
3133            logger.info(self.next_message())
3134            logger.error(self.next_message())
3135            self.assert_log_lines([
3136                ('INFO', '1'),
3137                ('ERROR', '2'),
3138            ], stream=output)
3139            # Original logger output is empty.
3140            self.assert_log_lines([])
3141        with support.captured_stdout() as output:
3142            self.apply_config(self.config8)
3143            logger = logging.getLogger("compiler.parser")
3144            self.assertFalse(logger.disabled)
3145            # Both will output a message
3146            logger.info(self.next_message())
3147            logger.error(self.next_message())
3148            logger = logging.getLogger("compiler.lexer")
3149            # Both will output a message
3150            logger.info(self.next_message())
3151            logger.error(self.next_message())
3152            self.assert_log_lines([
3153                ('INFO', '3'),
3154                ('ERROR', '4'),
3155                ('INFO', '5'),
3156                ('ERROR', '6'),
3157            ], stream=output)
3158            # Original logger output is empty.
3159            self.assert_log_lines([])
3160
3161    def test_config_8a_ok(self):
3162        with support.captured_stdout() as output:
3163            self.apply_config(self.config1a)
3164            logger = logging.getLogger("compiler.parser")
3165            # See issue #11424. compiler-hyphenated sorts
3166            # between compiler and compiler.xyz and this
3167            # was preventing compiler.xyz from being included
3168            # in the child loggers of compiler because of an
3169            # overzealous loop termination condition.
3170            hyphenated = logging.getLogger('compiler-hyphenated')
3171            # All will output a message
3172            logger.info(self.next_message())
3173            logger.error(self.next_message())
3174            hyphenated.critical(self.next_message())
3175            self.assert_log_lines([
3176                ('INFO', '1'),
3177                ('ERROR', '2'),
3178                ('CRITICAL', '3'),
3179            ], stream=output)
3180            # Original logger output is empty.
3181            self.assert_log_lines([])
3182        with support.captured_stdout() as output:
3183            self.apply_config(self.config8a)
3184            logger = logging.getLogger("compiler.parser")
3185            self.assertFalse(logger.disabled)
3186            # Both will output a message
3187            logger.info(self.next_message())
3188            logger.error(self.next_message())
3189            logger = logging.getLogger("compiler.lexer")
3190            # Both will output a message
3191            logger.info(self.next_message())
3192            logger.error(self.next_message())
3193            # Will not appear
3194            hyphenated.critical(self.next_message())
3195            self.assert_log_lines([
3196                ('INFO', '4'),
3197                ('ERROR', '5'),
3198                ('INFO', '6'),
3199                ('ERROR', '7'),
3200            ], stream=output)
3201            # Original logger output is empty.
3202            self.assert_log_lines([])
3203
3204    def test_config_9_ok(self):
3205        with support.captured_stdout() as output:
3206            self.apply_config(self.config9)
3207            logger = logging.getLogger("compiler.parser")
3208            # Nothing will be output since both handler and logger are set to WARNING
3209            logger.info(self.next_message())
3210            self.assert_log_lines([], stream=output)
3211            self.apply_config(self.config9a)
3212            # Nothing will be output since handler is still set to WARNING
3213            logger.info(self.next_message())
3214            self.assert_log_lines([], stream=output)
3215            self.apply_config(self.config9b)
3216            # Message should now be output
3217            logger.info(self.next_message())
3218            self.assert_log_lines([
3219                ('INFO', '3'),
3220            ], stream=output)
3221
3222    def test_config_10_ok(self):
3223        with support.captured_stdout() as output:
3224            self.apply_config(self.config10)
3225            logger = logging.getLogger("compiler.parser")
3226            logger.warning(self.next_message())
3227            logger = logging.getLogger('compiler')
3228            # Not output, because filtered
3229            logger.warning(self.next_message())
3230            logger = logging.getLogger('compiler.lexer')
3231            # Not output, because filtered
3232            logger.warning(self.next_message())
3233            logger = logging.getLogger("compiler.parser.codegen")
3234            # Output, as not filtered
3235            logger.error(self.next_message())
3236            self.assert_log_lines([
3237                ('WARNING', '1'),
3238                ('ERROR', '4'),
3239            ], stream=output)
3240
3241    def test_config11_ok(self):
3242        self.test_config1_ok(self.config11)
3243
3244    def test_config12_failure(self):
3245        self.assertRaises(Exception, self.apply_config, self.config12)
3246
3247    def test_config13_failure(self):
3248        self.assertRaises(Exception, self.apply_config, self.config13)
3249
3250    def test_config14_ok(self):
3251        with support.captured_stdout() as output:
3252            self.apply_config(self.config14)
3253            h = logging._handlers['hand1']
3254            self.assertEqual(h.foo, 'bar')
3255            self.assertEqual(h.terminator, '!\n')
3256            logging.warning('Exclamation')
3257            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
3258
3259    def test_config15_ok(self):
3260
3261        def cleanup(h1, fn):
3262            h1.close()
3263            os.remove(fn)
3264
3265        with self.check_no_resource_warning():
3266            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
3267            os.close(fd)
3268
3269            config = {
3270                "version": 1,
3271                "handlers": {
3272                    "file": {
3273                        "class": "logging.FileHandler",
3274                        "filename": fn,
3275                        "encoding": "utf-8",
3276                    }
3277                },
3278                "root": {
3279                    "handlers": ["file"]
3280                }
3281            }
3282
3283            self.apply_config(config)
3284            self.apply_config(config)
3285
3286        handler = logging.root.handlers[0]
3287        self.addCleanup(cleanup, handler, fn)
3288
3289    def setup_via_listener(self, text, verify=None):
3290        text = text.encode("utf-8")
3291        # Ask for a randomly assigned port (by using port 0)
3292        t = logging.config.listen(0, verify)
3293        t.start()
3294        t.ready.wait()
3295        # Now get the port allocated
3296        port = t.port
3297        t.ready.clear()
3298        try:
3299            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3300            sock.settimeout(2.0)
3301            sock.connect(('localhost', port))
3302
3303            slen = struct.pack('>L', len(text))
3304            s = slen + text
3305            sentsofar = 0
3306            left = len(s)
3307            while left > 0:
3308                sent = sock.send(s[sentsofar:])
3309                sentsofar += sent
3310                left -= sent
3311            sock.close()
3312        finally:
3313            t.ready.wait(2.0)
3314            logging.config.stopListening()
3315            threading_helper.join_thread(t)
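
    # A minimal illustrative sketch (not used by the tests below) of a client
    # for logging.config.listen(), assuming the same wire format the helper
    # above builds by hand: a 4-byte big-endian length prefix followed by the
    # configuration payload.
    @staticmethod
    def _example_send_config(port, payload):
        # 'payload' is the already-encoded JSON or ini-style configuration.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect(('localhost', port))
            sock.sendall(struct.pack('>L', len(payload)) + payload)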
3316
3317    @support.requires_working_socket()
3318    def test_listen_config_10_ok(self):
3319        with support.captured_stdout() as output:
3320            self.setup_via_listener(json.dumps(self.config10))
3321            logger = logging.getLogger("compiler.parser")
3322            logger.warning(self.next_message())
3323            logger = logging.getLogger('compiler')
3324            # Not output, because filtered
3325            logger.warning(self.next_message())
3326            logger = logging.getLogger('compiler.lexer')
3327            # Not output, because filtered
3328            logger.warning(self.next_message())
3329            logger = logging.getLogger("compiler.parser.codegen")
3330            # Output, as not filtered
3331            logger.error(self.next_message())
3332            self.assert_log_lines([
3333                ('WARNING', '1'),
3334                ('ERROR', '4'),
3335            ], stream=output)
3336
3337    @support.requires_working_socket()
3338    def test_listen_config_1_ok(self):
3339        with support.captured_stdout() as output:
3340            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
3341            logger = logging.getLogger("compiler.parser")
3342            # Both will output a message
3343            logger.info(self.next_message())
3344            logger.error(self.next_message())
3345            self.assert_log_lines([
3346                ('INFO', '1'),
3347                ('ERROR', '2'),
3348            ], stream=output)
3349            # Original logger output is empty.
3350            self.assert_log_lines([])
3351
3352    @support.requires_working_socket()
3353    def test_listen_verify(self):
3354
3355        def verify_fail(stuff):
3356            return None
3357
3358        def verify_reverse(stuff):
3359            return stuff[::-1]
3360
3361        logger = logging.getLogger("compiler.parser")
3362        to_send = textwrap.dedent(ConfigFileTest.config1)
3363        # First, specify a verification function that will fail.
3364        # We expect to see no output, since our configuration
3365        # never took effect.
3366        with support.captured_stdout() as output:
3367            self.setup_via_listener(to_send, verify_fail)
3368            # Both will output a message
3369            logger.info(self.next_message())
3370            logger.error(self.next_message())
3371        self.assert_log_lines([], stream=output)
3372        # Original logger output has the stuff we logged.
3373        self.assert_log_lines([
3374            ('INFO', '1'),
3375            ('ERROR', '2'),
3376        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3377
3378        # Now, perform no verification. Our configuration
3379        # should take effect.
3380
3381        with support.captured_stdout() as output:
3382            self.setup_via_listener(to_send)    # no verify callable specified
3383            logger = logging.getLogger("compiler.parser")
3384            # Both will output a message
3385            logger.info(self.next_message())
3386            logger.error(self.next_message())
3387        self.assert_log_lines([
3388            ('INFO', '3'),
3389            ('ERROR', '4'),
3390        ], stream=output)
3391        # Original logger output still has the stuff we logged before.
3392        self.assert_log_lines([
3393            ('INFO', '1'),
3394            ('ERROR', '2'),
3395        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3396
3397        # Now, perform verification which transforms the bytes.
3398
3399        with support.captured_stdout() as output:
3400            self.setup_via_listener(to_send[::-1], verify_reverse)
3401            logger = logging.getLogger("compiler.parser")
3402            # Both will output a message
3403            logger.info(self.next_message())
3404            logger.error(self.next_message())
3405        self.assert_log_lines([
3406            ('INFO', '5'),
3407            ('ERROR', '6'),
3408        ], stream=output)
3409        # Original logger output still has the stuff we logged before.
3410        self.assert_log_lines([
3411            ('INFO', '1'),
3412            ('ERROR', '2'),
3413        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3414
3415    def test_out_of_order(self):
3416        self.assertRaises(ValueError, self.apply_config, self.out_of_order)
3417
3418    def test_out_of_order_with_dollar_style(self):
3419        config = copy.deepcopy(self.out_of_order)
3420        config['formatters']['mySimpleFormatter']['format'] = "${asctime} (${name}) ${levelname}: ${message}"
3421
3422        self.apply_config(config)
3423        handler = logging.getLogger('mymodule').handlers[0]
3424        self.assertIsInstance(handler.target, logging.Handler)
3425        self.assertIsInstance(handler.formatter._style,
3426                              logging.StringTemplateStyle)
3427
3428    def test_custom_formatter_class_with_validate(self):
3429        self.apply_config(self.custom_formatter_class_validate)
3430        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3431        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3432
3433    def test_custom_formatter_class_with_validate2(self):
3434        self.apply_config(self.custom_formatter_class_validate2)
3435        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3436        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3437
3438    def test_custom_formatter_class_with_validate2_with_wrong_fmt(self):
3439        config = self.custom_formatter_class_validate.copy()
3440        config['formatters']['form1']['style'] = "$"
3441
        # No exception should be raised, as we have configured 'validate' to False
3443        self.apply_config(config)
3444        handler = logging.getLogger("my_test_logger_custom_formatter").handlers[0]
3445        self.assertIsInstance(handler.formatter, ExceptionFormatter)
3446
3447    def test_custom_formatter_class_with_validate3(self):
3448        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_class_validate3)
3449
3450    def test_custom_formatter_function_with_validate(self):
3451        self.assertRaises(ValueError, self.apply_config, self.custom_formatter_with_function)
3452
3453    def test_baseconfig(self):
3454        d = {
3455            'atuple': (1, 2, 3),
3456            'alist': ['a', 'b', 'c'],
3457            'adict': {'d': 'e', 'f': 3 },
3458            'nest1': ('g', ('h', 'i'), 'j'),
3459            'nest2': ['k', ['l', 'm'], 'n'],
3460            'nest3': ['o', 'cfg://alist', 'p'],
3461        }
3462        bc = logging.config.BaseConfigurator(d)
3463        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
3464        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
3465        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
3466        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
3467        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
3468        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
3469        v = bc.convert('cfg://nest3')
3470        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
3471        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
3472        self.assertRaises(ValueError, bc.convert, 'cfg://!')
3473        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
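
    # Illustrative note on the mechanism exercised above: within a
    # configuration dictionary, string values of the form 'cfg://path',
    # 'cfg://path.key' or 'cfg://path[index]' are resolved against the
    # configuration itself, so 'cfg://alist[1]' above yields the second
    # element of the 'alist' entry.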
3474
3475    def test_namedtuple(self):
3476        # see bpo-39142
3477        from collections import namedtuple
3478
3479        class MyHandler(logging.StreamHandler):
3480            def __init__(self, resource, *args, **kwargs):
3481                super().__init__(*args, **kwargs)
3482                self.resource: namedtuple = resource
3483
3484            def emit(self, record):
3485                record.msg += f' {self.resource.type}'
3486                return super().emit(record)
3487
3488        Resource = namedtuple('Resource', ['type', 'labels'])
3489        resource = Resource(type='my_type', labels=['a'])
3490
3491        config = {
3492            'version': 1,
3493            'handlers': {
3494                'myhandler': {
3495                    '()': MyHandler,
3496                    'resource': resource
3497                }
3498            },
3499            'root':  {'level': 'INFO', 'handlers': ['myhandler']},
3500        }
3501        with support.captured_stderr() as stderr:
3502            self.apply_config(config)
3503            logging.info('some log')
3504        self.assertEqual(stderr.getvalue(), 'some log my_type\n')
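
    # Illustrative note on the mechanism exercised above: in dictConfig(), a
    # handler entry whose '()' key names a callable (here the MyHandler class)
    # is built by calling that factory, with the remaining keys passed as
    # keyword arguments; this is how 'resource' reaches MyHandler.__init__.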
3505
3506    def test_config_callable_filter_works(self):
3507        def filter_(_):
3508            return 1
3509        self.apply_config({
3510            "version": 1, "root": {"level": "DEBUG", "filters": [filter_]}
3511        })
3512        assert logging.getLogger().filters[0] is filter_
3513        logging.getLogger().filters = []
3514
3515    def test_config_filter_works(self):
3516        filter_ = logging.Filter("spam.eggs")
3517        self.apply_config({
3518            "version": 1, "root": {"level": "DEBUG", "filters": [filter_]}
3519        })
3520        assert logging.getLogger().filters[0] is filter_
3521        logging.getLogger().filters = []
3522
3523    def test_config_filter_method_works(self):
3524        class FakeFilter:
3525            def filter(self, _):
3526                return 1
3527        filter_ = FakeFilter()
3528        self.apply_config({
3529            "version": 1, "root": {"level": "DEBUG", "filters": [filter_]}
3530        })
3531        assert logging.getLogger().filters[0] is filter_
3532        logging.getLogger().filters = []
3533
3534    def test_invalid_type_raises(self):
3535        class NotAFilter: pass
3536        for filter_ in [None, 1, NotAFilter()]:
3537            self.assertRaises(
3538                ValueError,
3539                self.apply_config,
3540                {"version": 1, "root": {"level": "DEBUG", "filters": [filter_]}}
3541            )
3542
3543    def test_90195(self):
3544        # See gh-90195
3545        config = {
3546            'version': 1,
3547            'disable_existing_loggers': False,
3548            'handlers': {
3549                'console': {
3550                    'level': 'DEBUG',
3551                    'class': 'logging.StreamHandler',
3552                },
3553            },
3554            'loggers': {
3555                'a': {
3556                    'level': 'DEBUG',
3557                    'handlers': ['console']
3558                }
3559            }
3560        }
3561        logger = logging.getLogger('a')
3562        self.assertFalse(logger.disabled)
3563        self.apply_config(config)
3564        self.assertFalse(logger.disabled)
3565        # Should disable all loggers ...
3566        self.apply_config({'version': 1})
3567        self.assertTrue(logger.disabled)
3568        del config['disable_existing_loggers']
3569        self.apply_config(config)
3570        # Logger should be enabled, since explicitly mentioned
3571        self.assertFalse(logger.disabled)
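

# A minimal illustrative sketch (not exercised by the tests above) of the
# disable_existing_loggers behaviour checked in test_90195: with the flag set
# to False, loggers created before dictConfig() runs stay enabled even when
# the new configuration does not mention them, whereas the default (True)
# disables them.
def _example_keep_existing_loggers():
    logging.getLogger('preexisting')  # created before configuration
    logging.config.dictConfig({
        'version': 1,
        'disable_existing_loggers': False,  # keep 'preexisting' enabled
        'handlers': {
            'console': {'class': 'logging.StreamHandler', 'level': 'DEBUG'},
        },
        'root': {'level': 'INFO', 'handlers': ['console']},
    })
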
3572
3573class ManagerTest(BaseTest):
3574    def test_manager_loggerclass(self):
3575        logged = []
3576
3577        class MyLogger(logging.Logger):
3578            def _log(self, level, msg, args, exc_info=None, extra=None):
3579                logged.append(msg)
3580
3581        man = logging.Manager(None)
3582        self.assertRaises(TypeError, man.setLoggerClass, int)
3583        man.setLoggerClass(MyLogger)
3584        logger = man.getLogger('test')
3585        logger.warning('should appear in logged')
3586        logging.warning('should not appear in logged')
3587
3588        self.assertEqual(logged, ['should appear in logged'])
3589
3590    def test_set_log_record_factory(self):
3591        man = logging.Manager(None)
3592        expected = object()
3593        man.setLogRecordFactory(expected)
3594        self.assertEqual(man.logRecordFactory, expected)
3595
3596class ChildLoggerTest(BaseTest):
3597    def test_child_loggers(self):
3598        r = logging.getLogger()
3599        l1 = logging.getLogger('abc')
3600        l2 = logging.getLogger('def.ghi')
3601        c1 = r.getChild('xyz')
3602        c2 = r.getChild('uvw.xyz')
3603        self.assertIs(c1, logging.getLogger('xyz'))
3604        self.assertIs(c2, logging.getLogger('uvw.xyz'))
3605        c1 = l1.getChild('def')
3606        c2 = c1.getChild('ghi')
3607        c3 = l1.getChild('def.ghi')
3608        self.assertIs(c1, logging.getLogger('abc.def'))
3609        self.assertIs(c2, logging.getLogger('abc.def.ghi'))
3610        self.assertIs(c2, c3)
3611
3612
3613class DerivedLogRecord(logging.LogRecord):
3614    pass
3615
3616class LogRecordFactoryTest(BaseTest):
3617
3618    def setUp(self):
3619        class CheckingFilter(logging.Filter):
3620            def __init__(self, cls):
3621                self.cls = cls
3622
3623            def filter(self, record):
3624                t = type(record)
3625                if t is not self.cls:
3626                    msg = 'Unexpected LogRecord type %s, expected %s' % (t,
3627                            self.cls)
3628                    raise TypeError(msg)
3629                return True
3630
3631        BaseTest.setUp(self)
3632        self.filter = CheckingFilter(DerivedLogRecord)
3633        self.root_logger.addFilter(self.filter)
3634        self.orig_factory = logging.getLogRecordFactory()
3635
3636    def tearDown(self):
3637        self.root_logger.removeFilter(self.filter)
3638        BaseTest.tearDown(self)
3639        logging.setLogRecordFactory(self.orig_factory)
3640
3641    def test_logrecord_class(self):
3642        self.assertRaises(TypeError, self.root_logger.warning,
3643                          self.next_message())
3644        logging.setLogRecordFactory(DerivedLogRecord)
3645        self.root_logger.error(self.next_message())
3646        self.assert_log_lines([
3647           ('root', 'ERROR', '2'),
3648        ])
3649
3650
3651@threading_helper.requires_working_threading()
3652class QueueHandlerTest(BaseTest):
3653    # Do not bother with a logger name group.
3654    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
3655
3656    def setUp(self):
3657        BaseTest.setUp(self)
3658        self.queue = queue.Queue(-1)
3659        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
3660        self.name = 'que'
3661        self.que_logger = logging.getLogger('que')
3662        self.que_logger.propagate = False
3663        self.que_logger.setLevel(logging.WARNING)
3664        self.que_logger.addHandler(self.que_hdlr)
3665
3666    def tearDown(self):
3667        self.que_hdlr.close()
3668        BaseTest.tearDown(self)
3669
3670    def test_queue_handler(self):
3671        self.que_logger.debug(self.next_message())
3672        self.assertRaises(queue.Empty, self.queue.get_nowait)
3673        self.que_logger.info(self.next_message())
3674        self.assertRaises(queue.Empty, self.queue.get_nowait)
3675        msg = self.next_message()
3676        self.que_logger.warning(msg)
3677        data = self.queue.get_nowait()
3678        self.assertTrue(isinstance(data, logging.LogRecord))
3679        self.assertEqual(data.name, self.que_logger.name)
3680        self.assertEqual((data.msg, data.args), (msg, None))
3681
3682    def test_formatting(self):
3683        msg = self.next_message()
3684        levelname = logging.getLevelName(logging.WARNING)
3685        log_format_str = '{name} -> {levelname}: {message}'
3686        formatted_msg = log_format_str.format(name=self.name,
3687                                              levelname=levelname, message=msg)
3688        formatter = logging.Formatter(self.log_format)
3689        self.que_hdlr.setFormatter(formatter)
3690        self.que_logger.warning(msg)
3691        log_record = self.queue.get_nowait()
3692        self.assertEqual(formatted_msg, log_record.msg)
3693        self.assertEqual(formatted_msg, log_record.message)
3694
3695    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3696                         'logging.handlers.QueueListener required for this test')
3697    def test_queue_listener(self):
3698        handler = TestHandler(support.Matcher())
3699        listener = logging.handlers.QueueListener(self.queue, handler)
3700        listener.start()
3701        try:
3702            self.que_logger.warning(self.next_message())
3703            self.que_logger.error(self.next_message())
3704            self.que_logger.critical(self.next_message())
3705        finally:
3706            listener.stop()
3707        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
3708        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
3709        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
3710        handler.close()
3711
3712        # Now test with respect_handler_level set
3713
3714        handler = TestHandler(support.Matcher())
3715        handler.setLevel(logging.CRITICAL)
3716        listener = logging.handlers.QueueListener(self.queue, handler,
3717                                                  respect_handler_level=True)
3718        listener.start()
3719        try:
3720            self.que_logger.warning(self.next_message())
3721            self.que_logger.error(self.next_message())
3722            self.que_logger.critical(self.next_message())
3723        finally:
3724            listener.stop()
3725        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
3726        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
3727        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
3728        handler.close()
3729
3730    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3731                         'logging.handlers.QueueListener required for this test')
3732    def test_queue_listener_with_StreamHandler(self):
        # Test that the traceback and stack info are only appended once (bpo-34334, bpo-46755).
3734        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
3735        listener.start()
3736        try:
3737            1 / 0
3738        except ZeroDivisionError as e:
3739            exc = e
3740            self.que_logger.exception(self.next_message(), exc_info=exc)
3741        self.que_logger.error(self.next_message(), stack_info=True)
3742        listener.stop()
3743        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)
3744        self.assertEqual(self.stream.getvalue().strip().count('Stack'), 1)
3745
3746    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3747                         'logging.handlers.QueueListener required for this test')
3748    def test_queue_listener_with_multiple_handlers(self):
        # Test that the queue handler's format doesn't affect other handlers' formats (bpo-35726).
3750        self.que_hdlr.setFormatter(self.root_formatter)
3751        self.que_logger.addHandler(self.root_hdlr)
3752
3753        listener = logging.handlers.QueueListener(self.queue, self.que_hdlr)
3754        listener.start()
3755        self.que_logger.error("error")
3756        listener.stop()
3757        self.assertEqual(self.stream.getvalue().strip(), "que -> ERROR: error")
3758
3759if hasattr(logging.handlers, 'QueueListener'):
3760    import multiprocessing
3761    from unittest.mock import patch
3762
3763    @threading_helper.requires_working_threading()
3764    class QueueListenerTest(BaseTest):
3765        """
3766        Tests based on patch submitted for issue #27930. Ensure that
3767        QueueListener handles all log messages.
3768        """
3769
3770        repeat = 20
3771
3772        @staticmethod
3773        def setup_and_log(log_queue, ident):
3774            """
3775            Creates a logger with a QueueHandler that logs to a queue read by a
3776            QueueListener. Starts the listener, logs five messages, and stops
3777            the listener.
3778            """
3779            logger = logging.getLogger('test_logger_with_id_%s' % ident)
3780            logger.setLevel(logging.DEBUG)
3781            handler = logging.handlers.QueueHandler(log_queue)
3782            logger.addHandler(handler)
3783            listener = logging.handlers.QueueListener(log_queue)
3784            listener.start()
3785
3786            logger.info('one')
3787            logger.info('two')
3788            logger.info('three')
3789            logger.info('four')
3790            logger.info('five')
3791
3792            listener.stop()
3793            logger.removeHandler(handler)
3794            handler.close()
3795
3796        @patch.object(logging.handlers.QueueListener, 'handle')
3797        def test_handle_called_with_queue_queue(self, mock_handle):
3798            for i in range(self.repeat):
3799                log_queue = queue.Queue()
3800                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3801            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3802                             'correct number of handled log messages')
3803
3804        @patch.object(logging.handlers.QueueListener, 'handle')
3805        def test_handle_called_with_mp_queue(self, mock_handle):
3806            # bpo-28668: The multiprocessing (mp) module is not functional
3807            # when the mp.synchronize module cannot be imported.
3808            support.skip_if_broken_multiprocessing_synchronize()
3809            for i in range(self.repeat):
3810                log_queue = multiprocessing.Queue()
3811                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3812                log_queue.close()
3813                log_queue.join_thread()
3814            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3815                             'correct number of handled log messages')
3816
3817        @staticmethod
3818        def get_all_from_queue(log_queue):
3819            try:
3820                while True:
3821                    yield log_queue.get_nowait()
3822            except queue.Empty:
3823                return []
3824
3825        def test_no_messages_in_queue_after_stop(self):
3826            """
3827            Five messages are logged then the QueueListener is stopped. This
3828            test then gets everything off the queue. Failure of this test
3829            indicates that messages were not registered on the queue until
3830            _after_ the QueueListener stopped.
3831            """
3832            # bpo-28668: The multiprocessing (mp) module is not functional
3833            # when the mp.synchronize module cannot be imported.
3834            support.skip_if_broken_multiprocessing_synchronize()
3835            for i in range(self.repeat):
3836                queue = multiprocessing.Queue()
3837                self.setup_and_log(queue, '%s_%s' %(self.id(), i))
3838                # time.sleep(1)
3839                items = list(self.get_all_from_queue(queue))
3840                queue.close()
3841                queue.join_thread()
3842
3843                expected = [[], [logging.handlers.QueueListener._sentinel]]
3844                self.assertIn(items, expected,
3845                              'Found unexpected messages in queue: %s' % (
3846                                    [m.msg if isinstance(m, logging.LogRecord)
3847                                     else m for m in items]))
3848
3849        def test_calls_task_done_after_stop(self):
3850            # Issue 36813: Make sure queue.join does not deadlock.
3851            log_queue = queue.Queue()
3852            listener = logging.handlers.QueueListener(log_queue)
3853            listener.start()
3854            listener.stop()
3855            with self.assertRaises(ValueError):
3856                # Make sure all tasks are done and .join won't block.
3857                log_queue.task_done()
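
    # A minimal illustrative sketch (not exercised by the tests above) of the
    # QueueHandler / QueueListener pattern these tests drive by hand: the
    # handler attached to the logger only enqueues records, and the listener
    # thread dequeues them and dispatches them to the real handlers.
    def _example_queue_logging():
        q = queue.Queue(-1)
        logger = logging.getLogger('example.queue')
        logger.addHandler(logging.handlers.QueueHandler(q))
        listener = logging.handlers.QueueListener(q, logging.StreamHandler())
        listener.start()
        try:
            logger.warning('handled on the listener thread')
        finally:
            listener.stop()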
3858
3859
3860ZERO = datetime.timedelta(0)
3861
3862class UTC(datetime.tzinfo):
3863    def utcoffset(self, dt):
3864        return ZERO
3865
3866    dst = utcoffset
3867
3868    def tzname(self, dt):
3869        return 'UTC'
3870
3871utc = UTC()
3872
3873class AssertErrorMessage:
3874
3875    def assert_error_message(self, exception, message, *args, **kwargs):
3876        try:
            # Passing an empty tuple of exception types means assertRaises
            # catches nothing, so whatever the callable raises propagates to
            # the except clause below, where its message is checked.
            self.assertRaises((), *args, **kwargs)
3878        except exception as e:
3879            self.assertEqual(message, str(e))
3880
3881class FormatterTest(unittest.TestCase, AssertErrorMessage):
3882    def setUp(self):
3883        self.common = {
3884            'name': 'formatter.test',
3885            'level': logging.DEBUG,
3886            'pathname': os.path.join('path', 'to', 'dummy.ext'),
3887            'lineno': 42,
3888            'exc_info': None,
3889            'func': None,
3890            'msg': 'Message with %d %s',
3891            'args': (2, 'placeholders'),
3892        }
3893        self.variants = {
3894            'custom': {
3895                'custom': 1234
3896            }
3897        }
3898
3899    def get_record(self, name=None):
3900        result = dict(self.common)
3901        if name is not None:
3902            result.update(self.variants[name])
3903        return logging.makeLogRecord(result)
3904
3905    def test_percent(self):
3906        # Test %-formatting
3907        r = self.get_record()
3908        f = logging.Formatter('${%(message)s}')
3909        self.assertEqual(f.format(r), '${Message with 2 placeholders}')
3910        f = logging.Formatter('%(random)s')
3911        self.assertRaises(ValueError, f.format, r)
3912        self.assertFalse(f.usesTime())
3913        f = logging.Formatter('%(asctime)s')
3914        self.assertTrue(f.usesTime())
3915        f = logging.Formatter('%(asctime)-15s')
3916        self.assertTrue(f.usesTime())
3917        f = logging.Formatter('%(asctime)#15s')
3918        self.assertTrue(f.usesTime())
3919
3920    def test_braces(self):
3921        # Test {}-formatting
3922        r = self.get_record()
3923        f = logging.Formatter('$%{message}%$', style='{')
3924        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3925        f = logging.Formatter('{random}', style='{')
3926        self.assertRaises(ValueError, f.format, r)
3927        f = logging.Formatter("{message}", style='{')
3928        self.assertFalse(f.usesTime())
3929        f = logging.Formatter('{asctime}', style='{')
3930        self.assertTrue(f.usesTime())
3931        f = logging.Formatter('{asctime!s:15}', style='{')
3932        self.assertTrue(f.usesTime())
3933        f = logging.Formatter('{asctime:15}', style='{')
3934        self.assertTrue(f.usesTime())
3935
3936    def test_dollars(self):
3937        # Test $-formatting
3938        r = self.get_record()
3939        f = logging.Formatter('${message}', style='$')
3940        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3941        f = logging.Formatter('$message', style='$')
3942        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3943        f = logging.Formatter('$$%${message}%$$', style='$')
3944        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3945        f = logging.Formatter('${random}', style='$')
3946        self.assertRaises(ValueError, f.format, r)
3947        self.assertFalse(f.usesTime())
3948        f = logging.Formatter('${asctime}', style='$')
3949        self.assertTrue(f.usesTime())
3950        f = logging.Formatter('$asctime', style='$')
3951        self.assertTrue(f.usesTime())
3952        f = logging.Formatter('${message}', style='$')
3953        self.assertFalse(f.usesTime())
3954        f = logging.Formatter('${asctime}--', style='$')
3955        self.assertTrue(f.usesTime())
3956
3957    def test_format_validate(self):
3958        # Check correct formatting
3959        # Percentage style
3960        f = logging.Formatter("%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3961        self.assertEqual(f._fmt, "%(levelname)-15s - %(message) 5s - %(process)03d - %(module) - %(asctime)*.3s")
3962        f = logging.Formatter("%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3963        self.assertEqual(f._fmt, "%(asctime)*s - %(asctime)*.3s - %(process)-34.33o")
3964        f = logging.Formatter("%(process)#+027.23X")
3965        self.assertEqual(f._fmt, "%(process)#+027.23X")
3966        f = logging.Formatter("%(foo)#.*g")
3967        self.assertEqual(f._fmt, "%(foo)#.*g")
3968
3969        # StrFormat Style
3970        f = logging.Formatter("$%{message}%$ - {asctime!a:15} - {customfield['key']}", style="{")
3971        self.assertEqual(f._fmt, "$%{message}%$ - {asctime!a:15} - {customfield['key']}")
3972        f = logging.Formatter("{process:.2f} - {custom.f:.4f}", style="{")
3973        self.assertEqual(f._fmt, "{process:.2f} - {custom.f:.4f}")
3974        f = logging.Formatter("{customfield!s:#<30}", style="{")
3975        self.assertEqual(f._fmt, "{customfield!s:#<30}")
3976        f = logging.Formatter("{message!r}", style="{")
3977        self.assertEqual(f._fmt, "{message!r}")
3978        f = logging.Formatter("{message!s}", style="{")
3979        self.assertEqual(f._fmt, "{message!s}")
3980        f = logging.Formatter("{message!a}", style="{")
3981        self.assertEqual(f._fmt, "{message!a}")
3982        f = logging.Formatter("{process!r:4.2}", style="{")
3983        self.assertEqual(f._fmt, "{process!r:4.2}")
3984        f = logging.Formatter("{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}", style="{")
3985        self.assertEqual(f._fmt, "{process!s:<#30,.12f}- {custom:=+#30,.1d} - {module:^30}")
3986        f = logging.Formatter("{process!s:{w},.{p}}", style="{")
3987        self.assertEqual(f._fmt, "{process!s:{w},.{p}}")
3988        f = logging.Formatter("{foo:12.{p}}", style="{")
3989        self.assertEqual(f._fmt, "{foo:12.{p}}")
3990        f = logging.Formatter("{foo:{w}.6}", style="{")
3991        self.assertEqual(f._fmt, "{foo:{w}.6}")
3992        f = logging.Formatter("{foo[0].bar[1].baz}", style="{")
3993        self.assertEqual(f._fmt, "{foo[0].bar[1].baz}")
3994        f = logging.Formatter("{foo[k1].bar[k2].baz}", style="{")
3995        self.assertEqual(f._fmt, "{foo[k1].bar[k2].baz}")
3996        f = logging.Formatter("{12[k1].bar[k2].baz}", style="{")
3997        self.assertEqual(f._fmt, "{12[k1].bar[k2].baz}")
3998
3999        # Dollar style
4000        f = logging.Formatter("${asctime} - $message", style="$")
4001        self.assertEqual(f._fmt, "${asctime} - $message")
4002        f = logging.Formatter("$bar $$", style="$")
4003        self.assertEqual(f._fmt, "$bar $$")
4004        f = logging.Formatter("$bar $$$$", style="$")
        self.assertEqual(f._fmt, "$bar $$$$")  # '$$$$' escapes to two literal '$' characters
4006
        # Testing that ValueError is raised for incorrect formats
4008        # Percentage Style
4009        self.assertRaises(ValueError, logging.Formatter, "%(asctime)Z")
4010        self.assertRaises(ValueError, logging.Formatter, "%(asctime)b")
4011        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*")
4012        self.assertRaises(ValueError, logging.Formatter, "%(asctime)*3s")
4013        self.assertRaises(ValueError, logging.Formatter, "%(asctime)_")
4014        self.assertRaises(ValueError, logging.Formatter, '{asctime}')
4015        self.assertRaises(ValueError, logging.Formatter, '${message}')
4016        self.assertRaises(ValueError, logging.Formatter, '%(foo)#12.3*f')  # with both * and decimal number as precision
4017        self.assertRaises(ValueError, logging.Formatter, '%(foo)0*.8*f')
4018
4019        # StrFormat Style
4020        # Testing failure for '-' in field name
4021        self.assert_error_message(
4022            ValueError,
4023            "invalid format: invalid field name/expression: 'name-thing'",
4024            logging.Formatter, "{name-thing}", style="{"
4025        )
4026        # Testing failure for style mismatch
4027        self.assert_error_message(
4028            ValueError,
4029            "invalid format: no fields",
4030            logging.Formatter, '%(asctime)s', style='{'
4031        )
4032        # Testing failure for invalid conversion
        self.assert_error_message(
            ValueError,
            "invalid format: invalid conversion: 'Z'",
            logging.Formatter, '{asctime!Z:15}', style='{'
        )
4037        self.assertRaises(ValueError, logging.Formatter, '{asctime!s:#30,15f}', style='{')
4038        self.assert_error_message(
4039            ValueError,
4040            "invalid format: expected ':' after conversion specifier",
4041            logging.Formatter, '{asctime!aa:15}', style='{'
4042        )
4043        # Testing failure for invalid spec
4044        self.assert_error_message(
4045            ValueError,
4046            "invalid format: bad specifier: '.2ff'",
4047            logging.Formatter, '{process:.2ff}', style='{'
4048        )
4049        self.assertRaises(ValueError, logging.Formatter, '{process:.2Z}', style='{')
4050        self.assertRaises(ValueError, logging.Formatter, '{process!s:<##30,12g}', style='{')
4051        self.assertRaises(ValueError, logging.Formatter, '{process!s:<#30#,12g}', style='{')
4052        self.assertRaises(ValueError, logging.Formatter, '{process!s:{{w}},{{p}}}', style='{')
        # Testing failure for mismatched braces
4054        self.assert_error_message(
4055            ValueError,
4056            "invalid format: expected '}' before end of string",
4057            logging.Formatter, '{process', style='{'
4058        )
4059        self.assert_error_message(
4060            ValueError,
4061            "invalid format: Single '}' encountered in format string",
4062            logging.Formatter, 'process}', style='{'
4063        )
4064        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}', style='{')
4065        self.assertRaises(ValueError, logging.Formatter, '{{foo!r:4.2}}', style='{')
4066        self.assertRaises(ValueError, logging.Formatter, '{foo/bar}', style='{')
4067        self.assertRaises(ValueError, logging.Formatter, '{foo:{{w}}.{{p}}}}', style='{')
4068        self.assertRaises(ValueError, logging.Formatter, '{foo!X:{{w}}.{{p}}}', style='{')
4069        self.assertRaises(ValueError, logging.Formatter, '{foo!a:random}', style='{')
4070        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{dom}', style='{')
4071        self.assertRaises(ValueError, logging.Formatter, '{foo!a:ran{d}om}', style='{')
4072        self.assertRaises(ValueError, logging.Formatter, '{foo.!a:d}', style='{')
4073
4074        # Dollar style
        # Testing failure for a bare '$'
4076        self.assert_error_message(
4077            ValueError,
4078            "invalid format: bare \'$\' not allowed",
4079            logging.Formatter, '$bar $$$', style='$'
4080        )
4081        self.assert_error_message(
4082            ValueError,
4083            "invalid format: bare \'$\' not allowed",
4084            logging.Formatter, 'bar $', style='$'
4085        )
4086        self.assert_error_message(
4087            ValueError,
4088            "invalid format: bare \'$\' not allowed",
4089            logging.Formatter, 'foo $.', style='$'
4090        )
        # Testing failure for mismatched style
4092        self.assert_error_message(
4093            ValueError,
4094            "invalid format: no fields",
4095            logging.Formatter, '{asctime}', style='$'
4096        )
4097        self.assertRaises(ValueError, logging.Formatter, '%(asctime)s', style='$')
4098
4099        # Testing failure for incorrect fields
4100        self.assert_error_message(
4101            ValueError,
4102            "invalid format: no fields",
4103            logging.Formatter, 'foo', style='$'
4104        )
4105        self.assertRaises(ValueError, logging.Formatter, '${asctime', style='$')
4106
4107    def test_defaults_parameter(self):
4108        fmts = ['%(custom)s %(message)s', '{custom} {message}', '$custom $message']
4109        styles = ['%', '{', '$']
4110        for fmt, style in zip(fmts, styles):
4111            f = logging.Formatter(fmt, style=style, defaults={'custom': 'Default'})
4112            r = self.get_record()
4113            self.assertEqual(f.format(r), 'Default Message with 2 placeholders')
4114            r = self.get_record("custom")
4115            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')
4116
4117            # Without default
4118            f = logging.Formatter(fmt, style=style)
4119            r = self.get_record()
4120            self.assertRaises(ValueError, f.format, r)
4121
4122            # Non-existing default is ignored
4123            f = logging.Formatter(fmt, style=style, defaults={'Non-existing': 'Default'})
4124            r = self.get_record("custom")
4125            self.assertEqual(f.format(r), '1234 Message with 2 placeholders')
4126
4127    def test_invalid_style(self):
4128        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
4129
4130    def test_time(self):
4131        r = self.get_record()
4132        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
4133        # We use None to indicate we want the local timezone
4134        # We're essentially converting a UTC time to local time
4135        r.created = time.mktime(dt.astimezone(None).timetuple())
4136        r.msecs = 123
4137        f = logging.Formatter('%(asctime)s %(message)s')
4138        f.converter = time.gmtime
4139        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
4140        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
4141        f.format(r)
4142        self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
4143
4144    def test_default_msec_format_none(self):
4145        class NoMsecFormatter(logging.Formatter):
4146            default_msec_format = None
4147            default_time_format = '%d/%m/%Y %H:%M:%S'
4148
4149        r = self.get_record()
4150        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 123, utc)
4151        r.created = time.mktime(dt.astimezone(None).timetuple())
4152        f = NoMsecFormatter()
4153        f.converter = time.gmtime
4154        self.assertEqual(f.formatTime(r), '21/04/1993 08:03:00')
4155
4156    def test_issue_89047(self):
4157        f = logging.Formatter(fmt='{asctime}.{msecs:03.0f} {message}', style='{', datefmt="%Y-%m-%d %H:%M:%S")
4158        for i in range(2500):
4159            time.sleep(0.0004)
4160            r = logging.makeLogRecord({'msg': 'Message %d' % (i + 1)})
4161            s = f.format(r)
4162            self.assertNotIn('.1000', s)
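

# A minimal illustrative sketch (not exercised by the tests above) of the
# three format styles validated in FormatterTest, plus the 'defaults' mapping
# checked in test_defaults_parameter, which supplies values for fields missing
# from a record.
def _example_formatter_styles():
    percent = logging.Formatter('%(levelname)s %(message)s')
    braces = logging.Formatter('{levelname} {message}', style='{')
    dollar = logging.Formatter('${levelname} ${message}', style='$')
    with_default = logging.Formatter('%(custom)s %(message)s',
                                     defaults={'custom': 'fallback'})
    return percent, braces, dollar, with_default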
4163
4164
4165class TestBufferingFormatter(logging.BufferingFormatter):
4166    def formatHeader(self, records):
4167        return '[(%d)' % len(records)
4168
4169    def formatFooter(self, records):
4170        return '(%d)]' % len(records)
4171
4172class BufferingFormatterTest(unittest.TestCase):
4173    def setUp(self):
4174        self.records = [
4175            logging.makeLogRecord({'msg': 'one'}),
4176            logging.makeLogRecord({'msg': 'two'}),
4177        ]
4178
4179    def test_default(self):
4180        f = logging.BufferingFormatter()
4181        self.assertEqual('', f.format([]))
4182        self.assertEqual('onetwo', f.format(self.records))
4183
4184    def test_custom(self):
4185        f = TestBufferingFormatter()
4186        self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
4187        lf = logging.Formatter('<%(message)s>')
4188        f = TestBufferingFormatter(lf)
4189        self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
4190
4191class ExceptionTest(BaseTest):
4192    def test_formatting(self):
4193        r = self.root_logger
4194        h = RecordingHandler()
4195        r.addHandler(h)
4196        try:
4197            raise RuntimeError('deliberate mistake')
4198        except:
4199            logging.exception('failed', stack_info=True)
4200        r.removeHandler(h)
4201        h.close()
4202        r = h.records[0]
4203        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
4204                                              'call last):\n'))
4205        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
4206                                            'deliberate mistake'))
4207        self.assertTrue(r.stack_info.startswith('Stack (most recent '
4208                                              'call last):\n'))
4209        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
4210                                            'stack_info=True)'))
4211
4212
4213class LastResortTest(BaseTest):
4214    def test_last_resort(self):
4215        # Test the last resort handler
4216        root = self.root_logger
4217        root.removeHandler(self.root_hdlr)
4218        old_lastresort = logging.lastResort
4219        old_raise_exceptions = logging.raiseExceptions
4220
4221        try:
4222            with support.captured_stderr() as stderr:
4223                root.debug('This should not appear')
4224                self.assertEqual(stderr.getvalue(), '')
4225                root.warning('Final chance!')
4226                self.assertEqual(stderr.getvalue(), 'Final chance!\n')
4227
4228            # No handlers and no last resort, so 'No handlers' message
4229            logging.lastResort = None
4230            with support.captured_stderr() as stderr:
4231                root.warning('Final chance!')
4232                msg = 'No handlers could be found for logger "root"\n'
4233                self.assertEqual(stderr.getvalue(), msg)
4234
4235            # 'No handlers' message only printed once
4236            with support.captured_stderr() as stderr:
4237                root.warning('Final chance!')
4238                self.assertEqual(stderr.getvalue(), '')
4239
4240            # If raiseExceptions is False, no message is printed
4241            root.manager.emittedNoHandlerWarning = False
4242            logging.raiseExceptions = False
4243            with support.captured_stderr() as stderr:
4244                root.warning('Final chance!')
4245                self.assertEqual(stderr.getvalue(), '')
4246        finally:
4247            root.addHandler(self.root_hdlr)
4248            logging.lastResort = old_lastresort
4249            logging.raiseExceptions = old_raise_exceptions
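

# A minimal illustrative sketch (not exercised by the test above) of the
# fallback checked in LastResortTest: when neither a logger nor any of its
# ancestors has a handler, records of severity WARNING and above are emitted
# through logging.lastResort (a stderr handler); setting logging.lastResort to
# None restores the one-off "No handlers could be found" warning instead.
def _example_last_resort():
    # Assumes no handlers have been configured anywhere in the hierarchy.
    bare = logging.getLogger('example.no_handlers')
    bare.warning('delivered to stderr via logging.lastResort')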
4250
4251
4252class FakeHandler:
4253
4254    def __init__(self, identifier, called):
4255        for method in ('acquire', 'flush', 'close', 'release'):
4256            setattr(self, method, self.record_call(identifier, method, called))
4257
4258    def record_call(self, identifier, method_name, called):
4259        def inner():
4260            called.append('{} - {}'.format(identifier, method_name))
4261        return inner
4262
4263
4264class RecordingHandler(logging.NullHandler):
4265
4266    def __init__(self, *args, **kwargs):
4267        super(RecordingHandler, self).__init__(*args, **kwargs)
4268        self.records = []
4269
4270    def handle(self, record):
4271        """Keep track of all the emitted records."""
4272        self.records.append(record)
4273
4274
4275class ShutdownTest(BaseTest):
4276
4277    """Test suite for the shutdown method."""
4278
4279    def setUp(self):
4280        super(ShutdownTest, self).setUp()
4281        self.called = []
4282
4283        raise_exceptions = logging.raiseExceptions
4284        self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions)
4285
4286    def raise_error(self, error):
4287        def inner():
4288            raise error()
4289        return inner
4290
4291    def test_no_failure(self):
4292        # create some fake handlers
4293        handler0 = FakeHandler(0, self.called)
4294        handler1 = FakeHandler(1, self.called)
4295        handler2 = FakeHandler(2, self.called)
4296
        # create live weakrefs to those handlers
4298        handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
4299
4300        logging.shutdown(handlerList=list(handlers))
4301
4302        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
4303                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
4304                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
4305        self.assertEqual(expected, self.called)
4306
4307    def _test_with_failure_in_method(self, method, error):
4308        handler = FakeHandler(0, self.called)
4309        setattr(handler, method, self.raise_error(error))
4310        handlers = [logging.weakref.ref(handler)]
4311
4312        logging.shutdown(handlerList=list(handlers))
4313
4314        self.assertEqual('0 - release', self.called[-1])
4315
4316    def test_with_ioerror_in_acquire(self):
4317        self._test_with_failure_in_method('acquire', OSError)
4318
4319    def test_with_ioerror_in_flush(self):
4320        self._test_with_failure_in_method('flush', OSError)
4321
4322    def test_with_ioerror_in_close(self):
4323        self._test_with_failure_in_method('close', OSError)
4324
4325    def test_with_valueerror_in_acquire(self):
4326        self._test_with_failure_in_method('acquire', ValueError)
4327
4328    def test_with_valueerror_in_flush(self):
4329        self._test_with_failure_in_method('flush', ValueError)
4330
4331    def test_with_valueerror_in_close(self):
4332        self._test_with_failure_in_method('close', ValueError)
4333
4334    def test_with_other_error_in_acquire_without_raise(self):
4335        logging.raiseExceptions = False
4336        self._test_with_failure_in_method('acquire', IndexError)
4337
4338    def test_with_other_error_in_flush_without_raise(self):
4339        logging.raiseExceptions = False
4340        self._test_with_failure_in_method('flush', IndexError)
4341
4342    def test_with_other_error_in_close_without_raise(self):
4343        logging.raiseExceptions = False
4344        self._test_with_failure_in_method('close', IndexError)
4345
4346    def test_with_other_error_in_acquire_with_raise(self):
4347        logging.raiseExceptions = True
4348        self.assertRaises(IndexError, self._test_with_failure_in_method,
4349                          'acquire', IndexError)
4350
4351    def test_with_other_error_in_flush_with_raise(self):
4352        logging.raiseExceptions = True
4353        self.assertRaises(IndexError, self._test_with_failure_in_method,
4354                          'flush', IndexError)
4355
4356    def test_with_other_error_in_close_with_raise(self):
4357        logging.raiseExceptions = True
4358        self.assertRaises(IndexError, self._test_with_failure_in_method,
4359                          'close', IndexError)
4360
4361
4362class ModuleLevelMiscTest(BaseTest):
4363
4364    """Test suite for some module level methods."""
4365
4366    def test_disable(self):
4367        old_disable = logging.root.manager.disable
4368        # confirm our assumptions are correct
4369        self.assertEqual(old_disable, 0)
4370        self.addCleanup(logging.disable, old_disable)
4371
4372        logging.disable(83)
4373        self.assertEqual(logging.root.manager.disable, 83)
4374
4375        self.assertRaises(ValueError, logging.disable, "doesnotexists")
4376
4377        class _NotAnIntOrString:
4378            pass
4379
4380        self.assertRaises(TypeError, logging.disable, _NotAnIntOrString())
4381
4382        logging.disable("WARN")
4383
4384        # test the default value introduced in 3.7
4385        # (Issue #28524)
4386        logging.disable()
4387        self.assertEqual(logging.root.manager.disable, logging.CRITICAL)
4388
4389    def _test_log(self, method, level=None):
4390        called = []
4391        support.patch(self, logging, 'basicConfig',
4392                      lambda *a, **kw: called.append((a, kw)))
4393
4394        recording = RecordingHandler()
4395        logging.root.addHandler(recording)
4396
4397        log_method = getattr(logging, method)
4398        if level is not None:
4399            log_method(level, "test me: %r", recording)
4400        else:
4401            log_method("test me: %r", recording)
4402
4403        self.assertEqual(len(recording.records), 1)
4404        record = recording.records[0]
4405        self.assertEqual(record.getMessage(), "test me: %r" % recording)
4406
4407        expected_level = level if level is not None else getattr(logging, method.upper())
4408        self.assertEqual(record.levelno, expected_level)
4409
4410        # basicConfig was not called!
4411        self.assertEqual(called, [])
4412
4413    def test_log(self):
4414        self._test_log('log', logging.ERROR)
4415
4416    def test_debug(self):
4417        self._test_log('debug')
4418
4419    def test_info(self):
4420        self._test_log('info')
4421
4422    def test_warning(self):
4423        self._test_log('warning')
4424
4425    def test_error(self):
4426        self._test_log('error')
4427
4428    def test_critical(self):
4429        self._test_log('critical')
4430
4431    def test_set_logger_class(self):
4432        self.assertRaises(TypeError, logging.setLoggerClass, object)
4433
4434        class MyLogger(logging.Logger):
4435            pass
4436
4437        logging.setLoggerClass(MyLogger)
4438        self.assertEqual(logging.getLoggerClass(), MyLogger)
4439
4440        logging.setLoggerClass(logging.Logger)
4441        self.assertEqual(logging.getLoggerClass(), logging.Logger)
4442
4443    def test_subclass_logger_cache(self):
4444        # bpo-37258
4445        message = []
4446
4447        class MyLogger(logging.getLoggerClass()):
4448            def __init__(self, name='MyLogger', level=logging.NOTSET):
4449                super().__init__(name, level)
4450                message.append('initialized')
4451
4452        logging.setLoggerClass(MyLogger)
4453        logger = logging.getLogger('just_some_logger')
4454        self.assertEqual(message, ['initialized'])
4455        stream = io.StringIO()
4456        h = logging.StreamHandler(stream)
4457        logger.addHandler(h)
4458        try:
4459            logger.setLevel(logging.DEBUG)
4460            logger.debug("hello")
4461            self.assertEqual(stream.getvalue().strip(), "hello")
4462
4463            stream.truncate(0)
4464            stream.seek(0)
4465
4466            logger.setLevel(logging.INFO)
4467            logger.debug("hello")
4468            self.assertEqual(stream.getvalue(), "")
4469        finally:
4470            logger.removeHandler(h)
4471            h.close()
4472            logging.setLoggerClass(logging.Logger)
4473
4474    def test_logging_at_shutdown(self):
4475        # bpo-20037: Doing text I/O late at interpreter shutdown must not crash
4476        code = textwrap.dedent("""
4477            import logging
4478
4479            class A:
4480                def __del__(self):
4481                    try:
4482                        raise ValueError("some error")
4483                    except Exception:
4484                        logging.exception("exception in __del__")
4485
4486            a = A()
4487        """)
4488        rc, out, err = assert_python_ok("-c", code)
4489        err = err.decode()
4490        self.assertIn("exception in __del__", err)
4491        self.assertIn("ValueError: some error", err)
4492
4493    def test_logging_at_shutdown_open(self):
4494        # bpo-26789: FileHandler keeps a reference to the builtin open()
4495        # function to be able to open or reopen the file during Python
4496        # finalization.
4497        filename = os_helper.TESTFN
4498        self.addCleanup(os_helper.unlink, filename)
4499
4500        code = textwrap.dedent(f"""
4501            import builtins
4502            import logging
4503
4504            class A:
4505                def __del__(self):
4506                    logging.error("log in __del__")
4507
4508            # basicConfig() opens the file, but logging.shutdown() closes
4509            # it at Python exit. When A.__del__() is called,
4510            # FileHandler._open() must be called again to re-open the file.
4511            logging.basicConfig(filename={filename!r}, encoding="utf-8")
4512
4513            a = A()
4514
4515            # Simulate the Python finalization which removes the builtin
4516            # open() function.
4517            del builtins.open
4518        """)
4519        assert_python_ok("-c", code)
4520
4521        with open(filename, encoding="utf-8") as fp:
4522            self.assertEqual(fp.read().rstrip(), "ERROR:root:log in __del__")
4523
4524    def test_recursion_error(self):
4525        # Issue 36272
4526        code = textwrap.dedent("""
4527            import logging
4528
4529            def rec():
4530                logging.error("foo")
4531                rec()
4532
4533            rec()
4534        """)
4535        rc, out, err = assert_python_failure("-c", code)
4536        err = err.decode()
4537        self.assertNotIn("Cannot recover from stack overflow.", err)
4538        self.assertEqual(rc, 1)
4539
4540    def test_get_level_names_mapping(self):
4541        mapping = logging.getLevelNamesMapping()
4542        self.assertEqual(logging._nameToLevel, mapping)  # value is equivalent
4543        self.assertIsNot(logging._nameToLevel, mapping)  # but not the internal data
4544        new_mapping = logging.getLevelNamesMapping()     # another call -> another copy
4545        self.assertIsNot(mapping, new_mapping)           # verify not the same object as before
4546        self.assertEqual(mapping, new_mapping)           # but equivalent in value
4547
4548
4549class LogRecordTest(BaseTest):
4550    def test_str_rep(self):
4551        r = logging.makeLogRecord({})
4552        s = str(r)
4553        self.assertTrue(s.startswith('<LogRecord: '))
4554        self.assertTrue(s.endswith('>'))
4555
4556    def test_dict_arg(self):
4557        h = RecordingHandler()
4558        r = logging.getLogger()
4559        r.addHandler(h)
4560        d = {'less' : 'more' }
4561        logging.warning('less is %(less)s', d)
4562        self.assertIs(h.records[0].args, d)
4563        self.assertEqual(h.records[0].message, 'less is more')
4564        r.removeHandler(h)
4565        h.close()
4566
4567    @staticmethod # pickled as target of child process in the following test
4568    def _extract_logrecord_process_name(key, logMultiprocessing, conn=None):
4569        prev_logMultiprocessing = logging.logMultiprocessing
4570        logging.logMultiprocessing = logMultiprocessing
4571        try:
4572            import multiprocessing as mp
4573            name = mp.current_process().name
4574
4575            r1 = logging.makeLogRecord({'msg': f'msg1_{key}'})
4576
4577            # https://bugs.python.org/issue45128
4578            with support.swap_item(sys.modules, 'multiprocessing', None):
4579                r2 = logging.makeLogRecord({'msg': f'msg2_{key}'})
4580
4581            results = {'processName'  : name,
4582                       'r1.processName': r1.processName,
4583                       'r2.processName': r2.processName,
4584                      }
4585        finally:
4586            logging.logMultiprocessing = prev_logMultiprocessing
4587        if conn:
4588            conn.send(results)
4589        else:
4590            return results
4591
4592    def test_multiprocessing(self):
4593        support.skip_if_broken_multiprocessing_synchronize()
4594        multiprocessing_imported = 'multiprocessing' in sys.modules
4595        try:
4596            # logMultiprocessing is True by default
4597            self.assertEqual(logging.logMultiprocessing, True)
4598
4599            LOG_MULTI_PROCESSING = True
4600            # When logMultiprocessing == True:
4601            # In the main process processName = 'MainProcess'
4602            r = logging.makeLogRecord({})
4603            self.assertEqual(r.processName, 'MainProcess')
4604
4605            results = self._extract_logrecord_process_name(1, LOG_MULTI_PROCESSING)
4606            self.assertEqual('MainProcess', results['processName'])
4607            self.assertEqual('MainProcess', results['r1.processName'])
4608            self.assertEqual('MainProcess', results['r2.processName'])
4609
4610            # In other processes, processName is correct when multiprocessing is imported,
4611            # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762).
4612            import multiprocessing
4613            parent_conn, child_conn = multiprocessing.Pipe()
4614            p = multiprocessing.Process(
4615                target=self._extract_logrecord_process_name,
4616                args=(2, LOG_MULTI_PROCESSING, child_conn,)
4617            )
4618            p.start()
4619            results = parent_conn.recv()
4620            self.assertNotEqual('MainProcess', results['processName'])
4621            self.assertEqual(results['processName'], results['r1.processName'])
4622            self.assertEqual('MainProcess', results['r2.processName'])
4623            p.join()
4624
4625        finally:
4626            if multiprocessing_imported:
4627                import multiprocessing
4628
4629    def test_optional(self):
4630        r = logging.makeLogRecord({})
4631        NOT_NONE = self.assertIsNotNone
4632        NOT_NONE(r.thread)
4633        NOT_NONE(r.threadName)
4634        NOT_NONE(r.process)
4635        NOT_NONE(r.processName)
4636        log_threads = logging.logThreads
4637        log_processes = logging.logProcesses
4638        log_multiprocessing = logging.logMultiprocessing
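        # With the collection flags switched off, the corresponding record
        # attributes should come back as None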
4639        try:
4640            logging.logThreads = False
4641            logging.logProcesses = False
4642            logging.logMultiprocessing = False
4643            r = logging.makeLogRecord({})
4644            NONE = self.assertIsNone
4645            NONE(r.thread)
4646            NONE(r.threadName)
4647            NONE(r.process)
4648            NONE(r.processName)
4649        finally:
4650            logging.logThreads = log_threads
4651            logging.logProcesses = log_processes
4652            logging.logMultiprocessing = log_multiprocessing
4653
4654class BasicConfigTest(unittest.TestCase):
4655
4656    """Test suite for logging.basicConfig."""
4657
4658    def setUp(self):
4659        super(BasicConfigTest, self).setUp()
4660        self.handlers = logging.root.handlers
4661        self.saved_handlers = logging._handlers.copy()
4662        self.saved_handler_list = logging._handlerList[:]
4663        self.original_logging_level = logging.root.level
4664        self.addCleanup(self.cleanup)
4665        logging.root.handlers = []
4666
4667    def tearDown(self):
4668        for h in logging.root.handlers[:]:
4669            logging.root.removeHandler(h)
4670            h.close()
4671        super(BasicConfigTest, self).tearDown()
4672
4673    def cleanup(self):
4674        setattr(logging.root, 'handlers', self.handlers)
4675        logging._handlers.clear()
4676        logging._handlers.update(self.saved_handlers)
4677        logging._handlerList[:] = self.saved_handler_list
4678        logging.root.setLevel(self.original_logging_level)
4679
4680    def test_no_kwargs(self):
4681        logging.basicConfig()
4682
4683        # handler defaults to a StreamHandler to sys.stderr
4684        self.assertEqual(len(logging.root.handlers), 1)
4685        handler = logging.root.handlers[0]
4686        self.assertIsInstance(handler, logging.StreamHandler)
4687        self.assertEqual(handler.stream, sys.stderr)
4688
4689        formatter = handler.formatter
4690        # format defaults to logging.BASIC_FORMAT
4691        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
4692        # datefmt defaults to None
4693        self.assertIsNone(formatter.datefmt)
4694        # style defaults to %
4695        self.assertIsInstance(formatter._style, logging.PercentStyle)
4696
4697        # level is not explicitly set
4698        self.assertEqual(logging.root.level, self.original_logging_level)
4699
4700    def test_strformatstyle(self):
4701        with support.captured_stdout() as output:
4702            logging.basicConfig(stream=sys.stdout, style="{")
4703            logging.error("Log an error")
4704            sys.stdout.seek(0)
4705            self.assertEqual(output.getvalue().strip(),
4706                "ERROR:root:Log an error")
4707
4708    def test_stringtemplatestyle(self):
4709        with support.captured_stdout() as output:
4710            logging.basicConfig(stream=sys.stdout, style="$")
4711            logging.error("Log an error")
4712            sys.stdout.seek(0)
4713            self.assertEqual(output.getvalue().strip(),
4714                "ERROR:root:Log an error")
4715
4716    def test_filename(self):
4717
4718        def cleanup(h1, h2, fn):
4719            h1.close()
4720            h2.close()
4721            os.remove(fn)
4722
4723        logging.basicConfig(filename='test.log', encoding='utf-8')
4724
4725        self.assertEqual(len(logging.root.handlers), 1)
4726        handler = logging.root.handlers[0]
4727        self.assertIsInstance(handler, logging.FileHandler)
4728
4729        expected = logging.FileHandler('test.log', 'a', encoding='utf-8')
4730        self.assertEqual(handler.stream.mode, expected.stream.mode)
4731        self.assertEqual(handler.stream.name, expected.stream.name)
4732        self.addCleanup(cleanup, handler, expected, 'test.log')
4733
4734    def test_filemode(self):
4735
4736        def cleanup(h1, h2, fn):
4737            h1.close()
4738            h2.close()
4739            os.remove(fn)
4740
4741        logging.basicConfig(filename='test.log', filemode='wb')
4742
4743        handler = logging.root.handlers[0]
4744        expected = logging.FileHandler('test.log', 'wb')
4745        self.assertEqual(handler.stream.mode, expected.stream.mode)
4746        self.addCleanup(cleanup, handler, expected, 'test.log')
4747
4748    def test_stream(self):
4749        stream = io.StringIO()
4750        self.addCleanup(stream.close)
4751        logging.basicConfig(stream=stream)
4752
4753        self.assertEqual(len(logging.root.handlers), 1)
4754        handler = logging.root.handlers[0]
4755        self.assertIsInstance(handler, logging.StreamHandler)
4756        self.assertEqual(handler.stream, stream)
4757
4758    def test_format(self):
4759        logging.basicConfig(format='%(asctime)s - %(message)s')
4760
4761        formatter = logging.root.handlers[0].formatter
4762        self.assertEqual(formatter._style._fmt, '%(asctime)s - %(message)s')
4763
4764    def test_datefmt(self):
4765        logging.basicConfig(datefmt='bar')
4766
4767        formatter = logging.root.handlers[0].formatter
4768        self.assertEqual(formatter.datefmt, 'bar')
4769
4770    def test_style(self):
4771        logging.basicConfig(style='$')
4772
4773        formatter = logging.root.handlers[0].formatter
4774        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
4775
4776    def test_level(self):
4777        old_level = logging.root.level
4778        self.addCleanup(logging.root.setLevel, old_level)
4779
4780        logging.basicConfig(level=57)
4781        self.assertEqual(logging.root.level, 57)
4782        # Test that second call has no effect
4783        logging.basicConfig(level=58)
4784        self.assertEqual(logging.root.level, 57)
4785
4786    def test_incompatible(self):
4787        assertRaises = self.assertRaises
4788        handlers = [logging.StreamHandler()]
4789        stream = sys.stderr
4790        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4791                                                      stream=stream)
4792        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4793                                                      handlers=handlers)
4794        assertRaises(ValueError, logging.basicConfig, stream=stream,
4795                                                      handlers=handlers)
4796        # Issue 23207: test for invalid kwargs
4797        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
4798        # Should pop both filename and filemode even if filename is None
4799        logging.basicConfig(filename=None, filemode='a')
4800
4801    def test_handlers(self):
4802        handlers = [
4803            logging.StreamHandler(),
4804            logging.StreamHandler(sys.stdout),
4805            logging.StreamHandler(),
4806        ]
4807        f = logging.Formatter()
4808        handlers[2].setFormatter(f)
4809        logging.basicConfig(handlers=handlers)
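        # basicConfig() gives the default formatter to handlers that don't have
        # one; handlers[2] keeps the formatter it was explicitly given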
4810        self.assertIs(handlers[0], logging.root.handlers[0])
4811        self.assertIs(handlers[1], logging.root.handlers[1])
4812        self.assertIs(handlers[2], logging.root.handlers[2])
4813        self.assertIsNotNone(handlers[0].formatter)
4814        self.assertIsNotNone(handlers[1].formatter)
4815        self.assertIs(handlers[2].formatter, f)
4816        self.assertIs(handlers[0].formatter, handlers[1].formatter)
4817
4818    def test_force(self):
4819        old_string_io = io.StringIO()
4820        new_string_io = io.StringIO()
4821        old_handlers = [logging.StreamHandler(old_string_io)]
4822        new_handlers = [logging.StreamHandler(new_string_io)]
4823        logging.basicConfig(level=logging.WARNING, handlers=old_handlers)
4824        logging.warning('warn')
4825        logging.info('info')
4826        logging.debug('debug')
4827        self.assertEqual(len(logging.root.handlers), 1)
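        # force=True removes and closes the existing root handlers before the
        # new configuration is applied, so only new_handlers see later records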
4828        logging.basicConfig(level=logging.INFO, handlers=new_handlers,
4829                            force=True)
4830        logging.warning('warn')
4831        logging.info('info')
4832        logging.debug('debug')
4833        self.assertEqual(len(logging.root.handlers), 1)
4834        self.assertEqual(old_string_io.getvalue().strip(),
4835                         'WARNING:root:warn')
4836        self.assertEqual(new_string_io.getvalue().strip(),
4837                         'WARNING:root:warn\nINFO:root:info')
4838
4839    def test_encoding(self):
4840        try:
4841            encoding = 'utf-8'
4842            logging.basicConfig(filename='test.log', encoding=encoding,
4843                                errors='strict',
4844                                format='%(message)s', level=logging.DEBUG)
4845
4846            self.assertEqual(len(logging.root.handlers), 1)
4847            handler = logging.root.handlers[0]
4848            self.assertIsInstance(handler, logging.FileHandler)
4849            self.assertEqual(handler.encoding, encoding)
4850            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4851        finally:
4852            handler.close()
4853            with open('test.log', encoding='utf-8') as f:
4854                data = f.read().strip()
4855            os.remove('test.log')
4856            self.assertEqual(data,
4857                             'The Øresund Bridge joins Copenhagen to Malmö')
4858
4859    def test_encoding_errors(self):
4860        try:
4861            encoding = 'ascii'
4862            logging.basicConfig(filename='test.log', encoding=encoding,
4863                                errors='ignore',
4864                                format='%(message)s', level=logging.DEBUG)
4865
4866            self.assertEqual(len(logging.root.handlers), 1)
4867            handler = logging.root.handlers[0]
4868            self.assertIsInstance(handler, logging.FileHandler)
4869            self.assertEqual(handler.encoding, encoding)
4870            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4871        finally:
4872            handler.close()
4873            with open('test.log', encoding='utf-8') as f:
4874                data = f.read().strip()
4875            os.remove('test.log')
4876            self.assertEqual(data, 'The resund Bridge joins Copenhagen to Malm')
4877
4878    def test_encoding_errors_default(self):
4879        try:
4880            encoding = 'ascii'
4881            logging.basicConfig(filename='test.log', encoding=encoding,
4882                                format='%(message)s', level=logging.DEBUG)
4883
4884            self.assertEqual(len(logging.root.handlers), 1)
4885            handler = logging.root.handlers[0]
4886            self.assertIsInstance(handler, logging.FileHandler)
4887            self.assertEqual(handler.encoding, encoding)
4888            self.assertEqual(handler.errors, 'backslashreplace')
4889            logging.debug('😂: ☃️: The Øresund Bridge joins Copenhagen to Malmö')
4890        finally:
4891            handler.close()
4892            with open('test.log', encoding='utf-8') as f:
4893                data = f.read().strip()
4894            os.remove('test.log')
4895            self.assertEqual(data, r'\U0001f602: \u2603\ufe0f: The \xd8resund '
4896                                   r'Bridge joins Copenhagen to Malm\xf6')
4897
4898    def test_encoding_errors_none(self):
4899        # Specifying None should behave as 'strict'
4900        try:
4901            encoding = 'ascii'
4902            logging.basicConfig(filename='test.log', encoding=encoding,
4903                                errors=None,
4904                                format='%(message)s', level=logging.DEBUG)
4905
4906            self.assertEqual(len(logging.root.handlers), 1)
4907            handler = logging.root.handlers[0]
4908            self.assertIsInstance(handler, logging.FileHandler)
4909            self.assertEqual(handler.encoding, encoding)
4910            self.assertIsNone(handler.errors)
4911
4912            message = []
4913
4914            def dummy_handle_error(record):
4915                _, v, _ = sys.exc_info()
4916                message.append(str(v))
4917
4918            handler.handleError = dummy_handle_error
4919            logging.debug('The Øresund Bridge joins Copenhagen to Malmö')
4920            self.assertTrue(message)
4921            self.assertIn("'ascii' codec can't encode "
4922                          "character '\\xd8' in position 4:", message[0])
4923        finally:
4924            handler.close()
4925            with open('test.log', encoding='utf-8') as f:
4926                data = f.read().strip()
4927            os.remove('test.log')
4928            # didn't write anything due to the encoding error
4929            self.assertEqual(data, r'')
4930
4931
4932    def _test_log(self, method, level=None):
4933        # logging.root has no handlers so basicConfig should be called
4934        called = []
4935
4936        old_basic_config = logging.basicConfig
4937        def my_basic_config(*a, **kw):
4938            old_basic_config()
4939            old_level = logging.root.level
4940            logging.root.setLevel(100)  # avoid having messages in stderr
4941            self.addCleanup(logging.root.setLevel, old_level)
4942            called.append((a, kw))
4943
4944        support.patch(self, logging, 'basicConfig', my_basic_config)
4945
4946        log_method = getattr(logging, method)
4947        if level is not None:
4948            log_method(level, "test me")
4949        else:
4950            log_method("test me")
4951
4952        # basicConfig was called with no arguments
4953        self.assertEqual(called, [((), {})])
4954
4955    def test_log(self):
4956        self._test_log('log', logging.WARNING)
4957
4958    def test_debug(self):
4959        self._test_log('debug')
4960
4961    def test_info(self):
4962        self._test_log('info')
4963
4964    def test_warning(self):
4965        self._test_log('warning')
4966
4967    def test_error(self):
4968        self._test_log('error')
4969
4970    def test_critical(self):
4971        self._test_log('critical')
4972
4973
4974class LoggerAdapterTest(unittest.TestCase):
4975    def setUp(self):
4976        super(LoggerAdapterTest, self).setUp()
4977        old_handler_list = logging._handlerList[:]
4978
4979        self.recording = RecordingHandler()
4980        self.logger = logging.root
4981        self.logger.addHandler(self.recording)
4982        self.addCleanup(self.logger.removeHandler, self.recording)
4983        self.addCleanup(self.recording.close)
4984
4985        def cleanup():
4986            logging._handlerList[:] = old_handler_list
4987
4988        self.addCleanup(cleanup)
4989        self.addCleanup(logging.shutdown)
4990        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
4991
4992    def test_exception(self):
4993        msg = 'testing exception: %r'
4994        exc = None
4995        try:
4996            1 / 0
4997        except ZeroDivisionError as e:
4998            exc = e
4999            self.adapter.exception(msg, self.recording)
5000
5001        self.assertEqual(len(self.recording.records), 1)
5002        record = self.recording.records[0]
5003        self.assertEqual(record.levelno, logging.ERROR)
5004        self.assertEqual(record.msg, msg)
5005        self.assertEqual(record.args, (self.recording,))
5006        self.assertEqual(record.exc_info,
5007                         (exc.__class__, exc, exc.__traceback__))
5008
5009    def test_exception_excinfo(self):
5010        try:
5011            1 / 0
5012        except ZeroDivisionError as e:
5013            exc = e
5014
5015        self.adapter.exception('exc_info test', exc_info=exc)
5016
5017        self.assertEqual(len(self.recording.records), 1)
5018        record = self.recording.records[0]
5019        self.assertEqual(record.exc_info,
5020                         (exc.__class__, exc, exc.__traceback__))
5021
5022    def test_critical(self):
5023        msg = 'critical test! %r'
5024        self.adapter.critical(msg, self.recording)
5025
5026        self.assertEqual(len(self.recording.records), 1)
5027        record = self.recording.records[0]
5028        self.assertEqual(record.levelno, logging.CRITICAL)
5029        self.assertEqual(record.msg, msg)
5030        self.assertEqual(record.args, (self.recording,))
5031
5032    def test_is_enabled_for(self):
5033        old_disable = self.adapter.logger.manager.disable
5034        self.adapter.logger.manager.disable = 33
5035        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
5036                        old_disable)
5037        self.assertFalse(self.adapter.isEnabledFor(32))
5038
5039    def test_has_handlers(self):
5040        self.assertTrue(self.adapter.hasHandlers())
5041
5042        for handler in self.logger.handlers:
5043            self.logger.removeHandler(handler)
5044
5045        self.assertFalse(self.logger.hasHandlers())
5046        self.assertFalse(self.adapter.hasHandlers())
5047
5048    def test_nested(self):
5049        class Adapter(logging.LoggerAdapter):
5050            prefix = 'Adapter'
5051
5052            def process(self, msg, kwargs):
5053                return f"{self.prefix} {msg}", kwargs
5054
5055        msg = 'Adapters can be nested, yo.'
5056        adapter = Adapter(logger=self.logger, extra=None)
5057        adapter_adapter = Adapter(logger=adapter, extra=None)
5058        adapter_adapter.prefix = 'AdapterAdapter'
5059        self.assertEqual(repr(adapter), repr(adapter_adapter))
5060        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
5061        self.assertEqual(len(self.recording.records), 1)
5062        record = self.recording.records[0]
5063        self.assertEqual(record.levelno, logging.CRITICAL)
5064        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
5065        self.assertEqual(record.args, (self.recording,))
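        # Setting .manager on the outermost adapter should propagate through any
        # nested adapters down to the underlying logger, and restoring it should
        # propagate back the same way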
5066        orig_manager = adapter_adapter.manager
5067        self.assertIs(adapter.manager, orig_manager)
5068        self.assertIs(self.logger.manager, orig_manager)
5069        temp_manager = object()
5070        try:
5071            adapter_adapter.manager = temp_manager
5072            self.assertIs(adapter_adapter.manager, temp_manager)
5073            self.assertIs(adapter.manager, temp_manager)
5074            self.assertIs(self.logger.manager, temp_manager)
5075        finally:
5076            adapter_adapter.manager = orig_manager
5077        self.assertIs(adapter_adapter.manager, orig_manager)
5078        self.assertIs(adapter.manager, orig_manager)
5079        self.assertIs(self.logger.manager, orig_manager)
5080
5081
5082class LoggerTest(BaseTest, AssertErrorMessage):
5083
5084    def setUp(self):
5085        super(LoggerTest, self).setUp()
5086        self.recording = RecordingHandler()
5087        self.logger = logging.Logger(name='blah')
5088        self.logger.addHandler(self.recording)
5089        self.addCleanup(self.logger.removeHandler, self.recording)
5090        self.addCleanup(self.recording.close)
5091        self.addCleanup(logging.shutdown)
5092
5093    def test_set_invalid_level(self):
5094        self.assert_error_message(
5095            TypeError, 'Level not an integer or a valid string: None',
5096            self.logger.setLevel, None)
5097        self.assert_error_message(
5098            TypeError, 'Level not an integer or a valid string: (0, 0)',
5099            self.logger.setLevel, (0, 0))
5100
5101    def test_exception(self):
5102        msg = 'testing exception: %r'
5103        exc = None
5104        try:
5105            1 / 0
5106        except ZeroDivisionError as e:
5107            exc = e
5108            self.logger.exception(msg, self.recording)
5109
5110        self.assertEqual(len(self.recording.records), 1)
5111        record = self.recording.records[0]
5112        self.assertEqual(record.levelno, logging.ERROR)
5113        self.assertEqual(record.msg, msg)
5114        self.assertEqual(record.args, (self.recording,))
5115        self.assertEqual(record.exc_info,
5116                         (exc.__class__, exc, exc.__traceback__))
5117
5118    def test_log_invalid_level_with_raise(self):
5119        with support.swap_attr(logging, 'raiseExceptions', True):
5120            self.assertRaises(TypeError, self.logger.log, '10', 'test message')
5121
5122    def test_log_invalid_level_no_raise(self):
5123        with support.swap_attr(logging, 'raiseExceptions', False):
5124            self.logger.log('10', 'test message')  # no exception happens
5125
5126    def test_find_caller_with_stack_info(self):
5127        called = []
5128        support.patch(self, logging.traceback, 'print_stack',
5129                      lambda f, file: called.append(file.getvalue()))
5130
5131        self.logger.findCaller(stack_info=True)
5132
5133        self.assertEqual(len(called), 1)
5134        self.assertEqual('Stack (most recent call last):\n', called[0])
5135
5136    def test_find_caller_with_stacklevel(self):
5137        the_level = 1
5138        trigger = self.logger.warning
5139
5140        def innermost():
5141            trigger('test', stacklevel=the_level)
5142
5143        def inner():
5144            innermost()
5145
5146        def outer():
5147            inner()
5148
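        # Each time the_level is bumped, the reported caller should move one
        # frame outwards: innermost -> inner -> outer -> this test method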
5149        records = self.recording.records
5150        outer()
5151        self.assertEqual(records[-1].funcName, 'innermost')
5152        lineno = records[-1].lineno
5153        the_level += 1
5154        outer()
5155        self.assertEqual(records[-1].funcName, 'inner')
5156        self.assertGreater(records[-1].lineno, lineno)
5157        lineno = records[-1].lineno
5158        the_level += 1
5159        outer()
5160        self.assertEqual(records[-1].funcName, 'outer')
5161        self.assertGreater(records[-1].lineno, lineno)
5162        lineno = records[-1].lineno
5163        root_logger = logging.getLogger()
5164        root_logger.addHandler(self.recording)
5165        trigger = logging.warning
5166        outer()
5167        self.assertEqual(records[-1].funcName, 'outer')
5168        root_logger.removeHandler(self.recording)
5169        trigger = self.logger.warning
5170        the_level += 1
5171        outer()
5172        self.assertEqual(records[-1].funcName, 'test_find_caller_with_stacklevel')
5173        self.assertGreater(records[-1].lineno, lineno)
5174
5175    def test_make_record_with_extra_overwrite(self):
5176        name = 'my record'
5177        level = 13
5178        fn = lno = msg = args = exc_info = func = sinfo = None
5179        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
5180                                       exc_info, func, sinfo)
5181
5182        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
5183            extra = {key: 'some value'}
5184            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
5185                              fn, lno, msg, args, exc_info,
5186                              extra=extra, sinfo=sinfo)
5187
5188    def test_make_record_with_extra_no_overwrite(self):
5189        name = 'my record'
5190        level = 13
5191        fn = lno = msg = args = exc_info = func = sinfo = None
5192        extra = {'valid_key': 'some value'}
5193        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
5194                                        exc_info, extra=extra, sinfo=sinfo)
5195        self.assertIn('valid_key', result.__dict__)
5196
5197    def test_has_handlers(self):
5198        self.assertTrue(self.logger.hasHandlers())
5199
5200        for handler in self.logger.handlers:
5201            self.logger.removeHandler(handler)
5202        self.assertFalse(self.logger.hasHandlers())
5203
5204    def test_has_handlers_no_propagate(self):
5205        child_logger = logging.getLogger('blah.child')
5206        child_logger.propagate = False
5207        self.assertFalse(child_logger.hasHandlers())
5208
5209    def test_is_enabled_for(self):
5210        old_disable = self.logger.manager.disable
5211        self.logger.manager.disable = 23
5212        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
5213        self.assertFalse(self.logger.isEnabledFor(22))
5214
5215    def test_is_enabled_for_disabled_logger(self):
5216        old_disabled = self.logger.disabled
5217        old_disable = self.logger.manager.disable
5218
5219        self.logger.disabled = True
5220        self.logger.manager.disable = 21
5221
5222        self.addCleanup(setattr, self.logger, 'disabled', old_disabled)
5223        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
5224
5225        self.assertFalse(self.logger.isEnabledFor(22))
5226
5227    def test_root_logger_aliases(self):
5228        root = logging.getLogger()
5229        self.assertIs(root, logging.root)
5230        self.assertIs(root, logging.getLogger(None))
5231        self.assertIs(root, logging.getLogger(''))
5232        self.assertIs(root, logging.getLogger('root'))
5233        self.assertIs(root, logging.getLogger('foo').root)
5234        self.assertIs(root, logging.getLogger('foo.bar').root)
5235        self.assertIs(root, logging.getLogger('foo').parent)
5236
5237        self.assertIsNot(root, logging.getLogger('\0'))
5238        self.assertIsNot(root, logging.getLogger('foo.bar').parent)
5239
5240    def test_invalid_names(self):
5241        self.assertRaises(TypeError, logging.getLogger, any)
5242        self.assertRaises(TypeError, logging.getLogger, b'foo')
5243
5244    def test_pickling(self):
5245        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
5246            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
5247                logger = logging.getLogger(name)
5248                s = pickle.dumps(logger, proto)
5249                unpickled = pickle.loads(s)
5250                self.assertIs(unpickled, logger)
5251
5252    def test_caching(self):
5253        root = self.root_logger
5254        logger1 = logging.getLogger("abc")
5255        logger2 = logging.getLogger("abc.def")
5256
5257        # Set root logger level and ensure cache is empty
5258        root.setLevel(logging.ERROR)
5259        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
5260        self.assertEqual(logger2._cache, {})
5261
5262        # Ensure cache is populated and calls are consistent
5263        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
5264        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
5265        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
5266        self.assertEqual(root._cache, {})
5267        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
5268
5269        # Ensure root cache gets populated
5270        self.assertEqual(root._cache, {})
5271        self.assertTrue(root.isEnabledFor(logging.ERROR))
5272        self.assertEqual(root._cache, {logging.ERROR: True})
5273
5274        # Set parent logger level and ensure caches are emptied
5275        logger1.setLevel(logging.CRITICAL)
5276        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5277        self.assertEqual(logger2._cache, {})
5278
5279        # Ensure logger2 uses parent logger's effective level
5280        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
5281
5282        # Set level to NOTSET and ensure caches are empty
5283        logger2.setLevel(logging.NOTSET)
5284        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5285        self.assertEqual(logger2._cache, {})
5286        self.assertEqual(logger1._cache, {})
5287        self.assertEqual(root._cache, {})
5288
5289        # Verify logger2 follows parent and not root
5290        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
5291        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
5292        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
5293        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
5294        self.assertTrue(root.isEnabledFor(logging.ERROR))
5295
5296        # Disable logging in manager and ensure caches are clear
5297        logging.disable()
5298        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
5299        self.assertEqual(logger2._cache, {})
5300        self.assertEqual(logger1._cache, {})
5301        self.assertEqual(root._cache, {})
5302
5303        # Ensure no loggers are enabled
5304        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
5305        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
5306        self.assertFalse(root.isEnabledFor(logging.CRITICAL))
5307
5308
5309class BaseFileTest(BaseTest):
5310    "Base class for handler tests that write log files"
5311
5312    def setUp(self):
5313        BaseTest.setUp(self)
5314        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
5315        os.close(fd)
5316        self.rmfiles = []
5317
5318    def tearDown(self):
5319        for fn in self.rmfiles:
5320            os.unlink(fn)
5321        if os.path.exists(self.fn):
5322            os.unlink(self.fn)
5323        BaseTest.tearDown(self)
5324
5325    def assertLogFile(self, filename):
5326        "Assert a log file is there and register it for deletion"
5327        self.assertTrue(os.path.exists(filename),
5328                        msg="Log file %r does not exist" % filename)
5329        self.rmfiles.append(filename)
5330
5331    def next_rec(self):
5332        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
5333                                 self.next_message(), None, None, None)
5334
5335class FileHandlerTest(BaseFileTest):
5336    def test_delay(self):
5337        os.unlink(self.fn)
5338        fh = logging.FileHandler(self.fn, encoding='utf-8', delay=True)
5339        self.assertIsNone(fh.stream)
5340        self.assertFalse(os.path.exists(self.fn))
5341        fh.handle(logging.makeLogRecord({}))
5342        self.assertIsNotNone(fh.stream)
5343        self.assertTrue(os.path.exists(self.fn))
5344        fh.close()
5345
5346    def test_emit_after_closing_in_write_mode(self):
5347        # Issue #42378
5348        os.unlink(self.fn)
5349        fh = logging.FileHandler(self.fn, encoding='utf-8', mode='w')
5350        fh.setFormatter(logging.Formatter('%(message)s'))
5351        fh.emit(self.next_rec())    # '1'
5352        fh.close()
5353        fh.emit(self.next_rec())    # '2'
5354        with open(self.fn) as fp:
5355            self.assertEqual(fp.read().strip(), '1')
5356
5357class RotatingFileHandlerTest(BaseFileTest):
5358    @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.")
5359    def test_should_not_rollover(self):
5360        # If maxBytes is zero, rollover never occurs
5361        rh = logging.handlers.RotatingFileHandler(
5362                self.fn, encoding="utf-8", maxBytes=0)
5363        self.assertFalse(rh.shouldRollover(None))
5364        rh.close()
5365        # bpo-45401 - test with special file
5366        # We set maxBytes to 1 so that rollover would normally happen, except
5367        # that shouldRollover() only rolls over regular files
5368        rh = logging.handlers.RotatingFileHandler(
5369                os.devnull, encoding="utf-8", maxBytes=1)
5370        self.assertFalse(rh.shouldRollover(self.next_rec()))
5371        rh.close()
5372
5373    def test_should_rollover(self):
5374        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8", maxBytes=1)
5375        self.assertTrue(rh.shouldRollover(self.next_rec()))
5376        rh.close()
5377
5378    def test_file_created(self):
5379        # checks that the file is created and assumes it was created
5380        # by us
5381        rh = logging.handlers.RotatingFileHandler(self.fn, encoding="utf-8")
5382        rh.emit(self.next_rec())
5383        self.assertLogFile(self.fn)
5384        rh.close()
5385
5386    def test_rollover_filenames(self):
5387        def namer(name):
5388            return name + ".test"
5389        rh = logging.handlers.RotatingFileHandler(
5390            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5391        rh.namer = namer
5392        rh.emit(self.next_rec())
5393        self.assertLogFile(self.fn)
5394        rh.emit(self.next_rec())
5395        self.assertLogFile(namer(self.fn + ".1"))
5396        rh.emit(self.next_rec())
5397        self.assertLogFile(namer(self.fn + ".2"))
5398        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
5399        rh.close()
5400
5401    def test_namer_rotator_inheritance(self):
5402        class HandlerWithNamerAndRotator(logging.handlers.RotatingFileHandler):
5403            def namer(self, name):
5404                return name + ".test"
5405
5406            def rotator(self, source, dest):
5407                if os.path.exists(source):
5408                    os.replace(source, dest + ".rotated")
5409
5410        rh = HandlerWithNamerAndRotator(
5411            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5412        self.assertEqual(rh.namer(self.fn), self.fn + ".test")
5413        rh.emit(self.next_rec())
5414        self.assertLogFile(self.fn)
5415        rh.emit(self.next_rec())
5416        self.assertLogFile(rh.namer(self.fn + ".1") + ".rotated")
5417        self.assertFalse(os.path.exists(rh.namer(self.fn + ".1")))
5418        rh.close()
5419
5420    @support.requires_zlib()
5421    def test_rotator(self):
5422        def namer(name):
5423            return name + ".gz"
5424
5425        def rotator(source, dest):
5426            with open(source, "rb") as sf:
5427                data = sf.read()
5428                compressed = zlib.compress(data, 9)
5429                with open(dest, "wb") as df:
5430                    df.write(compressed)
5431            os.remove(source)
5432
5433        rh = logging.handlers.RotatingFileHandler(
5434            self.fn, encoding="utf-8", backupCount=2, maxBytes=1)
5435        rh.rotator = rotator
5436        rh.namer = namer
5437        m1 = self.next_rec()
5438        rh.emit(m1)
5439        self.assertLogFile(self.fn)
5440        m2 = self.next_rec()
5441        rh.emit(m2)
5442        fn = namer(self.fn + ".1")
5443        self.assertLogFile(fn)
5444        newline = os.linesep
5445        with open(fn, "rb") as f:
5446            compressed = f.read()
5447            data = zlib.decompress(compressed)
5448            self.assertEqual(data.decode("ascii"), m1.msg + newline)
5449        rh.emit(self.next_rec())
5450        fn = namer(self.fn + ".2")
5451        self.assertLogFile(fn)
5452        with open(fn, "rb") as f:
5453            compressed = f.read()
5454            data = zlib.decompress(compressed)
5455            self.assertEqual(data.decode("ascii"), m1.msg + newline)
5456        rh.emit(self.next_rec())
5457        fn = namer(self.fn + ".2")
5458        with open(fn, "rb") as f:
5459            compressed = f.read()
5460            data = zlib.decompress(compressed)
5461            self.assertEqual(data.decode("ascii"), m2.msg + newline)
5462        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
5463        rh.close()
5464
5465class TimedRotatingFileHandlerTest(BaseFileTest):
5466    @unittest.skipIf(support.is_wasi, "WASI does not have /dev/null.")
5467    def test_should_not_rollover(self):
5468        # See bpo-45401. Should only ever rollover regular files
5469        fh = logging.handlers.TimedRotatingFileHandler(
5470                os.devnull, 'S', encoding="utf-8", backupCount=1)
5471        time.sleep(1.1)    # a little over a second ...
5472        r = logging.makeLogRecord({'msg': 'testing - device file'})
5473        self.assertFalse(fh.shouldRollover(r))
5474        fh.close()
5475
5476    # other test methods are added below
5477    def test_rollover(self):
5478        fh = logging.handlers.TimedRotatingFileHandler(
5479                self.fn, 'S', encoding="utf-8", backupCount=1)
5480        fmt = logging.Formatter('%(asctime)s %(message)s')
5481        fh.setFormatter(fmt)
5482        r1 = logging.makeLogRecord({'msg': 'testing - initial'})
5483        fh.emit(r1)
5484        self.assertLogFile(self.fn)
5485        time.sleep(1.1)    # a little over a second ...
5486        r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
5487        fh.emit(r2)
5488        fh.close()
5489        # At this point, we should have a recent rotated file which we
5490        # can test for the existence of. However, in practice, on some
5491        # machines which run really slowly, we don't know how far back
5492        # in time to go to look for the log file. So, we go back a fair
5493        # bit, and stop as soon as we see a rotated file. In theory this
5494        # could of course still fail, but the chances are lower.
5495        found = False
5496        now = datetime.datetime.now()
5497        GO_BACK = 5 * 60 # seconds
5498        for secs in range(GO_BACK):
5499            prev = now - datetime.timedelta(seconds=secs)
5500            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
5501            found = os.path.exists(fn)
5502            if found:
5503                self.rmfiles.append(fn)
5504                break
5505        msg = 'No rotated files found, went back %d seconds' % GO_BACK
5506        if not found:
5507            # print additional diagnostics
5508            dn, fn = os.path.split(self.fn)
5509            files = [f for f in os.listdir(dn) if f.startswith(fn)]
5510            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
5511            print('The only matching files are: %s' % files, file=sys.stderr)
5512            for f in files:
5513                print('Contents of %s:' % f)
5514                path = os.path.join(dn, f)
5515                with open(path, 'r') as tf:
5516                    print(tf.read())
5517        self.assertTrue(found, msg=msg)
5518
5519    def test_invalid(self):
5520        assertRaises = self.assertRaises
5521        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5522                     self.fn, 'X', encoding="utf-8", delay=True)
5523        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5524                     self.fn, 'W', encoding="utf-8", delay=True)
5525        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
5526                     self.fn, 'W7', encoding="utf-8", delay=True)
5527
5528    def test_compute_rollover_daily_attime(self):
5529        currentTime = 0
5530        atTime = datetime.time(12, 0, 0)
5531        rh = logging.handlers.TimedRotatingFileHandler(
5532            self.fn, encoding="utf-8", when='MIDNIGHT', interval=1, backupCount=0,
5533            utc=True, atTime=atTime)
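        # currentTime == 0 is the epoch (midnight UTC), so with atTime=12:00 the
        # first rollover is 12 hours away; once past 13:00, the next rollover is
        # at the 36-hour mark (12:00 the following day)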
5534        try:
5535            actual = rh.computeRollover(currentTime)
5536            self.assertEqual(actual, currentTime + 12 * 60 * 60)
5537
5538            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
5539            self.assertEqual(actual, currentTime + 36 * 60 * 60)
5540        finally:
5541            rh.close()
5542
5543    #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
5544    def test_compute_rollover_weekly_attime(self):
5545        currentTime = int(time.time())
5546        today = currentTime - currentTime % 86400
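        # 'today' is the start (midnight UTC) of the current day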
5547
5548        atTime = datetime.time(12, 0, 0)
5549
5550        wday = time.gmtime(today).tm_wday
5551        for day in range(7):
5552            rh = logging.handlers.TimedRotatingFileHandler(
5553                self.fn, encoding="utf-8", when='W%d' % day, interval=1, backupCount=0,
5554                utc=True, atTime=atTime)
5555            try:
5556                if wday > day:
5557                    # The rollover day has already passed this week, so we
5558                    # go over into next week
5559                    expected = (7 - wday + day)
5560                else:
5561                    expected = (day - wday)
5562                # At this point expected is in days from now, convert to seconds
5563                expected *= 24 * 60 * 60
5564                # Add in the rollover time
5565                expected += 12 * 60 * 60
5566                # Add in adjustment for today
5567                expected += today
5568                actual = rh.computeRollover(today)
5569                if actual != expected:
5570                    print('failed in timezone: %d' % time.timezone)
5571                    print('local vars: %s' % locals())
5572                self.assertEqual(actual, expected)
5573                if day == wday:
5574                    # goes into following week
5575                    expected += 7 * 24 * 60 * 60
5576                actual = rh.computeRollover(today + 13 * 60 * 60)
5577                if actual != expected:
5578                    print('failed in timezone: %d' % time.timezone)
5579                    print('local vars: %s' % locals())
5580                self.assertEqual(actual, expected)
5581            finally:
5582                rh.close()
5583
5584    def test_compute_files_to_delete(self):
5585        # See bpo-46063 for background
5586        wd = tempfile.mkdtemp(prefix='test_logging_')
5587        self.addCleanup(shutil.rmtree, wd)
5588        times = []
5589        dt = datetime.datetime.now()
5590        for i in range(10):
5591            times.append(dt.strftime('%Y-%m-%d_%H-%M-%S'))
5592            dt += datetime.timedelta(seconds=5)
5593        prefixes = ('a.b', 'a.b.c', 'd.e', 'd.e.f')
5594        files = []
5595        rotators = []
5596        for prefix in prefixes:
5597            p = os.path.join(wd, '%s.log' % prefix)
5598            rotator = logging.handlers.TimedRotatingFileHandler(p, when='s',
5599                                                                interval=5,
5600                                                                backupCount=7,
5601                                                                delay=True)
5602            rotators.append(rotator)
5603            if prefix.startswith('a.b'):
5604                for t in times:
5605                    files.append('%s.log.%s' % (prefix, t))
5606            else:
5607                rotator.namer = lambda name: name.replace('.log', '') + '.log'
5608                for t in times:
5609                    files.append('%s.%s.log' % (prefix, t))
5610        # Create empty files
5611        for fn in files:
5612            p = os.path.join(wd, fn)
5613            with open(p, 'wb') as f:
5614                pass
5615        # Now check that only the correct files are offered up for deletion
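        # Each handler has 10 backup files but backupCount is 7, so the three
        # oldest files (and only files belonging to that handler) should be
        # offered for deletion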
5616        for i, prefix in enumerate(prefixes):
5617            rotator = rotators[i]
5618            candidates = rotator.getFilesToDelete()
5619            self.assertEqual(len(candidates), 3)
5620            if prefix.startswith('a.b'):
5621                p = '%s.log.' % prefix
5622                for c in candidates:
5623                    d, fn = os.path.split(c)
5624                    self.assertTrue(fn.startswith(p))
5625            else:
5626                for c in candidates:
5627                    d, fn = os.path.split(c)
5628                    self.assertTrue(fn.endswith('.log'))
5629                    self.assertTrue(fn.startswith(prefix + '.') and
5630                                    fn[len(prefix) + 2].isdigit())
5631
5632
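# Helper: convert timedelta keyword arguments to a whole number of seconds,
# e.g. secs(days=1) == 86400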
5633def secs(**kw):
5634    return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
5635
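# Generate a test_compute_rollover_<when> method for each (when, expected)
# pair below and attach it to TimedRotatingFileHandlerTest via setattr()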
5636for when, exp in (('S', 1),
5637                  ('M', 60),
5638                  ('H', 60 * 60),
5639                  ('D', 60 * 60 * 24),
5640                  ('MIDNIGHT', 60 * 60 * 24),
5641                  # current time (epoch start) is a Thursday, W0 means Monday
5642                  ('W0', secs(days=4, hours=24)),
5643                 ):
5644    def test_compute_rollover(self, when=when, exp=exp):
5645        rh = logging.handlers.TimedRotatingFileHandler(
5646            self.fn, encoding="utf-8", when=when, interval=1, backupCount=0, utc=True)
5647        currentTime = 0.0
5648        actual = rh.computeRollover(currentTime)
5649        if exp != actual:
5650            # Failures occur on some systems for MIDNIGHT and W0.
5651            # Print detailed calculation for MIDNIGHT so we can try to see
5652            # what's going on
5653            if when == 'MIDNIGHT':
5654                try:
5655                    if rh.utc:
5656                        t = time.gmtime(currentTime)
5657                    else:
5658                        t = time.localtime(currentTime)
5659                    currentHour = t[3]
5660                    currentMinute = t[4]
5661                    currentSecond = t[5]
5662                    # r is the number of seconds left between now and midnight
5663                    r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
5664                                                       currentMinute) * 60 +
5665                            currentSecond)
5666                    result = currentTime + r
5667                    print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
5668                    print('currentHour: %s' % currentHour, file=sys.stderr)
5669                    print('currentMinute: %s' % currentMinute, file=sys.stderr)
5670                    print('currentSecond: %s' % currentSecond, file=sys.stderr)
5671                    print('r: %s' % r, file=sys.stderr)
5672                    print('result: %s' % result, file=sys.stderr)
5673                except Exception as e:
5674                    print('exception in diagnostic code: %s' % e, file=sys.stderr)
5675        self.assertEqual(exp, actual)
5676        rh.close()
5677    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
5678
5679
5680@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
5681class NTEventLogHandlerTest(BaseTest):
5682    def test_basic(self):
5683        logtype = 'Application'
5684        elh = win32evtlog.OpenEventLog(None, logtype)
5685        num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
5686
5687        try:
5688            h = logging.handlers.NTEventLogHandler('test_logging')
5689        except pywintypes.error as e:
5690            if e.winerror == 5:  # access denied
5691                raise unittest.SkipTest('Insufficient privileges to run test')
5692            raise
5693
5694        r = logging.makeLogRecord({'msg': 'Test Log Message'})
5695        h.handle(r)
5696        h.close()
5697        # Now see if the event is recorded
5698        self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
5699        flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
5700                win32evtlog.EVENTLOG_SEQUENTIAL_READ
5701        found = False
5702        GO_BACK = 100
5703        events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
5704        for e in events:
5705            if e.SourceName != 'test_logging':
5706                continue
5707            msg = win32evtlogutil.SafeFormatMessage(e, logtype)
5708            if msg != 'Test Log Message\r\n':
5709                continue
5710            found = True
5711            break
5712        msg = 'Record not found in event log, went back %d records' % GO_BACK
5713        self.assertTrue(found, msg=msg)
5714
5715
5716class MiscTestCase(unittest.TestCase):
5717    def test__all__(self):
5718        not_exported = {
5719            'logThreads', 'logMultiprocessing', 'logProcesses', 'currentframe',
5720            'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
5721            'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 'root',
5722            'threading'}
5723        support.check__all__(self, logging, not_exported=not_exported)
5724
5725
5726# Set the locale to the platform-dependent default.  I have no idea
5727# why the test does this, but in any case we save the current locale
5728# first and restore it at the end.
5729def setUpModule():
5730    unittest.enterModuleContext(support.run_with_locale('LC_ALL', ''))
5731
5732
5733if __name__ == "__main__":
5734    unittest.main()
5735