1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import contextlib 7import functools 8import getpass 9import os 10import re 11import stat 12import sys 13import sysconfig 14import time 15import types 16import unittest 17import warnings 18 19from .testresult import get_test_runner 20 21 22try: 23 from _testcapi import unicode_legacy_string 24except ImportError: 25 unicode_legacy_string = None 26 27__all__ = [ 28 # globals 29 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 30 # exceptions 31 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 32 # io 33 "record_original_stdout", "get_original_stdout", "captured_stdout", 34 "captured_stdin", "captured_stderr", 35 # unittest 36 "is_resource_enabled", "requires", "requires_freebsd_version", 37 "requires_linux_version", "requires_mac_ver", 38 "check_syntax_error", 39 "BasicTestRunner", "run_unittest", "run_doctest", 40 "requires_gzip", "requires_bz2", "requires_lzma", 41 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 42 "requires_IEEE_754", "requires_zlib", 43 "has_fork_support", "requires_fork", 44 "has_subprocess_support", "requires_subprocess", 45 "has_socket_support", "requires_working_socket", 46 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 47 "check__all__", "skip_if_buggy_ucrt_strfptime", 48 "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer", 49 # sys 50 "is_jython", "is_android", "is_emscripten", "is_wasi", 51 "check_impl_detail", "unix_shell", "setswitchinterval", 52 # network 53 "open_urlresource", 54 # processes 55 "reap_children", 56 # miscellaneous 57 "run_with_locale", "swap_item", "findfile", "infinite_recursion", 58 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 59 "run_with_tz", "PGO", "missing_compiler_executable", 60 "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", 61 
"LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", 62 ] 63 64 65# Timeout in seconds for tests using a network server listening on the network 66# local loopback interface like 127.0.0.1. 67# 68# The timeout is long enough to prevent test failure: it takes into account 69# that the client and the server can run in different threads or even different 70# processes. 71# 72# The timeout should be long enough for connect(), recv() and send() methods 73# of socket.socket. 74LOOPBACK_TIMEOUT = 5.0 75if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version: 76 # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2 77 # seconds on Windows ARM32 buildbot 78 LOOPBACK_TIMEOUT = 10 79elif sys.platform == 'vxworks': 80 LOOPBACK_TIMEOUT = 10 81 82# Timeout in seconds for network requests going to the internet. The timeout is 83# short enough to prevent a test to wait for too long if the internet request 84# is blocked for whatever reason. 85# 86# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed, 87# but skip the test instead: see transient_internet(). 88INTERNET_TIMEOUT = 60.0 89 90# Timeout in seconds to mark a test as failed if the test takes "too long". 91# 92# The timeout value depends on the regrtest --timeout command line option. 93# 94# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use 95# LONG_TIMEOUT instead. 96SHORT_TIMEOUT = 30.0 97 98# Timeout in seconds to detect when a test hangs. 99# 100# It is long enough to reduce the risk of test failure on the slowest Python 101# buildbots. It should not be used to mark a test as failed if the test takes 102# "too long". The timeout value depends on the regrtest --timeout command line 103# option. 
LONG_TIMEOUT = 5 * 60.0

# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
REPO_ROOT = os.path.dirname(STDLIB_DIR)


class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class TestFailedWithDetails(TestFailed):
    """Test failed, carrying the individual errors and failures.

    *errors* and *failures* are stored as given on the instance; str() of
    the exception is just *msg*.
    """
    def __init__(self, msg, errors, failures):
        self.msg = msg
        self.errors = errors
        self.failures = failures
        super().__init__(msg, errors, failures)

    def __str__(self):
        return self.msg

class TestDidNotRun(Error):
    """Test did not run any subtests."""

class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """

def anticipate_failure(condition):
    """Decorator to mark a test that is known to be broken in some cases

       Any use of this decorator should have a comment identifying the
       associated tracker issue.
    """
    if condition:
        return unittest.expectedFailure
    return lambda f: f

def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    if pattern is None:
        pattern = "test*"
    top_dir = STDLIB_DIR
    package_tests = loader.discover(start_dir=pkg_dir,
                                    top_level_dir=top_dir,
                                    pattern=pattern)
    standard_tests.addTests(package_tests)
    return standard_tests


def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        attribute = getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
    else:
        return attribute

verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0
junit_xml_list = None    # list of testsuite XML elements
failfast = False

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none (or a
    falsy one) was recorded."""
    return _original_stdout or sys.stdout


def _force_run(path, func, *args):
    """Call func(*args); on OSError, make *path* rwx for the owner and retry.

    FileNotFoundError is re-raised immediately.  The retry is attempted
    once; an exception from the second call propagates.
    """
    try:
        return func(*args)
    except FileNotFoundError as err:
        # chmod() won't fix a missing file.
        if verbose >= 2:
            print('%s: %s' % (err.__class__.__name__, err))
        raise
    except OSError as err:
        if verbose >= 2:
            print('%s: %s' % (err.__class__.__name__, err))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)


# Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI can be used, caching the answer.

    The result is stored on the function object as ``result``; the
    explanation for a negative answer is stored as ``reason`` (None when a
    GUI is available).
    """
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    import platform
    reason = None
    if sys.platform.startswith('win') and platform.win32_is_iot():
        reason = "gui is not available on Windows IoT Core"
    elif sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, raise an exception if the window manager
        # connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_p = pointer(psn)
            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
                  (app_services.SetFrontProcess(psn_p) < 0) ):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result

def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    Known resources are set by regrtest.py.  If not running under regrtest.py,
    all resources are assumed enabled unless use_resources has been set.
    """
    return use_resources is None or resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available."""
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the %r resource not enabled" % resource
        raise ResourceDenied(msg)
    if resource in {"network", "urlfetch"} and not has_socket_support:
        raise ResourceDenied("No socket support")
    if resource == 'gui' and not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)

def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    import platform
    min_version_txt = '.'.join(map(str, min_version))
    version_txt = platform.release().split('-', 1)[0]
    if platform.system() == sysname:
        try:
            version = tuple(map(int, version_txt.split('.')))
        except ValueError:
            # Unparsable release string: run the test rather than guess.
            skip = False
        else:
            skip = version < min_version
    else:
        skip = False

    return unittest.skipIf(
        skip,
        f"{sysname} version {min_version_txt} or higher required, not "
        f"{version_txt}"
    )


def requires_freebsd_version(*min_version):
    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    less than `min_version`.

    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    version is less than 7.2.
    """
    return _requires_unix_version('FreeBSD', min_version)

def requires_linux_version(*min_version):
    """Decorator raising SkipTest if the OS is Linux and the Linux version is
    less than `min_version`.

    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    version is less than 2.6.32.
    """
    return _requires_unix_version('Linux', min_version)

def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                import platform
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(map(int, version_txt.split('.')))
                except ValueError:
                    pass
                else:
                    if version < min_version:
                        min_version_txt = '.'.join(map(str, min_version))
                        raise unittest.SkipTest(
                            "Mac OS X %s or higher required, not %s"
                            % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator


def skip_if_buildbot(reason=None):
    """Decorator raising SkipTest if running on a buildbot."""
    if not reason:
        reason = 'not suitable for buildbots'
    try:
        isbuildbot = getpass.getuser().lower() == 'buildbot'
    except (KeyError, OSError) as err:
        warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning)
        isbuildbot = False
    return unittest.skipIf(isbuildbot, reason)

def check_sanitizer(*, address=False, memory=False, ub=False):
    """Returns True if Python is compiled with sanitizer support

    At least one of *address*, *memory* or *ub* must be true, otherwise
    ValueError is raised.  A sanitizer counts as present when either its
    -fsanitize= flag appears in CFLAGS or its --with-*-sanitizer option
    appears in CONFIG_ARGS.
    """
    if not (address or memory or ub):
        raise ValueError('At least one of address, memory, or ub must be True')


    _cflags = sysconfig.get_config_var('CFLAGS') or ''
    _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
    memory_sanitizer = (
        '-fsanitize=memory' in _cflags or
        '--with-memory-sanitizer' in _config_args
    )
    # Each sanitizer must be matched against its own configure option;
    # testing the memory-sanitizer option here would report MSan builds
    # as ASan builds and miss real ASan builds.
    address_sanitizer = (
        '-fsanitize=address' in _cflags or
        '--with-address-sanitizer' in _config_args
    )
    ub_sanitizer = (
        '-fsanitize=undefined' in _cflags or
        '--with-undefined-behavior-sanitizer' in _config_args
    )
    return (
        (memory and memory_sanitizer) or
        (address and address_sanitizer) or
        (ub and ub_sanitizer)
    )


def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False):
    """Decorator raising SkipTest if running with a sanitizer active."""
    if not reason:
        reason = 'not working with sanitizers active'
    skip = check_sanitizer(address=address, memory=memory, ub=ub)
    return unittest.skipIf(skip, reason)


def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures."""
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            # Propagate the wrapped function's result instead of dropping it.
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec

# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# (see issue #17835 for a discussion of this number).
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
# for a discussion of this number.
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1

# decorator for skipping tests on non-IEEE 754 platforms
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles",
)


def requires_zlib(reason='requires zlib'):
    """Return a decorator that skips the test unless zlib can be imported."""
    try:
        import zlib
    except ImportError:
        importable = False
    else:
        importable = True
    return unittest.skipUnless(importable, reason)


def requires_gzip(reason='requires gzip'):
    """Return a decorator that skips the test unless gzip can be imported."""
    try:
        import gzip
    except ImportError:
        importable = False
    else:
        importable = True
    return unittest.skipUnless(importable, reason)


def requires_bz2(reason='requires bz2'):
    """Return a decorator that skips the test unless bz2 can be imported."""
    try:
        import bz2
    except ImportError:
        importable = False
    else:
        importable = True
    return unittest.skipUnless(importable, reason)


def requires_lzma(reason='requires lzma'):
    """Return a decorator that skips the test unless lzma can be imported."""
    try:
        import lzma
    except ImportError:
        importable = False
    else:
        importable = True
    return unittest.skipUnless(importable, reason)


def has_no_debug_ranges():
    """Return True when the 'code_debug_ranges' config entry is falsy.

    Raises unittest.SkipTest if _testinternalcapi cannot be imported.
    """
    try:
        import _testinternalcapi as internal_capi
    except ImportError:
        raise unittest.SkipTest("_testinternalcapi required")
    debug_ranges_enabled = internal_capi.get_config()['code_debug_ranges']
    return not debug_ranges_enabled


def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
    """Return a decorator that skips the test when has_no_debug_ranges()."""
    ranges_missing = has_no_debug_ranges()
    return unittest.skipIf(ranges_missing, reason)


requires_legacy_unicode_capi = unittest.skipUnless(
    unicode_legacy_string,
    'requires legacy Unicode C API',
)

is_jython = sys.platform.startswith('java')

is_android = hasattr(sys, 'getandroidapilevel')

# No Unix shell on Windows or VxWorks; Android keeps it under /system.
if sys.platform in ('win32', 'vxworks'):
    unix_shell = None
elif is_android:
    unix_shell = '/system/bin/sh'
else:
    unix_shell = '/bin/sh'

# wasm32-emscripten and -wasi are POSIX-like but do not
# have subprocess or fork support.
is_emscripten = (sys.platform == "emscripten")
is_wasi = (sys.platform == "wasi")

has_fork_support = hasattr(os, "fork") and not (is_emscripten or is_wasi)


def requires_fork():
    """Return a decorator skipping the test unless has_fork_support."""
    reason = "requires working os.fork()"
    return unittest.skipUnless(has_fork_support, reason)


has_subprocess_support = not (is_emscripten or is_wasi)


def requires_subprocess():
    """Used for subprocess, os.spawn calls, fd inheritance"""
    reason = "requires subprocess support"
    return unittest.skipUnless(has_subprocess_support, reason)


# Emscripten's socket emulation and WASI sockets have limitations.
has_socket_support = not (is_emscripten or is_wasi)


def requires_working_socket(*, module=False):
    """Skip tests or modules that require working sockets

    Can be used as a function/class decorator or to skip an entire module.
    """
    msg = "requires socket support"
    if not module:
        # Decorator form.
        return unittest.skipUnless(has_socket_support, msg)
    # Module form: raise right away so the importing module is skipped.
    if not has_socket_support:
        raise unittest.SkipTest(msg)


# Does strftime() support glibc extension like '%4Y'?
has_strftime_extensions = False
if sys.platform != "win32":
    # bpo-47037: Windows debug builds crash with "Debug Assertion Failed"
    with contextlib.suppress(ValueError):
        has_strftime_extensions = time.strftime("%4Y") != "%4Y"

# Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net"

# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False

# Set by libregrtest/main.py if we are running the extended (time consuming)
# PGO task.  If this is True, PGO is also True.
PGO_EXTENDED = False

# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")


def darwin_malloc_err_warning(test_name):
    """Assure user that loud errors generated by macOS libc's malloc are
    expected."""
    if sys.platform != 'darwin':
        return

    import shutil
    msg = ' NOTICE '
    detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
              'warnings on macOS systems. This behavior is known. Do not\n'
              'report a bug unless tests are also failing. See bpo-40928.')

    # Only the terminal width is needed, to size the banner lines.
    padding, _ = shutil.get_terminal_size()
    print(msg.center(padding, '-'))
    print(detail)
    print('-' * padding)


def findfile(filename, subdir=None):
    """Try to find a file on sys.path or in the test directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path).

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    path = [TEST_HOME_DIR] + sys.path
    for dn in path:
        fn = os.path.join(dn, filename)
        if os.path.exists(fn): return fn
    return filename


def sortdict(dict):
    "Like repr(dict), but in sorted order."
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    """Assert that compiling *statement* raises SyntaxError matching *errtext*.

    The exception must carry a lineno and an offset; when *lineno* or
    *offset* is given, the corresponding attribute must equal it.
    """
    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    testcase.assertIsNotNone(err.lineno)
    if lineno is not None:
        testcase.assertEqual(err.lineno, lineno)
    testcase.assertIsNotNone(err.offset)
    if offset is not None:
        testcase.assertEqual(err.offset, offset)


def open_urlresource(url, *args, **kw):
    """Return an open file for the resource at *url*, downloading if needed.

    The file is cached in TEST_DATA_DIR under the last path component of
    the URL.  *args* and the remaining *kw* are passed to open().  The
    keyword-only ``check`` callable, if given, receives the open file and
    must return true for the file to be accepted; a cached file that fails
    the check is deleted and fetched again.

    Raises ResourceDenied (via requires('urlfetch')) when downloading is
    not permitted and TestFailed when the downloaded file fails the check.
    """
    import urllib.request, urllib.parse
    from .os_helper import unlink
    try:
        import gzip
    except ImportError:
        gzip = None

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return the open file if it is acceptable, else close it and
        # return None.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        try:
            if check(f):
                f.seek(0)
                return f
        except BaseException:
            # Don't leak the handle when the validator (or seek) raises.
            f.close()
            raise
        f.close()
        return None

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    f = opener.open(url, timeout=INTERNET_TIMEOUT)
    if gzip and f.headers.get('Content-Encoding') == 'gzip':
        f = gzip.GzipFile(fileobj=f)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)


@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO."""
    import io
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, io.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    return captured_output("stdout")

def captured_stderr():
    """Capture the output of sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    return captured_output("stderr")

def captured_stdin():
    """Capture the input to sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")


def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    import gc
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    gc.collect()
    gc.collect()

@contextlib.contextmanager
def disable_gc():
    """Context manager that disables the garbage collector for its body.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    import gc
    have_gc = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if have_gc:
            gc.enable()


def python_is_optimized():
    """Find if Python was built with optimizations."""
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    final_opt = ""
    # The last -O option on the command line wins.
    for opt in cflags.split():
        if opt.startswith('-O'):
            final_opt = opt
    return final_opt not in ('', '-O0', '-Og')


# struct format strings used by calcobjsize()/calcvobjsize(); two extra
# pointers are prepended when sys.getobjects exists.
_header = 'nP'
_align = '0n'
if hasattr(sys, "getobjects"):
    _header = '2P' + _header
    _align = '0P'
_vheader = _header + 'n'

def calcobjsize(fmt):
    """Return struct.calcsize() of _header + *fmt* + _align."""
    import struct
    return struct.calcsize(_header + fmt + _align)

def calcvobjsize(fmt):
    """Return struct.calcsize() of _vheader + *fmt* + _align."""
    import struct
    return struct.calcsize(_vheader + fmt + _align)


_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9

def check_sizeof(test, o, size):
    """Assert via *test* that sys.getsizeof(o) equals *size*.

    _testinternalcapi.SIZEOF_PYGC_HEAD is added to *size* when *o* is a
    heap type or an instance of a GC-enabled type.  Raises
    unittest.SkipTest if _testinternalcapi cannot be imported.
    """
    try:
        import _testinternalcapi
    except ImportError:
        raise unittest.SkipTest("_testinternalcapi required")
    result = sys.getsizeof(o)
    # add GC header size
    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' \
            % (type(o), result, size)
    test.assertEqual(result, size, msg)

#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

@contextlib.contextmanager
def run_with_locale(catstr, *locales):
    """Context manager running its body under the first usable locale.

    *catstr* names a locale category attribute (e.g. 'LC_ALL'); each entry
    of *locales* is tried in order until one can be set.  If none can be
    set, or the current locale cannot be read, the body still runs with
    the locale unchanged.  The original locale is restored on exit.

    Raises AttributeError if *catstr* is not a valid category name.
    """
    try:
        import locale
        category = getattr(locale, catstr)
        orig_locale = locale.setlocale(category)
    except AttributeError:
        # if the test author gives us an invalid category string
        raise
    except Exception:
        # cannot retrieve original locale, so do nothing
        locale = orig_locale = None
    else:
        for loc in locales:
            try:
                locale.setlocale(category, loc)
                break
            except Exception:
                # Best effort: this locale is unusable here, try the next.
                pass

    try:
        yield
    finally:
        if locale and orig_locale:
            locale.setlocale(category, orig_locale)

#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Decorator running the function with the TZ environment variable
    set to *tz*.

    The previous TZ value (or its absence) is restored afterwards and
    tzset() is called again.  The call raises unittest.SkipTest when
    time.tzset is not available.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            if 'TZ' in os.environ:
                orig_tz = os.environ['TZ']
            else:
                orig_tz = None
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    del os.environ['TZ']
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use
# should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse *limit* (e.g. '2.5Gb') and set max_memuse / real_max_memuse.

    real_max_memuse receives the parsed byte count; max_memuse receives
    the same value capped at MAX_Py_ssize_t.  Raises ValueError when
    *limit* cannot be parsed or is below 2 GiB - 1.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit

class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        """Launch memory_watchdog.py with the statm file as its stdin.

        Emits a RuntimeWarning and returns without starting anything when
        the /proc file cannot be opened.
        """
        import warnings
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        import subprocess
        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f,
                                                 stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the watchdog process, if start() launched one."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()


def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is a requested size for the test (in arbitrary, test-interpreted
    units.) 'memuse' is the number of bytes per unit for the test, or a good
    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
    each, could be decorated with @bigmemtest(size=_4G, memuse=2).

    The 'size' argument is normally passed to the decorated test method as an
    extra argument. If 'dry_run' is true, the value passed to the test method
    may be less than the requested value. If 'dry_run' is false, it means the
    test doesn't support dummy runs when -M is not specified.
    """
    def decorator(f):
        def wrapper(self):
            size = wrapper.size
            memuse = wrapper.memuse
            if not real_max_memuse:
                # No memory limit configured: use a small size.
                maxsize = 5147
            else:
                maxsize = size

            if ((real_max_memuse or not dry_run)
                and real_max_memuse < maxsize * memuse):
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()
            else:
                watchdog = None

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    """Minimal runner: runs a test into a plain unittest.TestResult."""

    def run(self, test):
        """Call *test* with a fresh TestResult and return that result."""
        result = unittest.TestResult()
        test(result)
        return result

def _id(obj):
    """Identity function, used as the no-op decorator."""
    return obj

def requires_resource(resource):
    """Return a decorator skipping the test unless *resource* is usable.

    For 'gui' the skip reason comes from _is_gui_available(); for other
    resources the test is skipped when is_resource_enabled() is false.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))

def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)

def impl_detail(msg=None, **guards):
    """Return a decorator skipping the test unless check_impl_detail(**guards).

    When *msg* is None the skip message is built from the guard names.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            msg = "implementation detail not available on {0}"
        else:
            msg = "implementation detail specific to {0}"
        guardnames = sorted(guardnames.keys())
        msg = msg.format(' or '.join(guardnames))
    return unittest.skip(msg)

def _parse_guards(guards):
    # Returns a tuple ({platform_name: run_me}, default_value)
    if not guards:
        # No guards given means "CPython only".
        return ({'cpython': True}, False)
    is_true = list(guards.values())[0]
    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(sys.implementation.name, default)


def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test."""
    if not hasattr(sys, 'gettrace'):
        return func
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            original_trace = sys.gettrace()
            try:
                sys.settrace(None)
                return func(*args, **kwargs)
            finally:
                # Put back whatever trace function was active before.
                sys.settrace(original_trace)
        return wrapper


def refcount_test(test):
    """Decorator for tests which involve reference counting.

    To start, the decorator does not run the test if is not run by CPython.
    After that, any trace function is unset during the test to prevent
    unexpected refcounts caused by the trace function.

    """
    return no_tracing(cpython_only(test))


def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate."""
    newtests = []
    for test in suite._tests:
        if isinstance(test, unittest.TestSuite):
            # Nested suites are filtered in place and always kept.
            _filter_suite(test, pred)
            newtests.append(test)
        else:
            if pred(test):
                newtests.append(test)
    suite._tests = newtests

def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    runner = get_test_runner(sys.stdout,
                             verbosity=verbose,
                             capture_output=(junit_xml_list is not None))

    result = runner.run(suite)

    if junit_xml_list is not None:
        junit_xml_list.append(result.get_xml_element())

    if not result.testsRun and not result.skipped and not result.errors:
        raise TestDidNotRun
    if not result.wasSuccessful():
        # With exactly one error or failure, its text becomes the message.
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "multiple errors occurred"
            if not verbose: err += "; run in verbose mode for details"
        errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
        failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
        raise TestFailedWithDetails(err, errors, failures)


# By default, don't filter tests
_match_test_func = None

_accept_test_patterns = None
_ignore_test_patterns = None


def match_test(test):
    # Function used by support.run_unittest() and regrtest --list-cases
    if _match_test_func is None:
        return True
    else:
        return _match_test_func(test.id())


def _is_full_match_test(pattern):
    # If a pattern contains at least one dot, it's considered
    # as a full test identifier.
    # Example: 'test.test_os.FileTests.test_access'.
1134 # 1135 # ignore patterns which contain fnmatch patterns: '*', '?', '[...]' 1136 # or '[!...]'. For example, ignore 'test_access*'. 1137 return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) 1138 1139 1140def set_match_tests(accept_patterns=None, ignore_patterns=None): 1141 global _match_test_func, _accept_test_patterns, _ignore_test_patterns 1142 1143 1144 if accept_patterns is None: 1145 accept_patterns = () 1146 if ignore_patterns is None: 1147 ignore_patterns = () 1148 1149 accept_func = ignore_func = None 1150 1151 if accept_patterns != _accept_test_patterns: 1152 accept_patterns, accept_func = _compile_match_function(accept_patterns) 1153 if ignore_patterns != _ignore_test_patterns: 1154 ignore_patterns, ignore_func = _compile_match_function(ignore_patterns) 1155 1156 # Create a copy since patterns can be mutable and so modified later 1157 _accept_test_patterns = tuple(accept_patterns) 1158 _ignore_test_patterns = tuple(ignore_patterns) 1159 1160 if accept_func is not None or ignore_func is not None: 1161 def match_function(test_id): 1162 accept = True 1163 ignore = False 1164 if accept_func: 1165 accept = accept_func(test_id) 1166 if ignore_func: 1167 ignore = ignore_func(test_id) 1168 return accept and not ignore 1169 1170 _match_test_func = match_function 1171 1172 1173def _compile_match_function(patterns): 1174 if not patterns: 1175 func = None 1176 # set_match_tests(None) behaves as set_match_tests(()) 1177 patterns = () 1178 elif all(map(_is_full_match_test, patterns)): 1179 # Simple case: all patterns are full test identifier. 1180 # The test.bisect_cmd utility only uses such full test identifiers. 
1181 func = set(patterns).__contains__ 1182 else: 1183 import fnmatch 1184 regex = '|'.join(map(fnmatch.translate, patterns)) 1185 # The search *is* case sensitive on purpose: 1186 # don't use flags=re.IGNORECASE 1187 regex_match = re.compile(regex).match 1188 1189 def match_test_regex(test_id): 1190 if regex_match(test_id): 1191 # The regex matches the whole identifier, for example 1192 # 'test.test_os.FileTests.test_access'. 1193 return True 1194 else: 1195 # Try to match parts of the test identifier. 1196 # For example, split 'test.test_os.FileTests.test_access' 1197 # into: 'test', 'test_os', 'FileTests' and 'test_access'. 1198 return any(map(regex_match, test_id.split("."))) 1199 1200 func = match_test_regex 1201 1202 return patterns, func 1203 1204 1205def run_unittest(*classes): 1206 """Run tests from unittest.TestCase-derived classes.""" 1207 valid_types = (unittest.TestSuite, unittest.TestCase) 1208 loader = unittest.TestLoader() 1209 suite = unittest.TestSuite() 1210 for cls in classes: 1211 if isinstance(cls, str): 1212 if cls in sys.modules: 1213 suite.addTest(loader.loadTestsFromModule(sys.modules[cls])) 1214 else: 1215 raise ValueError("str arguments must be keys in sys.modules") 1216 elif isinstance(cls, valid_types): 1217 suite.addTest(cls) 1218 else: 1219 suite.addTest(loader.loadTestsFromTestCase(cls)) 1220 _filter_suite(suite, match_test) 1221 _run_suite(suite) 1222 1223#======================================================================= 1224# Check for the presence of docstrings. 

# Rather than trying to enumerate all the cases where docstrings may be
# disabled, we just check for that directly

def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# True on the default implementation, outside Windows, when the
# WITH_DOC_STRINGS build variable is unset or false.
MISSING_C_DOCSTRINGS = (
    check_impl_detail()
    and not (sys.platform == 'win32'
             or sysconfig.get_config_var('WITH_DOC_STRINGS'))
)

# Docstrings are usable only if Python-level ones are kept (the probe
# function above still has its __doc__) and the C ones are not missing.
HAVE_DOCSTRINGS = not (_check_docstrings.__doc__ is None
                       or MISSING_C_DOCSTRINGS)

requires_docstrings = unittest.skipUnless(
    HAVE_DOCSTRINGS,
    "test requires docstrings",
)


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None, optionflags=0):
    """Run the doctests of 'module' and return (#failures, #tests).

    When 'verbosity' is left unspecified (None), support's own 'verbose'
    setting is handed to doctest.  Any other value makes doctest fall back
    to its usual behavior (it searches sys.argv for -v).

    Raise TestFailed if at least one doctest failed.
    """

    import doctest

    # An explicit verbosity is not forwarded: None lets doctest decide.
    doctest_verbose = verbose if verbosity is None else None

    results = doctest.testmod(module, verbose=doctest_verbose,
                              optionflags=optionflags)
    failed, attempted = results
    if failed:
        raise TestFailed("%d of %d doctests failed" % (failed, attempted))
    if verbose:
        summary = ('doctest (%s) ... %d tests with zero failures' %
                   (module.__name__, attempted))
        print(summary)
    return failed, attempted


#=======================================================================
# Support for saving and restoring the imported modules.
1272 1273def flush_std_streams(): 1274 if sys.stdout is not None: 1275 sys.stdout.flush() 1276 if sys.stderr is not None: 1277 sys.stderr.flush() 1278 1279 1280def print_warning(msg): 1281 # bpo-45410: Explicitly flush stdout to keep logs in order 1282 flush_std_streams() 1283 stream = print_warning.orig_stderr 1284 for line in msg.splitlines(): 1285 print(f"Warning -- {line}", file=stream) 1286 stream.flush() 1287 1288# bpo-39983: Store the original sys.stderr at Python startup to be able to 1289# log warnings even if sys.stderr is captured temporarily by a test. 1290print_warning.orig_stderr = sys.stderr 1291 1292 1293# Flag used by saved_test_environment of test.libregrtest.save_env, 1294# to check if a test modified the environment. The flag should be set to False 1295# before running a new test. 1296# 1297# For example, threading_helper.threading_cleanup() sets the flag is the function fails 1298# to cleanup threads. 1299environment_altered = False 1300 1301def reap_children(): 1302 """Use this function at the end of test_main() whenever sub-processes 1303 are started. This will help ensure that no extra children (zombies) 1304 stick around to hog resources and create problems when looking 1305 for refleaks. 1306 """ 1307 global environment_altered 1308 1309 # Need os.waitpid(-1, os.WNOHANG): Windows is not supported 1310 if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): 1311 return 1312 elif not has_subprocess_support: 1313 return 1314 1315 # Reap all our dead child processes so we don't leave zombies around. 1316 # These hog resources and might be causing some of the buildbots to die. 
1317 while True: 1318 try: 1319 # Read the exit status of any child process which already completed 1320 pid, status = os.waitpid(-1, os.WNOHANG) 1321 except OSError: 1322 break 1323 1324 if pid == 0: 1325 break 1326 1327 print_warning(f"reap_children() reaped child process {pid}") 1328 environment_altered = True 1329 1330 1331@contextlib.contextmanager 1332def swap_attr(obj, attr, new_val): 1333 """Temporary swap out an attribute with a new object. 1334 1335 Usage: 1336 with swap_attr(obj, "attr", 5): 1337 ... 1338 1339 This will set obj.attr to 5 for the duration of the with: block, 1340 restoring the old value at the end of the block. If `attr` doesn't 1341 exist on `obj`, it will be created and then deleted at the end of the 1342 block. 1343 1344 The old value (or None if it doesn't exist) will be assigned to the 1345 target of the "as" clause, if there is one. 1346 """ 1347 if hasattr(obj, attr): 1348 real_val = getattr(obj, attr) 1349 setattr(obj, attr, new_val) 1350 try: 1351 yield real_val 1352 finally: 1353 setattr(obj, attr, real_val) 1354 else: 1355 setattr(obj, attr, new_val) 1356 try: 1357 yield 1358 finally: 1359 if hasattr(obj, attr): 1360 delattr(obj, attr) 1361 1362@contextlib.contextmanager 1363def swap_item(obj, item, new_val): 1364 """Temporary swap out an item with a new object. 1365 1366 Usage: 1367 with swap_item(obj, "item", 5): 1368 ... 1369 1370 This will set obj["item"] to 5 for the duration of the with: block, 1371 restoring the old value at the end of the block. If `item` doesn't 1372 exist on `obj`, it will be created and then deleted at the end of the 1373 block. 1374 1375 The old value (or None if it doesn't exist) will be assigned to the 1376 target of the "as" clause, if there is one. 
1377 """ 1378 if item in obj: 1379 real_val = obj[item] 1380 obj[item] = new_val 1381 try: 1382 yield real_val 1383 finally: 1384 obj[item] = real_val 1385 else: 1386 obj[item] = new_val 1387 try: 1388 yield 1389 finally: 1390 if item in obj: 1391 del obj[item] 1392 1393def args_from_interpreter_flags(): 1394 """Return a list of command-line arguments reproducing the current 1395 settings in sys.flags and sys.warnoptions.""" 1396 import subprocess 1397 return subprocess._args_from_interpreter_flags() 1398 1399def optim_args_from_interpreter_flags(): 1400 """Return a list of command-line arguments reproducing the current 1401 optimization settings in sys.flags.""" 1402 import subprocess 1403 return subprocess._optim_args_from_interpreter_flags() 1404 1405 1406class Matcher(object): 1407 1408 _partial_matches = ('msg', 'message') 1409 1410 def matches(self, d, **kwargs): 1411 """ 1412 Try to match a single dict with the supplied arguments. 1413 1414 Keys whose values are strings and which are in self._partial_matches 1415 will be checked for partial (i.e. substring) matches. You can extend 1416 this scheme to (for example) do regular expression matching, etc. 1417 """ 1418 result = True 1419 for k in kwargs: 1420 v = kwargs[k] 1421 dv = d.get(k) 1422 if not self.match_value(k, dv, v): 1423 result = False 1424 break 1425 return result 1426 1427 def match_value(self, k, dv, v): 1428 """ 1429 Try to match a single stored value (dv) with a supplied value (v). 
1430 """ 1431 if type(v) != type(dv): 1432 result = False 1433 elif type(dv) is not str or k not in self._partial_matches: 1434 result = (v == dv) 1435 else: 1436 result = dv.find(v) >= 0 1437 return result 1438 1439 1440_buggy_ucrt = None 1441def skip_if_buggy_ucrt_strfptime(test): 1442 """ 1443 Skip decorator for tests that use buggy strptime/strftime 1444 1445 If the UCRT bugs are present time.localtime().tm_zone will be 1446 an empty string, otherwise we assume the UCRT bugs are fixed 1447 1448 See bpo-37552 [Windows] strptime/strftime return invalid 1449 results with UCRT version 17763.615 1450 """ 1451 import locale 1452 global _buggy_ucrt 1453 if _buggy_ucrt is None: 1454 if(sys.platform == 'win32' and 1455 locale.getencoding() == 'cp65001' and 1456 time.localtime().tm_zone == ''): 1457 _buggy_ucrt = True 1458 else: 1459 _buggy_ucrt = False 1460 return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test 1461 1462class PythonSymlink: 1463 """Creates a symlink for the current Python executable""" 1464 def __init__(self, link=None): 1465 from .os_helper import TESTFN 1466 1467 self.link = link or os.path.abspath(TESTFN) 1468 self._linked = [] 1469 self.real = os.path.realpath(sys.executable) 1470 self._also_link = [] 1471 1472 self._env = None 1473 1474 self._platform_specific() 1475 1476 if sys.platform == "win32": 1477 def _platform_specific(self): 1478 import glob 1479 import _winapi 1480 1481 if os.path.lexists(self.real) and not os.path.exists(self.real): 1482 # App symlink appears to not exist, but we want the 1483 # real executable here anyway 1484 self.real = _winapi.GetModuleFileName(0) 1485 1486 dll = _winapi.GetModuleFileName(sys.dllhandle) 1487 src_dir = os.path.dirname(dll) 1488 dest_dir = os.path.dirname(self.link) 1489 self._also_link.append(( 1490 dll, 1491 os.path.join(dest_dir, os.path.basename(dll)) 1492 )) 1493 for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")): 1494 
self._also_link.append(( 1495 runtime, 1496 os.path.join(dest_dir, os.path.basename(runtime)) 1497 )) 1498 1499 self._env = {k.upper(): os.getenv(k) for k in os.environ} 1500 self._env["PYTHONHOME"] = os.path.dirname(self.real) 1501 if sysconfig.is_python_build(): 1502 self._env["PYTHONPATH"] = STDLIB_DIR 1503 else: 1504 def _platform_specific(self): 1505 pass 1506 1507 def __enter__(self): 1508 os.symlink(self.real, self.link) 1509 self._linked.append(self.link) 1510 for real, link in self._also_link: 1511 os.symlink(real, link) 1512 self._linked.append(link) 1513 return self 1514 1515 def __exit__(self, exc_type, exc_value, exc_tb): 1516 for link in self._linked: 1517 try: 1518 os.remove(link) 1519 except IOError as ex: 1520 if verbose: 1521 print("failed to clean up {}: {}".format(link, ex)) 1522 1523 def _call(self, python, args, env, returncode): 1524 import subprocess 1525 cmd = [python, *args] 1526 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, 1527 stderr=subprocess.PIPE, env=env) 1528 r = p.communicate() 1529 if p.returncode != returncode: 1530 if verbose: 1531 print(repr(r[0])) 1532 print(repr(r[1]), file=sys.stderr) 1533 raise RuntimeError( 1534 'unexpected return code: {0} (0x{0:08X})'.format(p.returncode)) 1535 return r 1536 1537 def call_real(self, *args, returncode=0): 1538 return self._call(self.real, args, None, returncode) 1539 1540 def call_link(self, *args, returncode=0): 1541 return self._call(self.link, args, self._env, returncode) 1542 1543 1544def skip_if_pgo_task(test): 1545 """Skip decorator for tests not run in (non-extended) PGO task""" 1546 ok = not PGO or PGO_EXTENDED 1547 msg = "Not run for (non-extended) PGO task" 1548 return test if ok else unittest.skip(msg)(test) 1549 1550 1551def detect_api_mismatch(ref_api, other_api, *, ignore=()): 1552 """Returns the set of items in ref_api not in other_api, except for a 1553 defined list of items to be ignored in this check. 
1554 1555 By default this skips private attributes beginning with '_' but 1556 includes all magic methods, i.e. those starting and ending in '__'. 1557 """ 1558 missing_items = set(dir(ref_api)) - set(dir(other_api)) 1559 if ignore: 1560 missing_items -= set(ignore) 1561 missing_items = set(m for m in missing_items 1562 if not m.startswith('_') or m.endswith('__')) 1563 return missing_items 1564 1565 1566def check__all__(test_case, module, name_of_module=None, extra=(), 1567 not_exported=()): 1568 """Assert that the __all__ variable of 'module' contains all public names. 1569 1570 The module's public names (its API) are detected automatically based on 1571 whether they match the public name convention and were defined in 1572 'module'. 1573 1574 The 'name_of_module' argument can specify (as a string or tuple thereof) 1575 what module(s) an API could be defined in in order to be detected as a 1576 public API. One case for this is when 'module' imports part of its public 1577 API from other modules, possibly a C backend (like 'csv' and its '_csv'). 1578 1579 The 'extra' argument can be a set of names that wouldn't otherwise be 1580 automatically detected as "public", like objects without a proper 1581 '__module__' attribute. If provided, it will be added to the 1582 automatically detected ones. 1583 1584 The 'not_exported' argument can be a set of names that must not be treated 1585 as part of the public API even though their names indicate otherwise. 1586 1587 Usage: 1588 import bar 1589 import foo 1590 import unittest 1591 from test import support 1592 1593 class MiscTestCase(unittest.TestCase): 1594 def test__all__(self): 1595 support.check__all__(self, foo) 1596 1597 class OtherTestCase(unittest.TestCase): 1598 def test__all__(self): 1599 extra = {'BAR_CONST', 'FOO_CONST'} 1600 not_exported = {'baz'} # Undocumented name. 1601 # bar imports part of its API from _bar. 
1602 support.check__all__(self, bar, ('bar', '_bar'), 1603 extra=extra, not_exported=not_exported) 1604 1605 """ 1606 1607 if name_of_module is None: 1608 name_of_module = (module.__name__, ) 1609 elif isinstance(name_of_module, str): 1610 name_of_module = (name_of_module, ) 1611 1612 expected = set(extra) 1613 1614 for name in dir(module): 1615 if name.startswith('_') or name in not_exported: 1616 continue 1617 obj = getattr(module, name) 1618 if (getattr(obj, '__module__', None) in name_of_module or 1619 (not hasattr(obj, '__module__') and 1620 not isinstance(obj, types.ModuleType))): 1621 expected.add(name) 1622 test_case.assertCountEqual(module.__all__, expected) 1623 1624 1625def suppress_msvcrt_asserts(verbose=False): 1626 try: 1627 import msvcrt 1628 except ImportError: 1629 return 1630 1631 msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS 1632 | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT 1633 | msvcrt.SEM_NOGPFAULTERRORBOX 1634 | msvcrt.SEM_NOOPENFILEERRORBOX) 1635 1636 # CrtSetReportMode() is only available in debug build 1637 if hasattr(msvcrt, 'CrtSetReportMode'): 1638 for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: 1639 if verbose: 1640 msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) 1641 msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) 1642 else: 1643 msvcrt.CrtSetReportMode(m, 0) 1644 1645 1646class SuppressCrashReport: 1647 """Try to prevent a crash report from popping up. 1648 1649 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 1650 disable the creation of coredump file. 1651 """ 1652 old_value = None 1653 old_modes = None 1654 1655 def __enter__(self): 1656 """On Windows, disable Windows Error Reporting dialogs using 1657 SetErrorMode() and CrtSetReportMode(). 1658 1659 On UNIX, try to save the previous core file size limit, then set 1660 soft limit to 0. 
1661 """ 1662 if sys.platform.startswith('win'): 1663 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 1664 try: 1665 import msvcrt 1666 except ImportError: 1667 return 1668 1669 self.old_value = msvcrt.GetErrorMode() 1670 1671 msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX) 1672 1673 # bpo-23314: Suppress assert dialogs in debug builds. 1674 # CrtSetReportMode() is only available in debug build. 1675 if hasattr(msvcrt, 'CrtSetReportMode'): 1676 self.old_modes = {} 1677 for report_type in [msvcrt.CRT_WARN, 1678 msvcrt.CRT_ERROR, 1679 msvcrt.CRT_ASSERT]: 1680 old_mode = msvcrt.CrtSetReportMode(report_type, 1681 msvcrt.CRTDBG_MODE_FILE) 1682 old_file = msvcrt.CrtSetReportFile(report_type, 1683 msvcrt.CRTDBG_FILE_STDERR) 1684 self.old_modes[report_type] = old_mode, old_file 1685 1686 else: 1687 try: 1688 import resource 1689 self.resource = resource 1690 except ImportError: 1691 self.resource = None 1692 if self.resource is not None: 1693 try: 1694 self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE) 1695 self.resource.setrlimit(self.resource.RLIMIT_CORE, 1696 (0, self.old_value[1])) 1697 except (ValueError, OSError): 1698 pass 1699 1700 if sys.platform == 'darwin': 1701 import subprocess 1702 # Check if the 'Crash Reporter' on OSX was configured 1703 # in 'Developer' mode and warn that it will get triggered 1704 # when it is. 1705 # 1706 # This assumes that this context manager is used in tests 1707 # that might trigger the next manager. 
1708 cmd = ['/usr/bin/defaults', 'read', 1709 'com.apple.CrashReporter', 'DialogType'] 1710 proc = subprocess.Popen(cmd, 1711 stdout=subprocess.PIPE, 1712 stderr=subprocess.PIPE) 1713 with proc: 1714 stdout = proc.communicate()[0] 1715 if stdout.strip() == b'developer': 1716 print("this test triggers the Crash Reporter, " 1717 "that is intentional", end='', flush=True) 1718 1719 return self 1720 1721 def __exit__(self, *ignore_exc): 1722 """Restore Windows ErrorMode or core file behavior to initial value.""" 1723 if self.old_value is None: 1724 return 1725 1726 if sys.platform.startswith('win'): 1727 import msvcrt 1728 msvcrt.SetErrorMode(self.old_value) 1729 1730 if self.old_modes: 1731 for report_type, (old_mode, old_file) in self.old_modes.items(): 1732 msvcrt.CrtSetReportMode(report_type, old_mode) 1733 msvcrt.CrtSetReportFile(report_type, old_file) 1734 else: 1735 if self.resource is not None: 1736 try: 1737 self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value) 1738 except (ValueError, OSError): 1739 pass 1740 1741 1742def patch(test_instance, object_to_patch, attr_name, new_value): 1743 """Override 'object_to_patch'.'attr_name' with 'new_value'. 1744 1745 Also, add a cleanup procedure to 'test_instance' to restore 1746 'object_to_patch' value for 'attr_name'. 1747 The 'attr_name' should be a valid attribute for 'object_to_patch'. 
1748 1749 """ 1750 # check that 'attr_name' is a real attribute for 'object_to_patch' 1751 # will raise AttributeError if it does not exist 1752 getattr(object_to_patch, attr_name) 1753 1754 # keep a copy of the old value 1755 attr_is_local = False 1756 try: 1757 old_value = object_to_patch.__dict__[attr_name] 1758 except (AttributeError, KeyError): 1759 old_value = getattr(object_to_patch, attr_name, None) 1760 else: 1761 attr_is_local = True 1762 1763 # restore the value when the test is done 1764 def cleanup(): 1765 if attr_is_local: 1766 setattr(object_to_patch, attr_name, old_value) 1767 else: 1768 delattr(object_to_patch, attr_name) 1769 1770 test_instance.addCleanup(cleanup) 1771 1772 # actually override the attribute 1773 setattr(object_to_patch, attr_name, new_value) 1774 1775 1776@contextlib.contextmanager 1777def patch_list(orig): 1778 """Like unittest.mock.patch.dict, but for lists.""" 1779 try: 1780 saved = orig[:] 1781 yield 1782 finally: 1783 orig[:] = saved 1784 1785 1786def run_in_subinterp(code): 1787 """ 1788 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 1789 module is enabled. 
1790 """ 1791 # Issue #10915, #15751: PyGILState_*() functions don't work with 1792 # sub-interpreters, the tracemalloc module uses these functions internally 1793 try: 1794 import tracemalloc 1795 except ImportError: 1796 pass 1797 else: 1798 if tracemalloc.is_tracing(): 1799 raise unittest.SkipTest("run_in_subinterp() cannot be used " 1800 "if tracemalloc module is tracing " 1801 "memory allocations") 1802 import _testcapi 1803 return _testcapi.run_in_subinterp(code) 1804 1805 1806def check_free_after_iterating(test, iter, cls, args=()): 1807 class A(cls): 1808 def __del__(self): 1809 nonlocal done 1810 done = True 1811 try: 1812 next(it) 1813 except StopIteration: 1814 pass 1815 1816 done = False 1817 it = iter(A(*args)) 1818 # Issue 26494: Shouldn't crash 1819 test.assertRaises(StopIteration, next, it) 1820 # The sequence should be deallocated just after the end of iterating 1821 gc_collect() 1822 test.assertTrue(done) 1823 1824 1825def missing_compiler_executable(cmd_names=[]): 1826 """Check if the compiler components used to build the interpreter exist. 1827 1828 Check for the existence of the compiler executables whose names are listed 1829 in 'cmd_names' or all the compiler executables when 'cmd_names' is empty 1830 and return the first missing executable or None when none is found 1831 missing. 
1832 1833 """ 1834 # TODO (PEP 632): alternate check without using distutils 1835 from distutils import ccompiler, sysconfig, spawn, errors 1836 compiler = ccompiler.new_compiler() 1837 sysconfig.customize_compiler(compiler) 1838 if compiler.compiler_type == "msvc": 1839 # MSVC has no executables, so check whether initialization succeeds 1840 try: 1841 compiler.initialize() 1842 except errors.DistutilsPlatformError: 1843 return "msvc" 1844 for name in compiler.executables: 1845 if cmd_names and name not in cmd_names: 1846 continue 1847 cmd = getattr(compiler, name) 1848 if cmd_names: 1849 assert cmd is not None, \ 1850 "the '%s' executable is not configured" % name 1851 elif not cmd: 1852 continue 1853 if spawn.find_executable(cmd[0]) is None: 1854 return cmd[0] 1855 1856 1857_is_android_emulator = None 1858def setswitchinterval(interval): 1859 # Setting a very low gil interval on the Android emulator causes python 1860 # to hang (issue #26939). 1861 minimum_interval = 1e-5 1862 if is_android and interval < minimum_interval: 1863 global _is_android_emulator 1864 if _is_android_emulator is None: 1865 import subprocess 1866 _is_android_emulator = (subprocess.check_output( 1867 ['getprop', 'ro.kernel.qemu']).strip() == b'1') 1868 if _is_android_emulator: 1869 interval = minimum_interval 1870 return sys.setswitchinterval(interval) 1871 1872 1873@contextlib.contextmanager 1874def disable_faulthandler(): 1875 import faulthandler 1876 1877 # use sys.__stderr__ instead of sys.stderr, since regrtest replaces 1878 # sys.stderr with a StringIO which has no file descriptor when a test 1879 # is run with -W/--verbose3. 1880 fd = sys.__stderr__.fileno() 1881 1882 is_enabled = faulthandler.is_enabled() 1883 try: 1884 faulthandler.disable() 1885 yield 1886 finally: 1887 if is_enabled: 1888 faulthandler.enable(file=fd, all_threads=True) 1889 1890 1891class SaveSignals: 1892 """ 1893 Save and restore signal handlers. 
1894 1895 This class is only able to save/restore signal handlers registered 1896 by the Python signal module: see bpo-13285 for "external" signal 1897 handlers. 1898 """ 1899 1900 def __init__(self): 1901 import signal 1902 self.signal = signal 1903 self.signals = signal.valid_signals() 1904 # SIGKILL and SIGSTOP signals cannot be ignored nor caught 1905 for signame in ('SIGKILL', 'SIGSTOP'): 1906 try: 1907 signum = getattr(signal, signame) 1908 except AttributeError: 1909 continue 1910 self.signals.remove(signum) 1911 self.handlers = {} 1912 1913 def save(self): 1914 for signum in self.signals: 1915 handler = self.signal.getsignal(signum) 1916 if handler is None: 1917 # getsignal() returns None if a signal handler was not 1918 # registered by the Python signal module, 1919 # and the handler is not SIG_DFL nor SIG_IGN. 1920 # 1921 # Ignore the signal: we cannot restore the handler. 1922 continue 1923 self.handlers[signum] = handler 1924 1925 def restore(self): 1926 for signum, handler in self.handlers.items(): 1927 self.signal.signal(signum, handler) 1928 1929 1930def with_pymalloc(): 1931 import _testcapi 1932 return _testcapi.WITH_PYMALLOC 1933 1934 1935class _ALWAYS_EQ: 1936 """ 1937 Object that is equal to anything. 1938 """ 1939 def __eq__(self, other): 1940 return True 1941 def __ne__(self, other): 1942 return False 1943 1944ALWAYS_EQ = _ALWAYS_EQ() 1945 1946class _NEVER_EQ: 1947 """ 1948 Object that is not equal to anything. 1949 """ 1950 def __eq__(self, other): 1951 return False 1952 def __ne__(self, other): 1953 return True 1954 def __hash__(self): 1955 return 1 1956 1957NEVER_EQ = _NEVER_EQ() 1958 1959@functools.total_ordering 1960class _LARGEST: 1961 """ 1962 Object that is greater than anything (except itself). 
1963 """ 1964 def __eq__(self, other): 1965 return isinstance(other, _LARGEST) 1966 def __lt__(self, other): 1967 return False 1968 1969LARGEST = _LARGEST() 1970 1971@functools.total_ordering 1972class _SMALLEST: 1973 """ 1974 Object that is less than anything (except itself). 1975 """ 1976 def __eq__(self, other): 1977 return isinstance(other, _SMALLEST) 1978 def __gt__(self, other): 1979 return False 1980 1981SMALLEST = _SMALLEST() 1982 1983def maybe_get_event_loop_policy(): 1984 """Return the global event loop policy if one is set, else return None.""" 1985 import asyncio.events 1986 return asyncio.events._event_loop_policy 1987 1988# Helpers for testing hashing. 1989NHASHBITS = sys.hash_info.width # number of bits in hash() result 1990assert NHASHBITS in (32, 64) 1991 1992# Return mean and sdev of number of collisions when tossing nballs balls 1993# uniformly at random into nbins bins. By definition, the number of 1994# collisions is the number of balls minus the number of occupied bins at 1995# the end. 1996def collision_stats(nbins, nballs): 1997 n, k = nbins, nballs 1998 # prob a bin empty after k trials = (1 - 1/n)**k 1999 # mean # empty is then n * (1 - 1/n)**k 2000 # so mean # occupied is n - n * (1 - 1/n)**k 2001 # so collisions = k - (n - n*(1 - 1/n)**k) 2002 # 2003 # For the variance: 2004 # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 = 2005 # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty) 2006 # 2007 # Massive cancellation occurs, and, e.g., for a 64-bit hash code 2008 # 1-1/2**64 rounds uselessly to 1.0. Rather than make heroic (and 2009 # error-prone) efforts to rework the naive formulas to avoid those, 2010 # we use the `decimal` module to get plenty of extra precision. 2011 # 2012 # Note: the exact values are straightforward to compute with 2013 # rationals, but in context that's unbearably slow, requiring 2014 # multi-million bit arithmetic. 
2015 import decimal 2016 with decimal.localcontext() as ctx: 2017 bits = n.bit_length() * 2 # bits in n**2 2018 # At least that many bits will likely cancel out. 2019 # Use that many decimal digits instead. 2020 ctx.prec = max(bits, 30) 2021 dn = decimal.Decimal(n) 2022 p1empty = ((dn - 1) / dn) ** k 2023 meanempty = n * p1empty 2024 occupied = n - meanempty 2025 collisions = k - occupied 2026 var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) 2027 return float(collisions), float(var.sqrt()) 2028 2029 2030class catch_unraisable_exception: 2031 """ 2032 Context manager catching unraisable exception using sys.unraisablehook. 2033 2034 Storing the exception value (cm.unraisable.exc_value) creates a reference 2035 cycle. The reference cycle is broken explicitly when the context manager 2036 exits. 2037 2038 Storing the object (cm.unraisable.object) can resurrect it if it is set to 2039 an object which is being finalized. Exiting the context manager clears the 2040 stored object. 2041 2042 Usage: 2043 2044 with support.catch_unraisable_exception() as cm: 2045 # code creating an "unraisable exception" 2046 ... 2047 2048 # check the unraisable exception: use cm.unraisable 2049 ... 2050 2051 # cm.unraisable attribute no longer exists at this point 2052 # (to break a reference cycle) 2053 """ 2054 2055 def __init__(self): 2056 self.unraisable = None 2057 self._old_hook = None 2058 2059 def _hook(self, unraisable): 2060 # Storing unraisable.object can resurrect an object which is being 2061 # finalized. Storing unraisable.exc_value creates a reference cycle. 
2062 self.unraisable = unraisable 2063 2064 def __enter__(self): 2065 self._old_hook = sys.unraisablehook 2066 sys.unraisablehook = self._hook 2067 return self 2068 2069 def __exit__(self, *exc_info): 2070 sys.unraisablehook = self._old_hook 2071 del self.unraisable 2072 2073 2074def wait_process(pid, *, exitcode, timeout=None): 2075 """ 2076 Wait until process pid completes and check that the process exit code is 2077 exitcode. 2078 2079 Raise an AssertionError if the process exit code is not equal to exitcode. 2080 2081 If the process runs longer than timeout seconds (LONG_TIMEOUT by default), 2082 kill the process (if signal.SIGKILL is available) and raise an 2083 AssertionError. The timeout feature is not available on Windows. 2084 """ 2085 if os.name != "nt": 2086 import signal 2087 2088 if timeout is None: 2089 timeout = LONG_TIMEOUT 2090 t0 = time.monotonic() 2091 sleep = 0.001 2092 max_sleep = 0.1 2093 while True: 2094 pid2, status = os.waitpid(pid, os.WNOHANG) 2095 if pid2 != 0: 2096 break 2097 # process is still running 2098 2099 dt = time.monotonic() - t0 2100 if dt > timeout: 2101 try: 2102 os.kill(pid, signal.SIGKILL) 2103 os.waitpid(pid, 0) 2104 except OSError: 2105 # Ignore errors like ChildProcessError or PermissionError 2106 pass 2107 2108 raise AssertionError(f"process {pid} is still running " 2109 f"after {dt:.1f} seconds") 2110 2111 sleep = min(sleep * 2, max_sleep) 2112 time.sleep(sleep) 2113 else: 2114 # Windows implementation 2115 pid2, status = os.waitpid(pid, 0) 2116 2117 exitcode2 = os.waitstatus_to_exitcode(status) 2118 if exitcode2 != exitcode: 2119 raise AssertionError(f"process {pid} exited with code {exitcode2}, " 2120 f"but exit code {exitcode} is expected") 2121 2122 # sanity check: it should not fail in practice 2123 if pid2 != pid: 2124 raise AssertionError(f"pid {pid2} != pid {pid}") 2125 2126def skip_if_broken_multiprocessing_synchronize(): 2127 """ 2128 Skip tests if the multiprocessing.synchronize module is missing, if there 
2129 is no available semaphore implementation, or if creating a lock raises an 2130 OSError (on Linux only). 2131 """ 2132 from .import_helper import import_module 2133 2134 # Skip tests if the _multiprocessing extension is missing. 2135 import_module('_multiprocessing') 2136 2137 # Skip tests if there is no available semaphore implementation: 2138 # multiprocessing.synchronize requires _multiprocessing.SemLock. 2139 synchronize = import_module('multiprocessing.synchronize') 2140 2141 if sys.platform == "linux": 2142 try: 2143 # bpo-38377: On Linux, creating a semaphore fails with OSError 2144 # if the current user does not have the permission to create 2145 # a file in /dev/shm/ directory. 2146 synchronize.Lock(ctx=None) 2147 except OSError as exc: 2148 raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}") 2149 2150 2151def check_disallow_instantiation(testcase, tp, *args, **kwds): 2152 """ 2153 Check that given type cannot be instantiated using *args and **kwds. 2154 2155 See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag. 2156 """ 2157 mod = tp.__module__ 2158 name = tp.__name__ 2159 if mod != 'builtins': 2160 qualname = f"{mod}.{name}" 2161 else: 2162 qualname = f"{name}" 2163 msg = f"cannot create '{re.escape(qualname)}' instances" 2164 testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds) 2165 2166@contextlib.contextmanager 2167def infinite_recursion(max_depth=75): 2168 """Set a lower limit for tests that interact with infinite recursions 2169 (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some 2170 debug windows builds, due to not enough functions being inlined the 2171 stack size might not handle the default recursion limit (1000). 
See 2172 bpo-11105 for details.""" 2173 2174 original_depth = sys.getrecursionlimit() 2175 try: 2176 sys.setrecursionlimit(max_depth) 2177 yield 2178 finally: 2179 sys.setrecursionlimit(original_depth) 2180 2181def ignore_deprecations_from(module: str, *, like: str) -> object: 2182 token = object() 2183 warnings.filterwarnings( 2184 "ignore", 2185 category=DeprecationWarning, 2186 module=module, 2187 message=like + fr"(?#support{id(token)})", 2188 ) 2189 return token 2190 2191def clear_ignored_deprecations(*tokens: object) -> None: 2192 if not tokens: 2193 raise ValueError("Provide token or tokens returned by ignore_deprecations_from") 2194 2195 new_filters = [] 2196 endswith = tuple(rf"(?#support{id(token)})" for token in tokens) 2197 for action, message, category, module, lineno in warnings.filters: 2198 if action == "ignore" and category is DeprecationWarning: 2199 if isinstance(message, re.Pattern): 2200 msg = message.pattern 2201 else: 2202 msg = message or "" 2203 if msg.endswith(endswith): 2204 continue 2205 new_filters.append((action, message, category, module, lineno)) 2206 if warnings.filters != new_filters: 2207 warnings.filters[:] = new_filters 2208 warnings._filters_mutated() 2209 2210 2211# Skip a test if venv with pip is known to not work. 2212def requires_venv_with_pip(): 2213 # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages) 2214 try: 2215 import zlib 2216 except ImportError: 2217 return unittest.skipIf(True, "venv: ensurepip requires zlib") 2218 2219 # bpo-26610: pip/pep425tags.py requires ctypes. 2220 # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1). 
2221 try: 2222 import ctypes 2223 except ImportError: 2224 ctypes = None 2225 return unittest.skipUnless(ctypes, 'venv: pip requires ctypes') 2226 2227 2228@contextlib.contextmanager 2229def adjust_int_max_str_digits(max_digits): 2230 """Temporarily change the integer string conversion length limit.""" 2231 current = sys.get_int_max_str_digits() 2232 try: 2233 sys.set_int_max_str_digits(max_digits) 2234 yield 2235 finally: 2236 sys.set_int_max_str_digits(current) 2237