1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import contextlib
7import functools
8import getpass
9import os
10import re
11import stat
12import sys
13import sysconfig
14import time
15import types
16import unittest
17import warnings
18
19from .testresult import get_test_runner
20
21
22try:
23    from _testcapi import unicode_legacy_string
24except ImportError:
25    unicode_legacy_string = None
26
27__all__ = [
28    # globals
29    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
30    # exceptions
31    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
32    # io
33    "record_original_stdout", "get_original_stdout", "captured_stdout",
34    "captured_stdin", "captured_stderr",
35    # unittest
36    "is_resource_enabled", "requires", "requires_freebsd_version",
37    "requires_linux_version", "requires_mac_ver",
38    "check_syntax_error",
39    "BasicTestRunner", "run_unittest", "run_doctest",
40    "requires_gzip", "requires_bz2", "requires_lzma",
41    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
42    "requires_IEEE_754", "requires_zlib",
43    "has_fork_support", "requires_fork",
44    "has_subprocess_support", "requires_subprocess",
45    "has_socket_support", "requires_working_socket",
46    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
47    "check__all__", "skip_if_buggy_ucrt_strfptime",
48    "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
49    # sys
50    "is_jython", "is_android", "is_emscripten", "is_wasi",
51    "check_impl_detail", "unix_shell", "setswitchinterval",
52    # network
53    "open_urlresource",
54    # processes
55    "reap_children",
56    # miscellaneous
57    "run_with_locale", "swap_item", "findfile", "infinite_recursion",
58    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
59    "run_with_tz", "PGO", "missing_compiler_executable",
60    "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
61    "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
62    ]
63
64
65# Timeout in seconds for tests using a network server listening on the network
66# local loopback interface like 127.0.0.1.
67#
68# The timeout is long enough to prevent test failure: it takes into account
69# that the client and the server can run in different threads or even different
70# processes.
71#
72# The timeout should be long enough for connect(), recv() and send() methods
73# of socket.socket.
74LOOPBACK_TIMEOUT = 5.0
75if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version:
76    # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
77    # seconds on Windows ARM32 buildbot
78    LOOPBACK_TIMEOUT = 10
79elif sys.platform == 'vxworks':
80    LOOPBACK_TIMEOUT = 10
81
82# Timeout in seconds for network requests going to the internet. The timeout is
83# short enough to prevent a test to wait for too long if the internet request
84# is blocked for whatever reason.
85#
86# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
87# but skip the test instead: see transient_internet().
88INTERNET_TIMEOUT = 60.0
89
90# Timeout in seconds to mark a test as failed if the test takes "too long".
91#
92# The timeout value depends on the regrtest --timeout command line option.
93#
94# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
95# LONG_TIMEOUT instead.
96SHORT_TIMEOUT = 30.0
97
98# Timeout in seconds to detect when a test hangs.
99#
100# It is long enough to reduce the risk of test failure on the slowest Python
101# buildbots. It should not be used to mark a test as failed if the test takes
102# "too long". The timeout value depends on the regrtest --timeout command line
103# option.
104LONG_TIMEOUT = 5 * 60.0
105
106# TEST_HOME_DIR refers to the top level directory of the "test" package
107# that contains Python's regression test suite
108TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
109TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
110STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
111REPO_ROOT = os.path.dirname(STDLIB_DIR)
112
113
114class Error(Exception):
115    """Base class for regression test exceptions."""
116
117class TestFailed(Error):
118    """Test failed."""
119
120class TestFailedWithDetails(TestFailed):
121    """Test failed."""
122    def __init__(self, msg, errors, failures):
123        self.msg = msg
124        self.errors = errors
125        self.failures = failures
126        super().__init__(msg, errors, failures)
127
128    def __str__(self):
129        return self.msg
130
131class TestDidNotRun(Error):
132    """Test did not run any subtests."""
133
134class ResourceDenied(unittest.SkipTest):
135    """Test skipped because it requested a disallowed resource.
136
137    This is raised when a test calls requires() for a resource that
138    has not be enabled.  It is used to distinguish between expected
139    and unexpected skips.
140    """
141
142def anticipate_failure(condition):
143    """Decorator to mark a test that is known to be broken in some cases
144
145       Any use of this decorator should have a comment identifying the
146       associated tracker issue.
147    """
148    if condition:
149        return unittest.expectedFailure
150    return lambda f: f
151
152def load_package_tests(pkg_dir, loader, standard_tests, pattern):
153    """Generic load_tests implementation for simple test packages.
154
155    Most packages can implement load_tests using this function as follows:
156
157       def load_tests(*args):
158           return load_package_tests(os.path.dirname(__file__), *args)
159    """
160    if pattern is None:
161        pattern = "test*"
162    top_dir = STDLIB_DIR
163    package_tests = loader.discover(start_dir=pkg_dir,
164                                    top_level_dir=top_dir,
165                                    pattern=pattern)
166    standard_tests.addTests(package_tests)
167    return standard_tests
168
169
170def get_attribute(obj, name):
171    """Get an attribute, raising SkipTest if AttributeError is raised."""
172    try:
173        attribute = getattr(obj, name)
174    except AttributeError:
175        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
176    else:
177        return attribute
178
179verbose = 1              # Flag set to 0 by regrtest.py
180use_resources = None     # Flag set to [] by regrtest.py
181max_memuse = 0           # Disable bigmem tests (they will still be run with
182                         # small sizes, to make sure they work.)
183real_max_memuse = 0
184junit_xml_list = None    # list of testsuite XML elements
185failfast = False
186
187# _original_stdout is meant to hold stdout at the time regrtest began.
188# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
189# The point is to have some flavor of stdout the user can actually see.
190_original_stdout = None
191def record_original_stdout(stdout):
192    global _original_stdout
193    _original_stdout = stdout
194
195def get_original_stdout():
196    return _original_stdout or sys.stdout
197
198
199def _force_run(path, func, *args):
200    try:
201        return func(*args)
202    except FileNotFoundError as err:
203        # chmod() won't fix a missing file.
204        if verbose >= 2:
205            print('%s: %s' % (err.__class__.__name__, err))
206        raise
207    except OSError as err:
208        if verbose >= 2:
209            print('%s: %s' % (err.__class__.__name__, err))
210            print('re-run %s%r' % (func.__name__, args))
211        os.chmod(path, stat.S_IRWXU)
212        return func(*args)
213
214
215# Check whether a gui is actually available
216def _is_gui_available():
217    if hasattr(_is_gui_available, 'result'):
218        return _is_gui_available.result
219    import platform
220    reason = None
221    if sys.platform.startswith('win') and platform.win32_is_iot():
222        reason = "gui is not available on Windows IoT Core"
223    elif sys.platform.startswith('win'):
224        # if Python is running as a service (such as the buildbot service),
225        # gui interaction may be disallowed
226        import ctypes
227        import ctypes.wintypes
228        UOI_FLAGS = 1
229        WSF_VISIBLE = 0x0001
230        class USEROBJECTFLAGS(ctypes.Structure):
231            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
232                        ("fReserved", ctypes.wintypes.BOOL),
233                        ("dwFlags", ctypes.wintypes.DWORD)]
234        dll = ctypes.windll.user32
235        h = dll.GetProcessWindowStation()
236        if not h:
237            raise ctypes.WinError()
238        uof = USEROBJECTFLAGS()
239        needed = ctypes.wintypes.DWORD()
240        res = dll.GetUserObjectInformationW(h,
241            UOI_FLAGS,
242            ctypes.byref(uof),
243            ctypes.sizeof(uof),
244            ctypes.byref(needed))
245        if not res:
246            raise ctypes.WinError()
247        if not bool(uof.dwFlags & WSF_VISIBLE):
248            reason = "gui not available (WSF_VISIBLE flag not set)"
249    elif sys.platform == 'darwin':
250        # The Aqua Tk implementations on OS X can abort the process if
251        # being called in an environment where a window server connection
252        # cannot be made, for instance when invoked by a buildbot or ssh
253        # process not running under the same user id as the current console
254        # user.  To avoid that, raise an exception if the window manager
255        # connection is not available.
256        from ctypes import cdll, c_int, pointer, Structure
257        from ctypes.util import find_library
258
259        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
260
261        if app_services.CGMainDisplayID() == 0:
262            reason = "gui tests cannot run without OS X window manager"
263        else:
264            class ProcessSerialNumber(Structure):
265                _fields_ = [("highLongOfPSN", c_int),
266                            ("lowLongOfPSN", c_int)]
267            psn = ProcessSerialNumber()
268            psn_p = pointer(psn)
269            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
270                  (app_services.SetFrontProcess(psn_p) < 0) ):
271                reason = "cannot run without OS X gui process"
272
273    # check on every platform whether tkinter can actually do anything
274    if not reason:
275        try:
276            from tkinter import Tk
277            root = Tk()
278            root.withdraw()
279            root.update()
280            root.destroy()
281        except Exception as e:
282            err_string = str(e)
283            if len(err_string) > 50:
284                err_string = err_string[:50] + ' [...]'
285            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
286                                                           err_string)
287
288    _is_gui_available.reason = reason
289    _is_gui_available.result = not reason
290
291    return _is_gui_available.result
292
293def is_resource_enabled(resource):
294    """Test whether a resource is enabled.
295
296    Known resources are set by regrtest.py.  If not running under regrtest.py,
297    all resources are assumed enabled unless use_resources has been set.
298    """
299    return use_resources is None or resource in use_resources
300
301def requires(resource, msg=None):
302    """Raise ResourceDenied if the specified resource is not available."""
303    if not is_resource_enabled(resource):
304        if msg is None:
305            msg = "Use of the %r resource not enabled" % resource
306        raise ResourceDenied(msg)
307    if resource in {"network", "urlfetch"} and not has_socket_support:
308        raise ResourceDenied("No socket support")
309    if resource == 'gui' and not _is_gui_available():
310        raise ResourceDenied(_is_gui_available.reason)
311
312def _requires_unix_version(sysname, min_version):
313    """Decorator raising SkipTest if the OS is `sysname` and the version is less
314    than `min_version`.
315
316    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
317    the FreeBSD version is less than 7.2.
318    """
319    import platform
320    min_version_txt = '.'.join(map(str, min_version))
321    version_txt = platform.release().split('-', 1)[0]
322    if platform.system() == sysname:
323        try:
324            version = tuple(map(int, version_txt.split('.')))
325        except ValueError:
326            skip = False
327        else:
328            skip = version < min_version
329    else:
330        skip = False
331
332    return unittest.skipIf(
333        skip,
334        f"{sysname} version {min_version_txt} or higher required, not "
335        f"{version_txt}"
336    )
337
338
339def requires_freebsd_version(*min_version):
340    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
341    less than `min_version`.
342
343    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
344    version is less than 7.2.
345    """
346    return _requires_unix_version('FreeBSD', min_version)
347
348def requires_linux_version(*min_version):
349    """Decorator raising SkipTest if the OS is Linux and the Linux version is
350    less than `min_version`.
351
352    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
353    version is less than 2.6.32.
354    """
355    return _requires_unix_version('Linux', min_version)
356
357def requires_mac_ver(*min_version):
358    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
359    version if less than min_version.
360
361    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
362    is lesser than 10.5.
363    """
364    def decorator(func):
365        @functools.wraps(func)
366        def wrapper(*args, **kw):
367            if sys.platform == 'darwin':
368                import platform
369                version_txt = platform.mac_ver()[0]
370                try:
371                    version = tuple(map(int, version_txt.split('.')))
372                except ValueError:
373                    pass
374                else:
375                    if version < min_version:
376                        min_version_txt = '.'.join(map(str, min_version))
377                        raise unittest.SkipTest(
378                            "Mac OS X %s or higher required, not %s"
379                            % (min_version_txt, version_txt))
380            return func(*args, **kw)
381        wrapper.min_version = min_version
382        return wrapper
383    return decorator
384
385
386def skip_if_buildbot(reason=None):
387    """Decorator raising SkipTest if running on a buildbot."""
388    if not reason:
389        reason = 'not suitable for buildbots'
390    try:
391        isbuildbot = getpass.getuser().lower() == 'buildbot'
392    except (KeyError, EnvironmentError) as err:
393        warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning)
394        isbuildbot = False
395    return unittest.skipIf(isbuildbot, reason)
396
397def check_sanitizer(*, address=False, memory=False, ub=False):
398    """Returns True if Python is compiled with sanitizer support"""
399    if not (address or memory or ub):
400        raise ValueError('At least one of address, memory, or ub must be True')
401
402
403    _cflags = sysconfig.get_config_var('CFLAGS') or ''
404    _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
405    memory_sanitizer = (
406        '-fsanitize=memory' in _cflags or
407        '--with-memory-sanitizer' in _config_args
408    )
409    address_sanitizer = (
410        '-fsanitize=address' in _cflags or
411        '--with-memory-sanitizer' in _config_args
412    )
413    ub_sanitizer = (
414        '-fsanitize=undefined' in _cflags or
415        '--with-undefined-behavior-sanitizer' in _config_args
416    )
417    return (
418        (memory and memory_sanitizer) or
419        (address and address_sanitizer) or
420        (ub and ub_sanitizer)
421    )
422
423
424def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False):
425    """Decorator raising SkipTest if running with a sanitizer active."""
426    if not reason:
427        reason = 'not working with sanitizers active'
428    skip = check_sanitizer(address=address, memory=memory, ub=ub)
429    return unittest.skipIf(skip, reason)
430
431
432def system_must_validate_cert(f):
433    """Skip the test on TLS certificate validation failures."""
434    @functools.wraps(f)
435    def dec(*args, **kwargs):
436        try:
437            f(*args, **kwargs)
438        except OSError as e:
439            if "CERTIFICATE_VERIFY_FAILED" in str(e):
440                raise unittest.SkipTest("system does not contain "
441                                        "necessary certificates")
442            raise
443    return dec
444
445# A constant likely larger than the underlying OS pipe buffer size, to
446# make writes blocking.
447# Windows limit seems to be around 512 B, and many Unix kernels have a
448# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
449# (see issue #17835 for a discussion of this number).
450PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
451
452# A constant likely larger than the underlying OS socket buffer size, to make
453# writes blocking.
454# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
455# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
456# for a discussion of this number.
457SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
458
459# decorator for skipping tests on non-IEEE 754 platforms
460requires_IEEE_754 = unittest.skipUnless(
461    float.__getformat__("double").startswith("IEEE"),
462    "test requires IEEE 754 doubles")
463
464def requires_zlib(reason='requires zlib'):
465    try:
466        import zlib
467    except ImportError:
468        zlib = None
469    return unittest.skipUnless(zlib, reason)
470
471def requires_gzip(reason='requires gzip'):
472    try:
473        import gzip
474    except ImportError:
475        gzip = None
476    return unittest.skipUnless(gzip, reason)
477
478def requires_bz2(reason='requires bz2'):
479    try:
480        import bz2
481    except ImportError:
482        bz2 = None
483    return unittest.skipUnless(bz2, reason)
484
485def requires_lzma(reason='requires lzma'):
486    try:
487        import lzma
488    except ImportError:
489        lzma = None
490    return unittest.skipUnless(lzma, reason)
491
492def has_no_debug_ranges():
493    try:
494        import _testinternalcapi
495    except ImportError:
496        raise unittest.SkipTest("_testinternalcapi required")
497    config = _testinternalcapi.get_config()
498    return not bool(config['code_debug_ranges'])
499
500def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
501    return unittest.skipIf(has_no_debug_ranges(), reason)
502
503requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string,
504                        'requires legacy Unicode C API')
505
506is_jython = sys.platform.startswith('java')
507
508is_android = hasattr(sys, 'getandroidapilevel')
509
510if sys.platform not in ('win32', 'vxworks'):
511    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
512else:
513    unix_shell = None
514
515# wasm32-emscripten and -wasi are POSIX-like but do not
516# have subprocess or fork support.
517is_emscripten = sys.platform == "emscripten"
518is_wasi = sys.platform == "wasi"
519
520has_fork_support = hasattr(os, "fork") and not is_emscripten and not is_wasi
521
522def requires_fork():
523    return unittest.skipUnless(has_fork_support, "requires working os.fork()")
524
525has_subprocess_support = not is_emscripten and not is_wasi
526
527def requires_subprocess():
528    """Used for subprocess, os.spawn calls, fd inheritance"""
529    return unittest.skipUnless(has_subprocess_support, "requires subprocess support")
530
531# Emscripten's socket emulation and WASI sockets have limitations.
532has_socket_support = not is_emscripten and not is_wasi
533
534def requires_working_socket(*, module=False):
535    """Skip tests or modules that require working sockets
536
537    Can be used as a function/class decorator or to skip an entire module.
538    """
539    msg = "requires socket support"
540    if module:
541        if not has_socket_support:
542            raise unittest.SkipTest(msg)
543    else:
544        return unittest.skipUnless(has_socket_support, msg)
545
546# Does strftime() support glibc extension like '%4Y'?
547has_strftime_extensions = False
548if sys.platform != "win32":
549    # bpo-47037: Windows debug builds crash with "Debug Assertion Failed"
550    try:
551        has_strftime_extensions = time.strftime("%4Y") != "%4Y"
552    except ValueError:
553        pass
554
555# Define the URL of a dedicated HTTP server for the network tests.
556# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
557TEST_HTTP_URL = "http://www.pythontest.net"
558
559# Set by libregrtest/main.py so we can skip tests that are not
560# useful for PGO
561PGO = False
562
563# Set by libregrtest/main.py if we are running the extended (time consuming)
564# PGO task.  If this is True, PGO is also True.
565PGO_EXTENDED = False
566
567# TEST_DATA_DIR is used as a target download location for remote resources
568TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
569
570
571def darwin_malloc_err_warning(test_name):
572    """Assure user that loud errors generated by macOS libc's malloc are
573    expected."""
574    if sys.platform != 'darwin':
575        return
576
577    import shutil
578    msg = ' NOTICE '
579    detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
580              'warnings on macOS systems. This behavior is known. Do not\n'
581              'report a bug unless tests are also failing. See bpo-40928.')
582
583    padding, _ = shutil.get_terminal_size()
584    print(msg.center(padding, '-'))
585    print(detail)
586    print('-' * padding)
587
588
589def findfile(filename, subdir=None):
590    """Try to find a file on sys.path or in the test directory.  If it is not
591    found the argument passed to the function is returned (this does not
592    necessarily signal failure; could still be the legitimate path).
593
594    Setting *subdir* indicates a relative path to use to find the file
595    rather than looking directly in the path directories.
596    """
597    if os.path.isabs(filename):
598        return filename
599    if subdir is not None:
600        filename = os.path.join(subdir, filename)
601    path = [TEST_HOME_DIR] + sys.path
602    for dn in path:
603        fn = os.path.join(dn, filename)
604        if os.path.exists(fn): return fn
605    return filename
606
607
608def sortdict(dict):
609    "Like repr(dict), but in sorted order."
610    items = sorted(dict.items())
611    reprpairs = ["%r: %r" % pair for pair in items]
612    withcommas = ", ".join(reprpairs)
613    return "{%s}" % withcommas
614
615def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
616    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
617        compile(statement, '<test string>', 'exec')
618    err = cm.exception
619    testcase.assertIsNotNone(err.lineno)
620    if lineno is not None:
621        testcase.assertEqual(err.lineno, lineno)
622    testcase.assertIsNotNone(err.offset)
623    if offset is not None:
624        testcase.assertEqual(err.offset, offset)
625
626
627def open_urlresource(url, *args, **kw):
628    import urllib.request, urllib.parse
629    from .os_helper import unlink
630    try:
631        import gzip
632    except ImportError:
633        gzip = None
634
635    check = kw.pop('check', None)
636
637    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
638
639    fn = os.path.join(TEST_DATA_DIR, filename)
640
641    def check_valid_file(fn):
642        f = open(fn, *args, **kw)
643        if check is None:
644            return f
645        elif check(f):
646            f.seek(0)
647            return f
648        f.close()
649
650    if os.path.exists(fn):
651        f = check_valid_file(fn)
652        if f is not None:
653            return f
654        unlink(fn)
655
656    # Verify the requirement before downloading the file
657    requires('urlfetch')
658
659    if verbose:
660        print('\tfetching %s ...' % url, file=get_original_stdout())
661    opener = urllib.request.build_opener()
662    if gzip:
663        opener.addheaders.append(('Accept-Encoding', 'gzip'))
664    f = opener.open(url, timeout=INTERNET_TIMEOUT)
665    if gzip and f.headers.get('Content-Encoding') == 'gzip':
666        f = gzip.GzipFile(fileobj=f)
667    try:
668        with open(fn, "wb") as out:
669            s = f.read()
670            while s:
671                out.write(s)
672                s = f.read()
673    finally:
674        f.close()
675
676    f = check_valid_file(fn)
677    if f is not None:
678        return f
679    raise TestFailed('invalid resource %r' % fn)
680
681
682@contextlib.contextmanager
683def captured_output(stream_name):
684    """Return a context manager used by captured_stdout/stdin/stderr
685    that temporarily replaces the sys stream *stream_name* with a StringIO."""
686    import io
687    orig_stdout = getattr(sys, stream_name)
688    setattr(sys, stream_name, io.StringIO())
689    try:
690        yield getattr(sys, stream_name)
691    finally:
692        setattr(sys, stream_name, orig_stdout)
693
694def captured_stdout():
695    """Capture the output of sys.stdout:
696
697       with captured_stdout() as stdout:
698           print("hello")
699       self.assertEqual(stdout.getvalue(), "hello\\n")
700    """
701    return captured_output("stdout")
702
703def captured_stderr():
704    """Capture the output of sys.stderr:
705
706       with captured_stderr() as stderr:
707           print("hello", file=sys.stderr)
708       self.assertEqual(stderr.getvalue(), "hello\\n")
709    """
710    return captured_output("stderr")
711
712def captured_stdin():
713    """Capture the input to sys.stdin:
714
715       with captured_stdin() as stdin:
716           stdin.write('hello\\n')
717           stdin.seek(0)
718           # call test code that consumes from sys.stdin
719           captured = input()
720       self.assertEqual(captured, "hello")
721    """
722    return captured_output("stdin")
723
724
725def gc_collect():
726    """Force as many objects as possible to be collected.
727
728    In non-CPython implementations of Python, this is needed because timely
729    deallocation is not guaranteed by the garbage collector.  (Even in CPython
730    this can be the case in case of reference cycles.)  This means that __del__
731    methods may be called later than expected and weakrefs may remain alive for
732    longer than expected.  This function tries its best to force all garbage
733    objects to disappear.
734    """
735    import gc
736    gc.collect()
737    if is_jython:
738        time.sleep(0.1)
739    gc.collect()
740    gc.collect()
741
742@contextlib.contextmanager
743def disable_gc():
744    import gc
745    have_gc = gc.isenabled()
746    gc.disable()
747    try:
748        yield
749    finally:
750        if have_gc:
751            gc.enable()
752
753
754def python_is_optimized():
755    """Find if Python was built with optimizations."""
756    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
757    final_opt = ""
758    for opt in cflags.split():
759        if opt.startswith('-O'):
760            final_opt = opt
761    return final_opt not in ('', '-O0', '-Og')
762
763
764_header = 'nP'
765_align = '0n'
766if hasattr(sys, "getobjects"):
767    _header = '2P' + _header
768    _align = '0P'
769_vheader = _header + 'n'
770
771def calcobjsize(fmt):
772    import struct
773    return struct.calcsize(_header + fmt + _align)
774
775def calcvobjsize(fmt):
776    import struct
777    return struct.calcsize(_vheader + fmt + _align)
778
779
780_TPFLAGS_HAVE_GC = 1<<14
781_TPFLAGS_HEAPTYPE = 1<<9
782
783def check_sizeof(test, o, size):
784    try:
785        import _testinternalcapi
786    except ImportError:
787        raise unittest.SkipTest("_testinternalcapi required")
788    result = sys.getsizeof(o)
789    # add GC header size
790    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
791        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
792        size += _testinternalcapi.SIZEOF_PYGC_HEAD
793    msg = 'wrong size for %s: got %d, expected %d' \
794            % (type(o), result, size)
795    test.assertEqual(result, size, msg)
796
797#=======================================================================
798# Decorator for running a function in a different locale, correctly resetting
799# it afterwards.
800
801@contextlib.contextmanager
802def run_with_locale(catstr, *locales):
803    try:
804        import locale
805        category = getattr(locale, catstr)
806        orig_locale = locale.setlocale(category)
807    except AttributeError:
808        # if the test author gives us an invalid category string
809        raise
810    except:
811        # cannot retrieve original locale, so do nothing
812        locale = orig_locale = None
813    else:
814        for loc in locales:
815            try:
816                locale.setlocale(category, loc)
817                break
818            except:
819                pass
820
821    try:
822        yield
823    finally:
824        if locale and orig_locale:
825            locale.setlocale(category, orig_locale)
826
827#=======================================================================
828# Decorator for running a function in a specific timezone, correctly
829# resetting it afterwards.
830
831def run_with_tz(tz):
832    def decorator(func):
833        def inner(*args, **kwds):
834            try:
835                tzset = time.tzset
836            except AttributeError:
837                raise unittest.SkipTest("tzset required")
838            if 'TZ' in os.environ:
839                orig_tz = os.environ['TZ']
840            else:
841                orig_tz = None
842            os.environ['TZ'] = tz
843            tzset()
844
845            # now run the function, resetting the tz on exceptions
846            try:
847                return func(*args, **kwds)
848            finally:
849                if orig_tz is None:
850                    del os.environ['TZ']
851                else:
852                    os.environ['TZ'] = orig_tz
853                time.tzset()
854
855        inner.__name__ = func.__name__
856        inner.__doc__ = func.__doc__
857        return inner
858    return decorator
859
860#=======================================================================
861# Big-memory-test support. Separate from 'resources' because memory use
862# should be configurable.
863
864# Some handy shorthands. Note that these are used for byte-limits as well
865# as size-limits, in the various bigmem tests
866_1M = 1024*1024
867_1G = 1024 * _1M
868_2G = 2 * _1G
869_4G = 4 * _1G
870
871MAX_Py_ssize_t = sys.maxsize
872
873def set_memlimit(limit):
874    global max_memuse
875    global real_max_memuse
876    sizes = {
877        'k': 1024,
878        'm': _1M,
879        'g': _1G,
880        't': 1024*_1G,
881    }
882    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
883                 re.IGNORECASE | re.VERBOSE)
884    if m is None:
885        raise ValueError('Invalid memory limit %r' % (limit,))
886    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
887    real_max_memuse = memlimit
888    if memlimit > MAX_Py_ssize_t:
889        memlimit = MAX_Py_ssize_t
890    if memlimit < _2G - 1:
891        raise ValueError('Memory limit %r too low to be useful' % (limit,))
892    max_memuse = memlimit
893
894class _MemoryWatchdog:
895    """An object which periodically watches the process' memory consumption
896    and prints it out.
897    """
898
899    def __init__(self):
900        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
901        self.started = False
902
903    def start(self):
904        import warnings
905        try:
906            f = open(self.procfile, 'r')
907        except OSError as e:
908            warnings.warn('/proc not available for stats: {}'.format(e),
909                          RuntimeWarning)
910            sys.stderr.flush()
911            return
912
913        import subprocess
914        with f:
915            watchdog_script = findfile("memory_watchdog.py")
916            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
917                                                 stdin=f,
918                                                 stderr=subprocess.DEVNULL)
919        self.started = True
920
921    def stop(self):
922        if self.started:
923            self.mem_watchdog.terminate()
924            self.mem_watchdog.wait()
925
926
927def bigmemtest(size, memuse, dry_run=True):
928    """Decorator for bigmem tests.
929
930    'size' is a requested size for the test (in arbitrary, test-interpreted
931    units.) 'memuse' is the number of bytes per unit for the test, or a good
932    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
933    each, could be decorated with @bigmemtest(size=_4G, memuse=2).
934
935    The 'size' argument is normally passed to the decorated test method as an
936    extra argument. If 'dry_run' is true, the value passed to the test method
937    may be less than the requested value. If 'dry_run' is false, it means the
938    test doesn't support dummy runs when -M is not specified.
939    """
940    def decorator(f):
941        def wrapper(self):
942            size = wrapper.size
943            memuse = wrapper.memuse
944            if not real_max_memuse:
945                maxsize = 5147
946            else:
947                maxsize = size
948
949            if ((real_max_memuse or not dry_run)
950                and real_max_memuse < maxsize * memuse):
951                raise unittest.SkipTest(
952                    "not enough memory: %.1fG minimum needed"
953                    % (size * memuse / (1024 ** 3)))
954
955            if real_max_memuse and verbose:
956                print()
957                print(" ... expected peak memory use: {peak:.1f}G"
958                      .format(peak=size * memuse / (1024 ** 3)))
959                watchdog = _MemoryWatchdog()
960                watchdog.start()
961            else:
962                watchdog = None
963
964            try:
965                return f(self, maxsize)
966            finally:
967                if watchdog:
968                    watchdog.stop()
969
970        wrapper.size = size
971        wrapper.memuse = memuse
972        return wrapper
973    return decorator
974
975def bigaddrspacetest(f):
976    """Decorator for tests that fill the address space."""
977    def wrapper(self):
978        if max_memuse < MAX_Py_ssize_t:
979            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
980                raise unittest.SkipTest(
981                    "not enough memory: try a 32-bit build instead")
982            else:
983                raise unittest.SkipTest(
984                    "not enough memory: %.1fG minimum needed"
985                    % (MAX_Py_ssize_t / (1024 ** 3)))
986        else:
987            return f(self)
988    return wrapper
989
990#=======================================================================
991# unittest integration.
992
993class BasicTestRunner:
994    def run(self, test):
995        result = unittest.TestResult()
996        test(result)
997        return result
998
999def _id(obj):
1000    return obj
1001
1002def requires_resource(resource):
1003    if resource == 'gui' and not _is_gui_available():
1004        return unittest.skip(_is_gui_available.reason)
1005    if is_resource_enabled(resource):
1006        return _id
1007    else:
1008        return unittest.skip("resource {0!r} is not enabled".format(resource))
1009
1010def cpython_only(test):
1011    """
1012    Decorator for tests only applicable on CPython.
1013    """
1014    return impl_detail(cpython=True)(test)
1015
1016def impl_detail(msg=None, **guards):
1017    if check_impl_detail(**guards):
1018        return _id
1019    if msg is None:
1020        guardnames, default = _parse_guards(guards)
1021        if default:
1022            msg = "implementation detail not available on {0}"
1023        else:
1024            msg = "implementation detail specific to {0}"
1025        guardnames = sorted(guardnames.keys())
1026        msg = msg.format(' or '.join(guardnames))
1027    return unittest.skip(msg)
1028
1029def _parse_guards(guards):
1030    # Returns a tuple ({platform_name: run_me}, default_value)
1031    if not guards:
1032        return ({'cpython': True}, False)
1033    is_true = list(guards.values())[0]
1034    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
1035    return (guards, not is_true)
1036
1037# Use the following check to guard CPython's implementation-specific tests --
1038# or to run them only on the implementation(s) guarded by the arguments.
1039def check_impl_detail(**guards):
1040    """This function returns True or False depending on the host platform.
1041       Examples:
1042          if check_impl_detail():               # only on CPython (default)
1043          if check_impl_detail(jython=True):    # only on Jython
1044          if check_impl_detail(cpython=False):  # everywhere except on CPython
1045    """
1046    guards, default = _parse_guards(guards)
1047    return guards.get(sys.implementation.name, default)
1048
1049
1050def no_tracing(func):
1051    """Decorator to temporarily turn off tracing for the duration of a test."""
1052    if not hasattr(sys, 'gettrace'):
1053        return func
1054    else:
1055        @functools.wraps(func)
1056        def wrapper(*args, **kwargs):
1057            original_trace = sys.gettrace()
1058            try:
1059                sys.settrace(None)
1060                return func(*args, **kwargs)
1061            finally:
1062                sys.settrace(original_trace)
1063        return wrapper
1064
1065
1066def refcount_test(test):
1067    """Decorator for tests which involve reference counting.
1068
1069    To start, the decorator does not run the test if is not run by CPython.
1070    After that, any trace function is unset during the test to prevent
1071    unexpected refcounts caused by the trace function.
1072
1073    """
1074    return no_tracing(cpython_only(test))
1075
1076
1077def _filter_suite(suite, pred):
1078    """Recursively filter test cases in a suite based on a predicate."""
1079    newtests = []
1080    for test in suite._tests:
1081        if isinstance(test, unittest.TestSuite):
1082            _filter_suite(test, pred)
1083            newtests.append(test)
1084        else:
1085            if pred(test):
1086                newtests.append(test)
1087    suite._tests = newtests
1088
1089def _run_suite(suite):
1090    """Run tests from a unittest.TestSuite-derived class."""
1091    runner = get_test_runner(sys.stdout,
1092                             verbosity=verbose,
1093                             capture_output=(junit_xml_list is not None))
1094
1095    result = runner.run(suite)
1096
1097    if junit_xml_list is not None:
1098        junit_xml_list.append(result.get_xml_element())
1099
1100    if not result.testsRun and not result.skipped and not result.errors:
1101        raise TestDidNotRun
1102    if not result.wasSuccessful():
1103        if len(result.errors) == 1 and not result.failures:
1104            err = result.errors[0][1]
1105        elif len(result.failures) == 1 and not result.errors:
1106            err = result.failures[0][1]
1107        else:
1108            err = "multiple errors occurred"
1109            if not verbose: err += "; run in verbose mode for details"
1110        errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
1111        failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
1112        raise TestFailedWithDetails(err, errors, failures)
1113
1114
1115# By default, don't filter tests
1116_match_test_func = None
1117
1118_accept_test_patterns = None
1119_ignore_test_patterns = None
1120
1121
1122def match_test(test):
1123    # Function used by support.run_unittest() and regrtest --list-cases
1124    if _match_test_func is None:
1125        return True
1126    else:
1127        return _match_test_func(test.id())
1128
1129
1130def _is_full_match_test(pattern):
1131    # If a pattern contains at least one dot, it's considered
1132    # as a full test identifier.
1133    # Example: 'test.test_os.FileTests.test_access'.
1134    #
1135    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
1136    # or '[!...]'. For example, ignore 'test_access*'.
1137    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
1138
1139
1140def set_match_tests(accept_patterns=None, ignore_patterns=None):
1141    global _match_test_func, _accept_test_patterns, _ignore_test_patterns
1142
1143
1144    if accept_patterns is None:
1145        accept_patterns = ()
1146    if ignore_patterns is None:
1147        ignore_patterns = ()
1148
1149    accept_func = ignore_func = None
1150
1151    if accept_patterns != _accept_test_patterns:
1152        accept_patterns, accept_func = _compile_match_function(accept_patterns)
1153    if ignore_patterns != _ignore_test_patterns:
1154        ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)
1155
1156    # Create a copy since patterns can be mutable and so modified later
1157    _accept_test_patterns = tuple(accept_patterns)
1158    _ignore_test_patterns = tuple(ignore_patterns)
1159
1160    if accept_func is not None or ignore_func is not None:
1161        def match_function(test_id):
1162            accept = True
1163            ignore = False
1164            if accept_func:
1165                accept = accept_func(test_id)
1166            if ignore_func:
1167                ignore = ignore_func(test_id)
1168            return accept and not ignore
1169
1170        _match_test_func = match_function
1171
1172
1173def _compile_match_function(patterns):
1174    if not patterns:
1175        func = None
1176        # set_match_tests(None) behaves as set_match_tests(())
1177        patterns = ()
1178    elif all(map(_is_full_match_test, patterns)):
1179        # Simple case: all patterns are full test identifier.
1180        # The test.bisect_cmd utility only uses such full test identifiers.
1181        func = set(patterns).__contains__
1182    else:
1183        import fnmatch
1184        regex = '|'.join(map(fnmatch.translate, patterns))
1185        # The search *is* case sensitive on purpose:
1186        # don't use flags=re.IGNORECASE
1187        regex_match = re.compile(regex).match
1188
1189        def match_test_regex(test_id):
1190            if regex_match(test_id):
1191                # The regex matches the whole identifier, for example
1192                # 'test.test_os.FileTests.test_access'.
1193                return True
1194            else:
1195                # Try to match parts of the test identifier.
1196                # For example, split 'test.test_os.FileTests.test_access'
1197                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
1198                return any(map(regex_match, test_id.split(".")))
1199
1200        func = match_test_regex
1201
1202    return patterns, func
1203
1204
1205def run_unittest(*classes):
1206    """Run tests from unittest.TestCase-derived classes."""
1207    valid_types = (unittest.TestSuite, unittest.TestCase)
1208    loader = unittest.TestLoader()
1209    suite = unittest.TestSuite()
1210    for cls in classes:
1211        if isinstance(cls, str):
1212            if cls in sys.modules:
1213                suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
1214            else:
1215                raise ValueError("str arguments must be keys in sys.modules")
1216        elif isinstance(cls, valid_types):
1217            suite.addTest(cls)
1218        else:
1219            suite.addTest(loader.loadTestsFromTestCase(cls))
1220    _filter_suite(suite, match_test)
1221    _run_suite(suite)
1222
1223#=======================================================================
1224# Check for the presence of docstrings.
1225
1226# Rather than trying to enumerate all the cases where docstrings may be
1227# disabled, we just check for that directly
1228
1229def _check_docstrings():
1230    """Just used to check if docstrings are enabled"""
1231
1232MISSING_C_DOCSTRINGS = (check_impl_detail() and
1233                        sys.platform != 'win32' and
1234                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))
1235
1236HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
1237                   not MISSING_C_DOCSTRINGS)
1238
1239requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1240                                          "test requires docstrings")
1241
1242
1243#=======================================================================
1244# doctest driver.
1245
1246def run_doctest(module, verbosity=None, optionflags=0):
1247    """Run doctest on the given module.  Return (#failures, #tests).
1248
1249    If optional argument verbosity is not specified (or is None), pass
1250    support's belief about verbosity on to doctest.  Else doctest's
1251    usual behavior is used (it searches sys.argv for -v).
1252    """
1253
1254    import doctest
1255
1256    if verbosity is None:
1257        verbosity = verbose
1258    else:
1259        verbosity = None
1260
1261    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
1262    if f:
1263        raise TestFailed("%d of %d doctests failed" % (f, t))
1264    if verbose:
1265        print('doctest (%s) ... %d tests with zero failures' %
1266              (module.__name__, t))
1267    return f, t
1268
1269
1270#=======================================================================
1271# Support for saving and restoring the imported modules.
1272
1273def flush_std_streams():
1274    if sys.stdout is not None:
1275        sys.stdout.flush()
1276    if sys.stderr is not None:
1277        sys.stderr.flush()
1278
1279
1280def print_warning(msg):
1281    # bpo-45410: Explicitly flush stdout to keep logs in order
1282    flush_std_streams()
1283    stream = print_warning.orig_stderr
1284    for line in msg.splitlines():
1285        print(f"Warning -- {line}", file=stream)
1286    stream.flush()
1287
1288# bpo-39983: Store the original sys.stderr at Python startup to be able to
1289# log warnings even if sys.stderr is captured temporarily by a test.
1290print_warning.orig_stderr = sys.stderr
1291
1292
1293# Flag used by saved_test_environment of test.libregrtest.save_env,
1294# to check if a test modified the environment. The flag should be set to False
1295# before running a new test.
1296#
1297# For example, threading_helper.threading_cleanup() sets the flag is the function fails
1298# to cleanup threads.
1299environment_altered = False
1300
1301def reap_children():
1302    """Use this function at the end of test_main() whenever sub-processes
1303    are started.  This will help ensure that no extra children (zombies)
1304    stick around to hog resources and create problems when looking
1305    for refleaks.
1306    """
1307    global environment_altered
1308
1309    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
1310    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
1311        return
1312    elif not has_subprocess_support:
1313        return
1314
1315    # Reap all our dead child processes so we don't leave zombies around.
1316    # These hog resources and might be causing some of the buildbots to die.
1317    while True:
1318        try:
1319            # Read the exit status of any child process which already completed
1320            pid, status = os.waitpid(-1, os.WNOHANG)
1321        except OSError:
1322            break
1323
1324        if pid == 0:
1325            break
1326
1327        print_warning(f"reap_children() reaped child process {pid}")
1328        environment_altered = True
1329
1330
1331@contextlib.contextmanager
1332def swap_attr(obj, attr, new_val):
1333    """Temporary swap out an attribute with a new object.
1334
1335    Usage:
1336        with swap_attr(obj, "attr", 5):
1337            ...
1338
1339        This will set obj.attr to 5 for the duration of the with: block,
1340        restoring the old value at the end of the block. If `attr` doesn't
1341        exist on `obj`, it will be created and then deleted at the end of the
1342        block.
1343
1344        The old value (or None if it doesn't exist) will be assigned to the
1345        target of the "as" clause, if there is one.
1346    """
1347    if hasattr(obj, attr):
1348        real_val = getattr(obj, attr)
1349        setattr(obj, attr, new_val)
1350        try:
1351            yield real_val
1352        finally:
1353            setattr(obj, attr, real_val)
1354    else:
1355        setattr(obj, attr, new_val)
1356        try:
1357            yield
1358        finally:
1359            if hasattr(obj, attr):
1360                delattr(obj, attr)
1361
1362@contextlib.contextmanager
1363def swap_item(obj, item, new_val):
1364    """Temporary swap out an item with a new object.
1365
1366    Usage:
1367        with swap_item(obj, "item", 5):
1368            ...
1369
1370        This will set obj["item"] to 5 for the duration of the with: block,
1371        restoring the old value at the end of the block. If `item` doesn't
1372        exist on `obj`, it will be created and then deleted at the end of the
1373        block.
1374
1375        The old value (or None if it doesn't exist) will be assigned to the
1376        target of the "as" clause, if there is one.
1377    """
1378    if item in obj:
1379        real_val = obj[item]
1380        obj[item] = new_val
1381        try:
1382            yield real_val
1383        finally:
1384            obj[item] = real_val
1385    else:
1386        obj[item] = new_val
1387        try:
1388            yield
1389        finally:
1390            if item in obj:
1391                del obj[item]
1392
1393def args_from_interpreter_flags():
1394    """Return a list of command-line arguments reproducing the current
1395    settings in sys.flags and sys.warnoptions."""
1396    import subprocess
1397    return subprocess._args_from_interpreter_flags()
1398
1399def optim_args_from_interpreter_flags():
1400    """Return a list of command-line arguments reproducing the current
1401    optimization settings in sys.flags."""
1402    import subprocess
1403    return subprocess._optim_args_from_interpreter_flags()
1404
1405
1406class Matcher(object):
1407
1408    _partial_matches = ('msg', 'message')
1409
1410    def matches(self, d, **kwargs):
1411        """
1412        Try to match a single dict with the supplied arguments.
1413
1414        Keys whose values are strings and which are in self._partial_matches
1415        will be checked for partial (i.e. substring) matches. You can extend
1416        this scheme to (for example) do regular expression matching, etc.
1417        """
1418        result = True
1419        for k in kwargs:
1420            v = kwargs[k]
1421            dv = d.get(k)
1422            if not self.match_value(k, dv, v):
1423                result = False
1424                break
1425        return result
1426
1427    def match_value(self, k, dv, v):
1428        """
1429        Try to match a single stored value (dv) with a supplied value (v).
1430        """
1431        if type(v) != type(dv):
1432            result = False
1433        elif type(dv) is not str or k not in self._partial_matches:
1434            result = (v == dv)
1435        else:
1436            result = dv.find(v) >= 0
1437        return result
1438
1439
1440_buggy_ucrt = None
1441def skip_if_buggy_ucrt_strfptime(test):
1442    """
1443    Skip decorator for tests that use buggy strptime/strftime
1444
1445    If the UCRT bugs are present time.localtime().tm_zone will be
1446    an empty string, otherwise we assume the UCRT bugs are fixed
1447
1448    See bpo-37552 [Windows] strptime/strftime return invalid
1449    results with UCRT version 17763.615
1450    """
1451    import locale
1452    global _buggy_ucrt
1453    if _buggy_ucrt is None:
1454        if(sys.platform == 'win32' and
1455                locale.getencoding() == 'cp65001' and
1456                time.localtime().tm_zone == ''):
1457            _buggy_ucrt = True
1458        else:
1459            _buggy_ucrt = False
1460    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test
1461
1462class PythonSymlink:
1463    """Creates a symlink for the current Python executable"""
1464    def __init__(self, link=None):
1465        from .os_helper import TESTFN
1466
1467        self.link = link or os.path.abspath(TESTFN)
1468        self._linked = []
1469        self.real = os.path.realpath(sys.executable)
1470        self._also_link = []
1471
1472        self._env = None
1473
1474        self._platform_specific()
1475
1476    if sys.platform == "win32":
1477        def _platform_specific(self):
1478            import glob
1479            import _winapi
1480
1481            if os.path.lexists(self.real) and not os.path.exists(self.real):
1482                # App symlink appears to not exist, but we want the
1483                # real executable here anyway
1484                self.real = _winapi.GetModuleFileName(0)
1485
1486            dll = _winapi.GetModuleFileName(sys.dllhandle)
1487            src_dir = os.path.dirname(dll)
1488            dest_dir = os.path.dirname(self.link)
1489            self._also_link.append((
1490                dll,
1491                os.path.join(dest_dir, os.path.basename(dll))
1492            ))
1493            for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
1494                self._also_link.append((
1495                    runtime,
1496                    os.path.join(dest_dir, os.path.basename(runtime))
1497                ))
1498
1499            self._env = {k.upper(): os.getenv(k) for k in os.environ}
1500            self._env["PYTHONHOME"] = os.path.dirname(self.real)
1501            if sysconfig.is_python_build():
1502                self._env["PYTHONPATH"] = STDLIB_DIR
1503    else:
1504        def _platform_specific(self):
1505            pass
1506
1507    def __enter__(self):
1508        os.symlink(self.real, self.link)
1509        self._linked.append(self.link)
1510        for real, link in self._also_link:
1511            os.symlink(real, link)
1512            self._linked.append(link)
1513        return self
1514
1515    def __exit__(self, exc_type, exc_value, exc_tb):
1516        for link in self._linked:
1517            try:
1518                os.remove(link)
1519            except IOError as ex:
1520                if verbose:
1521                    print("failed to clean up {}: {}".format(link, ex))
1522
1523    def _call(self, python, args, env, returncode):
1524        import subprocess
1525        cmd = [python, *args]
1526        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
1527                             stderr=subprocess.PIPE, env=env)
1528        r = p.communicate()
1529        if p.returncode != returncode:
1530            if verbose:
1531                print(repr(r[0]))
1532                print(repr(r[1]), file=sys.stderr)
1533            raise RuntimeError(
1534                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
1535        return r
1536
1537    def call_real(self, *args, returncode=0):
1538        return self._call(self.real, args, None, returncode)
1539
1540    def call_link(self, *args, returncode=0):
1541        return self._call(self.link, args, self._env, returncode)
1542
1543
1544def skip_if_pgo_task(test):
1545    """Skip decorator for tests not run in (non-extended) PGO task"""
1546    ok = not PGO or PGO_EXTENDED
1547    msg = "Not run for (non-extended) PGO task"
1548    return test if ok else unittest.skip(msg)(test)
1549
1550
1551def detect_api_mismatch(ref_api, other_api, *, ignore=()):
1552    """Returns the set of items in ref_api not in other_api, except for a
1553    defined list of items to be ignored in this check.
1554
1555    By default this skips private attributes beginning with '_' but
1556    includes all magic methods, i.e. those starting and ending in '__'.
1557    """
1558    missing_items = set(dir(ref_api)) - set(dir(other_api))
1559    if ignore:
1560        missing_items -= set(ignore)
1561    missing_items = set(m for m in missing_items
1562                        if not m.startswith('_') or m.endswith('__'))
1563    return missing_items
1564
1565
1566def check__all__(test_case, module, name_of_module=None, extra=(),
1567                 not_exported=()):
1568    """Assert that the __all__ variable of 'module' contains all public names.
1569
1570    The module's public names (its API) are detected automatically based on
1571    whether they match the public name convention and were defined in
1572    'module'.
1573
1574    The 'name_of_module' argument can specify (as a string or tuple thereof)
1575    what module(s) an API could be defined in in order to be detected as a
1576    public API. One case for this is when 'module' imports part of its public
1577    API from other modules, possibly a C backend (like 'csv' and its '_csv').
1578
1579    The 'extra' argument can be a set of names that wouldn't otherwise be
1580    automatically detected as "public", like objects without a proper
1581    '__module__' attribute. If provided, it will be added to the
1582    automatically detected ones.
1583
1584    The 'not_exported' argument can be a set of names that must not be treated
1585    as part of the public API even though their names indicate otherwise.
1586
1587    Usage:
1588        import bar
1589        import foo
1590        import unittest
1591        from test import support
1592
1593        class MiscTestCase(unittest.TestCase):
1594            def test__all__(self):
1595                support.check__all__(self, foo)
1596
1597        class OtherTestCase(unittest.TestCase):
1598            def test__all__(self):
1599                extra = {'BAR_CONST', 'FOO_CONST'}
1600                not_exported = {'baz'}  # Undocumented name.
1601                # bar imports part of its API from _bar.
1602                support.check__all__(self, bar, ('bar', '_bar'),
1603                                     extra=extra, not_exported=not_exported)
1604
1605    """
1606
1607    if name_of_module is None:
1608        name_of_module = (module.__name__, )
1609    elif isinstance(name_of_module, str):
1610        name_of_module = (name_of_module, )
1611
1612    expected = set(extra)
1613
1614    for name in dir(module):
1615        if name.startswith('_') or name in not_exported:
1616            continue
1617        obj = getattr(module, name)
1618        if (getattr(obj, '__module__', None) in name_of_module or
1619                (not hasattr(obj, '__module__') and
1620                 not isinstance(obj, types.ModuleType))):
1621            expected.add(name)
1622    test_case.assertCountEqual(module.__all__, expected)
1623
1624
1625def suppress_msvcrt_asserts(verbose=False):
1626    try:
1627        import msvcrt
1628    except ImportError:
1629        return
1630
1631    msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
1632                        | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
1633                        | msvcrt.SEM_NOGPFAULTERRORBOX
1634                        | msvcrt.SEM_NOOPENFILEERRORBOX)
1635
1636    # CrtSetReportMode() is only available in debug build
1637    if hasattr(msvcrt, 'CrtSetReportMode'):
1638        for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
1639            if verbose:
1640                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
1641                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
1642            else:
1643                msvcrt.CrtSetReportMode(m, 0)
1644
1645
1646class SuppressCrashReport:
1647    """Try to prevent a crash report from popping up.
1648
1649    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
1650    disable the creation of coredump file.
1651    """
1652    old_value = None
1653    old_modes = None
1654
1655    def __enter__(self):
1656        """On Windows, disable Windows Error Reporting dialogs using
1657        SetErrorMode() and CrtSetReportMode().
1658
1659        On UNIX, try to save the previous core file size limit, then set
1660        soft limit to 0.
1661        """
1662        if sys.platform.startswith('win'):
1663            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
1664            try:
1665                import msvcrt
1666            except ImportError:
1667                return
1668
1669            self.old_value = msvcrt.GetErrorMode()
1670
1671            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)
1672
1673            # bpo-23314: Suppress assert dialogs in debug builds.
1674            # CrtSetReportMode() is only available in debug build.
1675            if hasattr(msvcrt, 'CrtSetReportMode'):
1676                self.old_modes = {}
1677                for report_type in [msvcrt.CRT_WARN,
1678                                    msvcrt.CRT_ERROR,
1679                                    msvcrt.CRT_ASSERT]:
1680                    old_mode = msvcrt.CrtSetReportMode(report_type,
1681                            msvcrt.CRTDBG_MODE_FILE)
1682                    old_file = msvcrt.CrtSetReportFile(report_type,
1683                            msvcrt.CRTDBG_FILE_STDERR)
1684                    self.old_modes[report_type] = old_mode, old_file
1685
1686        else:
1687            try:
1688                import resource
1689                self.resource = resource
1690            except ImportError:
1691                self.resource = None
1692            if self.resource is not None:
1693                try:
1694                    self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE)
1695                    self.resource.setrlimit(self.resource.RLIMIT_CORE,
1696                                            (0, self.old_value[1]))
1697                except (ValueError, OSError):
1698                    pass
1699
1700            if sys.platform == 'darwin':
1701                import subprocess
1702                # Check if the 'Crash Reporter' on OSX was configured
1703                # in 'Developer' mode and warn that it will get triggered
1704                # when it is.
1705                #
1706                # This assumes that this context manager is used in tests
1707                # that might trigger the next manager.
1708                cmd = ['/usr/bin/defaults', 'read',
1709                       'com.apple.CrashReporter', 'DialogType']
1710                proc = subprocess.Popen(cmd,
1711                                        stdout=subprocess.PIPE,
1712                                        stderr=subprocess.PIPE)
1713                with proc:
1714                    stdout = proc.communicate()[0]
1715                if stdout.strip() == b'developer':
1716                    print("this test triggers the Crash Reporter, "
1717                          "that is intentional", end='', flush=True)
1718
1719        return self
1720
1721    def __exit__(self, *ignore_exc):
1722        """Restore Windows ErrorMode or core file behavior to initial value."""
1723        if self.old_value is None:
1724            return
1725
1726        if sys.platform.startswith('win'):
1727            import msvcrt
1728            msvcrt.SetErrorMode(self.old_value)
1729
1730            if self.old_modes:
1731                for report_type, (old_mode, old_file) in self.old_modes.items():
1732                    msvcrt.CrtSetReportMode(report_type, old_mode)
1733                    msvcrt.CrtSetReportFile(report_type, old_file)
1734        else:
1735            if self.resource is not None:
1736                try:
1737                    self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
1738                except (ValueError, OSError):
1739                    pass
1740
1741
1742def patch(test_instance, object_to_patch, attr_name, new_value):
1743    """Override 'object_to_patch'.'attr_name' with 'new_value'.
1744
1745    Also, add a cleanup procedure to 'test_instance' to restore
1746    'object_to_patch' value for 'attr_name'.
1747    The 'attr_name' should be a valid attribute for 'object_to_patch'.
1748
1749    """
1750    # check that 'attr_name' is a real attribute for 'object_to_patch'
1751    # will raise AttributeError if it does not exist
1752    getattr(object_to_patch, attr_name)
1753
1754    # keep a copy of the old value
1755    attr_is_local = False
1756    try:
1757        old_value = object_to_patch.__dict__[attr_name]
1758    except (AttributeError, KeyError):
1759        old_value = getattr(object_to_patch, attr_name, None)
1760    else:
1761        attr_is_local = True
1762
1763    # restore the value when the test is done
1764    def cleanup():
1765        if attr_is_local:
1766            setattr(object_to_patch, attr_name, old_value)
1767        else:
1768            delattr(object_to_patch, attr_name)
1769
1770    test_instance.addCleanup(cleanup)
1771
1772    # actually override the attribute
1773    setattr(object_to_patch, attr_name, new_value)
1774
1775
1776@contextlib.contextmanager
1777def patch_list(orig):
1778    """Like unittest.mock.patch.dict, but for lists."""
1779    try:
1780        saved = orig[:]
1781        yield
1782    finally:
1783        orig[:] = saved
1784
1785
1786def run_in_subinterp(code):
1787    """
1788    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
1789    module is enabled.
1790    """
1791    # Issue #10915, #15751: PyGILState_*() functions don't work with
1792    # sub-interpreters, the tracemalloc module uses these functions internally
1793    try:
1794        import tracemalloc
1795    except ImportError:
1796        pass
1797    else:
1798        if tracemalloc.is_tracing():
1799            raise unittest.SkipTest("run_in_subinterp() cannot be used "
1800                                     "if tracemalloc module is tracing "
1801                                     "memory allocations")
1802    import _testcapi
1803    return _testcapi.run_in_subinterp(code)
1804
1805
1806def check_free_after_iterating(test, iter, cls, args=()):
1807    class A(cls):
1808        def __del__(self):
1809            nonlocal done
1810            done = True
1811            try:
1812                next(it)
1813            except StopIteration:
1814                pass
1815
1816    done = False
1817    it = iter(A(*args))
1818    # Issue 26494: Shouldn't crash
1819    test.assertRaises(StopIteration, next, it)
1820    # The sequence should be deallocated just after the end of iterating
1821    gc_collect()
1822    test.assertTrue(done)
1823
1824
1825def missing_compiler_executable(cmd_names=[]):
1826    """Check if the compiler components used to build the interpreter exist.
1827
1828    Check for the existence of the compiler executables whose names are listed
1829    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
1830    and return the first missing executable or None when none is found
1831    missing.
1832
1833    """
1834    # TODO (PEP 632): alternate check without using distutils
1835    from distutils import ccompiler, sysconfig, spawn, errors
1836    compiler = ccompiler.new_compiler()
1837    sysconfig.customize_compiler(compiler)
1838    if compiler.compiler_type == "msvc":
1839        # MSVC has no executables, so check whether initialization succeeds
1840        try:
1841            compiler.initialize()
1842        except errors.DistutilsPlatformError:
1843            return "msvc"
1844    for name in compiler.executables:
1845        if cmd_names and name not in cmd_names:
1846            continue
1847        cmd = getattr(compiler, name)
1848        if cmd_names:
1849            assert cmd is not None, \
1850                    "the '%s' executable is not configured" % name
1851        elif not cmd:
1852            continue
1853        if spawn.find_executable(cmd[0]) is None:
1854            return cmd[0]
1855
1856
1857_is_android_emulator = None
1858def setswitchinterval(interval):
1859    # Setting a very low gil interval on the Android emulator causes python
1860    # to hang (issue #26939).
1861    minimum_interval = 1e-5
1862    if is_android and interval < minimum_interval:
1863        global _is_android_emulator
1864        if _is_android_emulator is None:
1865            import subprocess
1866            _is_android_emulator = (subprocess.check_output(
1867                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
1868        if _is_android_emulator:
1869            interval = minimum_interval
1870    return sys.setswitchinterval(interval)
1871
1872
1873@contextlib.contextmanager
1874def disable_faulthandler():
1875    import faulthandler
1876
1877    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
1878    # sys.stderr with a StringIO which has no file descriptor when a test
1879    # is run with -W/--verbose3.
1880    fd = sys.__stderr__.fileno()
1881
1882    is_enabled = faulthandler.is_enabled()
1883    try:
1884        faulthandler.disable()
1885        yield
1886    finally:
1887        if is_enabled:
1888            faulthandler.enable(file=fd, all_threads=True)
1889
1890
1891class SaveSignals:
1892    """
1893    Save and restore signal handlers.
1894
1895    This class is only able to save/restore signal handlers registered
1896    by the Python signal module: see bpo-13285 for "external" signal
1897    handlers.
1898    """
1899
1900    def __init__(self):
1901        import signal
1902        self.signal = signal
1903        self.signals = signal.valid_signals()
1904        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
1905        for signame in ('SIGKILL', 'SIGSTOP'):
1906            try:
1907                signum = getattr(signal, signame)
1908            except AttributeError:
1909                continue
1910            self.signals.remove(signum)
1911        self.handlers = {}
1912
1913    def save(self):
1914        for signum in self.signals:
1915            handler = self.signal.getsignal(signum)
1916            if handler is None:
1917                # getsignal() returns None if a signal handler was not
1918                # registered by the Python signal module,
1919                # and the handler is not SIG_DFL nor SIG_IGN.
1920                #
1921                # Ignore the signal: we cannot restore the handler.
1922                continue
1923            self.handlers[signum] = handler
1924
1925    def restore(self):
1926        for signum, handler in self.handlers.items():
1927            self.signal.signal(signum, handler)
1928
1929
1930def with_pymalloc():
1931    import _testcapi
1932    return _testcapi.WITH_PYMALLOC
1933
1934
1935class _ALWAYS_EQ:
1936    """
1937    Object that is equal to anything.
1938    """
1939    def __eq__(self, other):
1940        return True
1941    def __ne__(self, other):
1942        return False
1943
1944ALWAYS_EQ = _ALWAYS_EQ()
1945
1946class _NEVER_EQ:
1947    """
1948    Object that is not equal to anything.
1949    """
1950    def __eq__(self, other):
1951        return False
1952    def __ne__(self, other):
1953        return True
1954    def __hash__(self):
1955        return 1
1956
1957NEVER_EQ = _NEVER_EQ()
1958
1959@functools.total_ordering
1960class _LARGEST:
1961    """
1962    Object that is greater than anything (except itself).
1963    """
1964    def __eq__(self, other):
1965        return isinstance(other, _LARGEST)
1966    def __lt__(self, other):
1967        return False
1968
1969LARGEST = _LARGEST()
1970
1971@functools.total_ordering
1972class _SMALLEST:
1973    """
1974    Object that is less than anything (except itself).
1975    """
1976    def __eq__(self, other):
1977        return isinstance(other, _SMALLEST)
1978    def __gt__(self, other):
1979        return False
1980
1981SMALLEST = _SMALLEST()
1982
1983def maybe_get_event_loop_policy():
1984    """Return the global event loop policy if one is set, else return None."""
1985    import asyncio.events
1986    return asyncio.events._event_loop_policy
1987
1988# Helpers for testing hashing.
1989NHASHBITS = sys.hash_info.width # number of bits in hash() result
1990assert NHASHBITS in (32, 64)
1991
1992# Return mean and sdev of number of collisions when tossing nballs balls
1993# uniformly at random into nbins bins.  By definition, the number of
1994# collisions is the number of balls minus the number of occupied bins at
1995# the end.
1996def collision_stats(nbins, nballs):
1997    n, k = nbins, nballs
1998    # prob a bin empty after k trials = (1 - 1/n)**k
1999    # mean # empty is then n * (1 - 1/n)**k
2000    # so mean # occupied is n - n * (1 - 1/n)**k
2001    # so collisions = k - (n - n*(1 - 1/n)**k)
2002    #
2003    # For the variance:
2004    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
2005    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
2006    #
2007    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
2008    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
2009    # error-prone) efforts to rework the naive formulas to avoid those,
2010    # we use the `decimal` module to get plenty of extra precision.
2011    #
2012    # Note:  the exact values are straightforward to compute with
2013    # rationals, but in context that's unbearably slow, requiring
2014    # multi-million bit arithmetic.
2015    import decimal
2016    with decimal.localcontext() as ctx:
2017        bits = n.bit_length() * 2  # bits in n**2
2018        # At least that many bits will likely cancel out.
2019        # Use that many decimal digits instead.
2020        ctx.prec = max(bits, 30)
2021        dn = decimal.Decimal(n)
2022        p1empty = ((dn - 1) / dn) ** k
2023        meanempty = n * p1empty
2024        occupied = n - meanempty
2025        collisions = k - occupied
2026        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
2027        return float(collisions), float(var.sqrt())
2028
2029
2030class catch_unraisable_exception:
2031    """
2032    Context manager catching unraisable exception using sys.unraisablehook.
2033
2034    Storing the exception value (cm.unraisable.exc_value) creates a reference
2035    cycle. The reference cycle is broken explicitly when the context manager
2036    exits.
2037
2038    Storing the object (cm.unraisable.object) can resurrect it if it is set to
2039    an object which is being finalized. Exiting the context manager clears the
2040    stored object.
2041
2042    Usage:
2043
2044        with support.catch_unraisable_exception() as cm:
2045            # code creating an "unraisable exception"
2046            ...
2047
2048            # check the unraisable exception: use cm.unraisable
2049            ...
2050
2051        # cm.unraisable attribute no longer exists at this point
2052        # (to break a reference cycle)
2053    """
2054
2055    def __init__(self):
2056        self.unraisable = None
2057        self._old_hook = None
2058
2059    def _hook(self, unraisable):
2060        # Storing unraisable.object can resurrect an object which is being
2061        # finalized. Storing unraisable.exc_value creates a reference cycle.
2062        self.unraisable = unraisable
2063
2064    def __enter__(self):
2065        self._old_hook = sys.unraisablehook
2066        sys.unraisablehook = self._hook
2067        return self
2068
2069    def __exit__(self, *exc_info):
2070        sys.unraisablehook = self._old_hook
2071        del self.unraisable
2072
2073
2074def wait_process(pid, *, exitcode, timeout=None):
2075    """
2076    Wait until process pid completes and check that the process exit code is
2077    exitcode.
2078
2079    Raise an AssertionError if the process exit code is not equal to exitcode.
2080
2081    If the process runs longer than timeout seconds (LONG_TIMEOUT by default),
2082    kill the process (if signal.SIGKILL is available) and raise an
2083    AssertionError. The timeout feature is not available on Windows.
2084    """
2085    if os.name != "nt":
2086        import signal
2087
2088        if timeout is None:
2089            timeout = LONG_TIMEOUT
2090        t0 = time.monotonic()
2091        sleep = 0.001
2092        max_sleep = 0.1
2093        while True:
2094            pid2, status = os.waitpid(pid, os.WNOHANG)
2095            if pid2 != 0:
2096                break
2097            # process is still running
2098
2099            dt = time.monotonic() - t0
2100            if dt > timeout:
2101                try:
2102                    os.kill(pid, signal.SIGKILL)
2103                    os.waitpid(pid, 0)
2104                except OSError:
2105                    # Ignore errors like ChildProcessError or PermissionError
2106                    pass
2107
2108                raise AssertionError(f"process {pid} is still running "
2109                                     f"after {dt:.1f} seconds")
2110
2111            sleep = min(sleep * 2, max_sleep)
2112            time.sleep(sleep)
2113    else:
2114        # Windows implementation
2115        pid2, status = os.waitpid(pid, 0)
2116
2117    exitcode2 = os.waitstatus_to_exitcode(status)
2118    if exitcode2 != exitcode:
2119        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
2120                             f"but exit code {exitcode} is expected")
2121
2122    # sanity check: it should not fail in practice
2123    if pid2 != pid:
2124        raise AssertionError(f"pid {pid2} != pid {pid}")
2125
2126def skip_if_broken_multiprocessing_synchronize():
2127    """
2128    Skip tests if the multiprocessing.synchronize module is missing, if there
2129    is no available semaphore implementation, or if creating a lock raises an
2130    OSError (on Linux only).
2131    """
2132    from .import_helper import import_module
2133
2134    # Skip tests if the _multiprocessing extension is missing.
2135    import_module('_multiprocessing')
2136
2137    # Skip tests if there is no available semaphore implementation:
2138    # multiprocessing.synchronize requires _multiprocessing.SemLock.
2139    synchronize = import_module('multiprocessing.synchronize')
2140
2141    if sys.platform == "linux":
2142        try:
2143            # bpo-38377: On Linux, creating a semaphore fails with OSError
2144            # if the current user does not have the permission to create
2145            # a file in /dev/shm/ directory.
2146            synchronize.Lock(ctx=None)
2147        except OSError as exc:
2148            raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
2149
2150
2151def check_disallow_instantiation(testcase, tp, *args, **kwds):
2152    """
2153    Check that given type cannot be instantiated using *args and **kwds.
2154
2155    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
2156    """
2157    mod = tp.__module__
2158    name = tp.__name__
2159    if mod != 'builtins':
2160        qualname = f"{mod}.{name}"
2161    else:
2162        qualname = f"{name}"
2163    msg = f"cannot create '{re.escape(qualname)}' instances"
2164    testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
2165
2166@contextlib.contextmanager
2167def infinite_recursion(max_depth=75):
2168    """Set a lower limit for tests that interact with infinite recursions
2169    (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some
2170    debug windows builds, due to not enough functions being inlined the
2171    stack size might not handle the default recursion limit (1000). See
2172    bpo-11105 for details."""
2173
2174    original_depth = sys.getrecursionlimit()
2175    try:
2176        sys.setrecursionlimit(max_depth)
2177        yield
2178    finally:
2179        sys.setrecursionlimit(original_depth)
2180
2181def ignore_deprecations_from(module: str, *, like: str) -> object:
2182    token = object()
2183    warnings.filterwarnings(
2184        "ignore",
2185        category=DeprecationWarning,
2186        module=module,
2187        message=like + fr"(?#support{id(token)})",
2188    )
2189    return token
2190
2191def clear_ignored_deprecations(*tokens: object) -> None:
2192    if not tokens:
2193        raise ValueError("Provide token or tokens returned by ignore_deprecations_from")
2194
2195    new_filters = []
2196    endswith = tuple(rf"(?#support{id(token)})" for token in tokens)
2197    for action, message, category, module, lineno in warnings.filters:
2198        if action == "ignore" and category is DeprecationWarning:
2199            if isinstance(message, re.Pattern):
2200                msg = message.pattern
2201            else:
2202                msg = message or ""
2203            if msg.endswith(endswith):
2204                continue
2205        new_filters.append((action, message, category, module, lineno))
2206    if warnings.filters != new_filters:
2207        warnings.filters[:] = new_filters
2208        warnings._filters_mutated()
2209
2210
2211# Skip a test if venv with pip is known to not work.
2212def requires_venv_with_pip():
2213    # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages)
2214    try:
2215        import zlib
2216    except ImportError:
2217        return unittest.skipIf(True, "venv: ensurepip requires zlib")
2218
2219    # bpo-26610: pip/pep425tags.py requires ctypes.
2220    # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1).
2221    try:
2222        import ctypes
2223    except ImportError:
2224        ctypes = None
2225    return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')
2226
2227
2228@contextlib.contextmanager
2229def adjust_int_max_str_digits(max_digits):
2230    """Temporarily change the integer string conversion length limit."""
2231    current = sys.get_int_max_str_digits()
2232    try:
2233        sys.set_int_max_str_digits(max_digits)
2234        yield
2235    finally:
2236        sys.set_int_max_str_digits(current)
2237