import collections.abc
import contextlib
import errno
import os
import re
import stat
import sys
import time
import unittest
import warnings


# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN_ASCII = '$test'
else:
    TESTFN_ASCII = '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())

# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)

# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem '
                  'encoding (%s). Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
            TESTFN_UNENCODABLE = None
# macOS, Emscripten and WASI deny unencodable filenames (invalid utf-8)
elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN_ASCII \
            + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can encode
        # the byte 0xff. Skip some unicode filename tests.
        pass

# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
    # First try printable and common characters to have a readable filename.
    # For each character, the listed encodings are just examples of encodings
    # able to encode the character (the list is not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then try more "special" characters. "special" because they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage
        if os.fsdecode(os.fsencode(character)) != character:
            raise UnicodeError
    except UnicodeError:
        pass
    else:
        FS_NONASCII = character
        break

# Save the initial cwd
SAVEDCWD = os.getcwd()

# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or doesn't accept entering
    # such a directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is implicitly
    # concatenated with the next one and neither candidate is tried as-is.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break

if FS_NONASCII:
    TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
    TESTFN_NONASCII = None
# Prefer the non-ASCII name when the filesystem encoding supports one.
TESTFN = TESTFN_NONASCII or TESTFN_ASCII


def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.

    The file is created at TESTFN and removed again before returning.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)


# Cached result of can_symlink(); None until the first probe.
_can_symlink = None


def can_symlink():
    """Return True if os.symlink() works here; the result is cached.

    The probe creates (and removes) a symlink next to the absolute TESTFN
    path. Any OSError, NotImplementedError or AttributeError raised by
    os.symlink() is treated as "symlinks are not available".
    """
    global _can_symlink
    if _can_symlink is not None:
        return _can_symlink
    # WASI / wasmtime prevents symlinks with absolute paths, see man
    # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
    # paths. Skip symlink tests on WASI for now.
    src = os.path.abspath(TESTFN)
    symlink_path = src + "can_symlink"
    try:
        os.symlink(src, symlink_path)
        can = True
    except (OSError, NotImplementedError, AttributeError):
        can = False
    else:
        os.remove(symlink_path)
    _can_symlink = can
    return can


def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    ok = can_symlink()
    msg = "Requires functional symlink implementation"
    return test if ok else unittest.skip(msg)(test)


# Cached result of can_xattr(); None until the first probe.
_can_xattr = None


def can_xattr():
    """Return True if extended attributes are usable; the result is cached.

    Returns False when os.setxattr is missing, when any of the probing
    setxattr() calls raises OSError, or when platform.release() reports a
    2.6.x kernel older than 2.6.39.
    """
    import tempfile
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        import platform
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Dots are escaped so only a literal "2.6." prefix matches.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() returns an already-open descriptor; close it so the
            # probe does not leak it.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can


def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    ok = can_xattr()
    msg = "no non-broken extended attribute support"
    return test if ok else unittest.skip(msg)(test)


# Cached result of can_chmod(); None until the first probe.
_can_chmod = None


def can_chmod():
    """Return True if os.chmod() really changes mode bits; result is cached.

    The probe sets TESTFN to 0o777 and then 0o666 and compares the
    permission bits reported by os.stat(). Returns False when os.chmod is
    missing, when chmod()/stat() raises OSError, or when both modes read
    back identically.
    """
    global _can_chmod
    if _can_chmod is not None:
        return _can_chmod
    # Check for the function actually exercised below.
    if not hasattr(os, "chmod"):
        _can_chmod = False
        return _can_chmod
    try:
        with open(TESTFN, "wb"):
            try:
                os.chmod(TESTFN, 0o777)
                mode1 = os.stat(TESTFN).st_mode
                os.chmod(TESTFN, 0o666)
                mode2 = os.stat(TESTFN).st_mode
            except OSError:
                can = False
            else:
                can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
    finally:
        unlink(TESTFN)
    _can_chmod = can
    return can


def skip_unless_working_chmod(test):
    """Skip tests that require working os.chmod()

    WASI SDK 15.0 cannot change file mode bits.
    """
    ok = can_chmod()
    msg = "requires working os.chmod()"
    return test if ok else unittest.skip(msg)(test)


# Check whether the current effective user has the capability to override
# DAC (discretionary access control). Typically user root is able to
# bypass file read, write, and execute permission checks. The capability
# is independent of the effective user. See capabilities(7).
# Cached result of can_dac_override(); None until the first probe.
_can_dac_override = None


def can_dac_override():
    """Return True if this process can bypass file permission checks.

    The probe makes TESTFN read-only (0o400) and then tries to reopen it
    for writing; success means the permission bits were overridden. The
    result is cached. Returns False without probing when can_chmod() is
    false.
    """
    global _can_dac_override

    # Consult the cache first so repeated calls do no work at all.
    if _can_dac_override is not None:
        return _can_dac_override
    if not can_chmod():
        _can_dac_override = False
        return _can_dac_override

    try:
        with open(TESTFN, "wb"):
            os.chmod(TESTFN, 0o400)
            try:
                with open(TESTFN, "wb"):
                    pass
            except OSError:
                _can_dac_override = False
            else:
                _can_dac_override = True
    finally:
        # Make the probe file writable again before removing it, so a
        # read-only file is never what unlink() has to deal with.
        try:
            os.chmod(TESTFN, 0o700)
        except OSError:
            pass
        unlink(TESTFN)

    return _can_dac_override


def skip_if_dac_override(test):
    """Skip decorator for tests that cannot run when DAC can be overridden."""
    ok = not can_dac_override()
    msg = "incompatible with CAP_DAC_OVERRIDE"
    return test if ok else unittest.skip(msg)(test)


def skip_unless_dac_override(test):
    """Skip decorator for tests that require the ability to override DAC."""
    ok = can_dac_override()
    msg = "requires CAP_DAC_OVERRIDE"
    return test if ok else unittest.skip(msg)(test)


def unlink(filename):
    """Remove *filename*, ignoring a missing file or a non-directory parent."""
    try:
        _unlink(filename)
    except (FileNotFoundError, NotADirectoryError):
        pass


if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the removal to become visible.

        With waitall=False, wait until the last component of *pathname* no
        longer appears in its parent directory listing. With waitall=True,
        wait until the directory *pathname* itself lists as empty. A
        RuntimeWarning is emitted if the wait loop runs out.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights. If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            # `name` is only evaluated when waitall is false, which is the
            # only case in which it was bound above.
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Remove a file and wait for the deletion to become visible."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove a directory and wait for the deletion to become visible."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively remove *path*, waiting for each directory to drain."""
        from test.support import _force_run

        def _rmtree_inner(path):
            # Remove every entry below `path` (but not `path` itself).
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s"
                          % (fullname, exc),
                          file=sys.__stderr__)
                    # Mode 0 is not a directory, so fall through to unlink.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)

    def _longpath(path):
        """Return GetLongPathNameW()'s expansion of *path*, or *path* itself.

        *path* is returned unchanged when ctypes is unavailable or the
        Windows call reports a zero length.
        """
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expand paths.
            pass
        else:
            buffer = ctypes.create_unicode_buffer(len(path) * 2)
            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                             len(buffer))
            if length:
                return buffer[:length]
        return path
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Recursively remove *path*.

        shutil.rmtree() is tried first; if it raises OSError, fall back to
        a manual walk that routes each operation through _force_run().
        """
        import shutil
        try:
            shutil.rmtree(path)
            return
        except OSError:
            pass

        def _rmtree_inner(path):
            # Remove every entry below `path` (but not `path` itself).
            from test.support import _force_run
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError:
                    # Mode 0 is not a directory, so fall through to unlink.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        """Return *path* unchanged (no long-path expansion off Windows)."""
        return path


def rmdir(dirname):
    """Remove the directory *dirname*, ignoring it if it does not exist."""
    try:
        _rmdir(dirname)
    except FileNotFoundError:
        pass


def rmtree(path):
    """Recursively remove *path*, ignoring it if it does not exist."""
    try:
        _rmtree(path)
    except FileNotFoundError:
        pass


@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        defaults to creating a temporary directory using tempfile.mkdtemp.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, if the path is specified and cannot be
        created, only a warning is issued.

    """
    import tempfile
    dir_created = False
    if path is None:
        path = tempfile.mkdtemp()
        dir_created = True
        path = os.path.realpath(path)
    else:
        try:
            os.mkdir(path)
            dir_created = True
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
    if dir_created:
        pid = os.getpid()
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        # `pid` is only bound when dir_created is true, hence the order.
        if dir_created and pid == os.getpid():
            rmtree(path)


@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.

    """
    saved_dir = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)


@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    The function temporarily changes the current working directory
    after creating a temporary directory in the current directory with
    name *name*.  If *name* is None, the temporary directory is
    created using tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path:
        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
            yield cwd_dir


def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""
    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    os.close(fd)


@contextlib.contextmanager
def open_dir_fd(path):
    """Open a file descriptor to a directory.

    The descriptor is yielded to the caller and closed on exit.
    """
    assert os.path.isdir(path)
    flags = os.O_RDONLY
    if hasattr(os, "O_DIRECTORY"):
        flags |= os.O_DIRECTORY
    dir_fd = os.open(path, flags)
    try:
        yield dir_fd
    finally:
        os.close(dir_fd)


def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory
    is case-insensitive."""
    import tempfile
    with tempfile.NamedTemporaryFile(dir=directory) as base:
        base_path = base.name
        # Pick a spelling that differs from the real one only by case.
        case_path = base_path.upper()
        if case_path == base_path:
            case_path = base_path.lower()
        try:
            return os.path.samefile(base_path, case_path)
        except FileNotFoundError:
            return False


class FakePath:
    """Simple implementation of the path protocol.

    If the wrapped value is an exception instance or exception class,
    __fspath__() raises it instead of returning it.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        if (isinstance(self.path, BaseException) or
            isinstance(self.path, type) and
                issubclass(self.path, BaseException)):
            raise self.path
        else:
            return self.path


def fd_count():
    """Count the number of open file descriptors.
    """
    if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
        try:
            names = os.listdir("/proc/self/fd")
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the /proc/self/fd/ directory.
            return len(names) - 1
        except FileNotFoundError:
            pass

    # Fallback: probe every descriptor number below MAXFD.
    MAXFD = 256
    if hasattr(os, 'sysconf'):
        try:
            MAXFD = os.sysconf("SC_OPEN_MAX")
        except OSError:
            pass

    old_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            old_modes = {}
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
                                                                 0)

    try:
        count = 0
        for fd in range(MAXFD):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                fd2 = os.dup(fd)
            except OSError as e:
                # EBADF just means "this number is not open"; anything else
                # is unexpected and propagates.
                if e.errno != errno.EBADF:
                    raise
            else:
                os.close(fd2)
                count += 1
    finally:
        if old_modes is not None:
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])

    return count


if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask."""
        oldmask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(oldmask)
else:
    @contextlib.contextmanager
    def temp_umask(umask):
        """no-op on platforms without umask()"""
        yield


class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager."""

    def __init__(self):
        # The os.environ object in effect at construction time.
        self._environ = os.environ
        # Original value (or None if unset) of every variable touched.
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        # Deleting a variable that is not set is silently accepted.
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        """Set *envvar* to *value* (same as ``self[envvar] = value``)."""
        self[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* (same as ``del self[envvar]``)."""
        del self[envvar]

    def copy(self):
        # We do what os.environ.copy() does.
        return dict(self)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put back every variable that was set or deleted through the guard.
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        # Rebind os.environ to the mapping captured in __init__.
        os.environ = self._environ