1import collections.abc
2import contextlib
3import errno
4import os
5import re
6import stat
7import sys
8import time
9import unittest
10import warnings
11
12
13# Filename used for testing
14if os.name == 'java':
15    # Jython disallows @ in module names
16    TESTFN_ASCII = '$test'
17else:
18    TESTFN_ASCII = '@test'
19
20# Disambiguate TESTFN for parallel testing, while letting it remain a valid
21# module name.
22TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())
23
24# TESTFN_UNICODE is a non-ascii filename
25TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
26if sys.platform == 'darwin':
27    # In Mac OS X's VFS API file names are, by definition, canonically
28    # decomposed Unicode, encoded using UTF-8. See QA1173:
29    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
30    import unicodedata
31    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
32
33# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
34# encoded by the filesystem encoding (in strict mode). It can be None if we
35# cannot generate such filename.
36TESTFN_UNENCODABLE = None
37if os.name == 'nt':
38    # skip win32s (0) or Windows 9x/ME (1)
39    if sys.getwindowsversion().platform >= 2:
40        # Different kinds of characters from various languages to minimize the
41        # probability that the whole name is encodable to MBCS (issue #9819)
42        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
43        try:
44            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
45        except UnicodeEncodeError:
46            pass
47        else:
48            print('WARNING: The filename %r CAN be encoded by the filesystem '
49                  'encoding (%s). Unicode filename tests may not be effective'
50                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
51            TESTFN_UNENCODABLE = None
52# macOS and Emscripten deny unencodable filenames (invalid utf-8)
53elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
54    try:
55        # ascii and utf-8 cannot encode the byte 0xff
56        b'\xff'.decode(sys.getfilesystemencoding())
57    except UnicodeDecodeError:
58        # 0xff will be encoded using the surrogate character u+DCFF
59        TESTFN_UNENCODABLE = TESTFN_ASCII \
60            + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
61    else:
62        # File system encoding (eg. ISO-8859-* encodings) can encode
63        # the byte 0xff. Skip some unicode filename tests.
64        pass
65
66# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
67# or an empty string if there is no such character.
68FS_NONASCII = ''
69for character in (
70    # First try printable and common characters to have a readable filename.
71    # For each character, the encoding list are just example of encodings able
72    # to encode the character (the list is not exhaustive).
73
74    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
75    '\u00E6',
76    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
77    '\u0130',
78    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
79    '\u0141',
80    # U+03C6 (Greek Small Letter Phi): cp1253
81    '\u03C6',
82    # U+041A (Cyrillic Capital Letter Ka): cp1251
83    '\u041A',
84    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
85    '\u05D0',
86    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
87    '\u060C',
88    # U+062A (Arabic Letter Teh): cp720
89    '\u062A',
90    # U+0E01 (Thai Character Ko Kai): cp874
91    '\u0E01',
92
93    # Then try more "special" characters. "special" because they may be
94    # interpreted or displayed differently depending on the exact locale
95    # encoding and the font.
96
97    # U+00A0 (No-Break Space)
98    '\u00A0',
99    # U+20AC (Euro Sign)
100    '\u20AC',
101):
102    try:
103        # If Python is set up to use the legacy 'mbcs' in Windows,
104        # 'replace' error mode is used, and encode() returns b'?'
105        # for characters missing in the ANSI codepage
106        if os.fsdecode(os.fsencode(character)) != character:
107            raise UnicodeError
108    except UnicodeError:
109        pass
110    else:
111        FS_NONASCII = character
112        break
113
114# Save the initial cwd
115SAVEDCWD = os.getcwd()
116
117# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
118# decoded from the filesystem encoding (in strict mode). It can be None if we
119# cannot generate such filename (ex: the latin1 encoding can decode any byte
120# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
121# to the surrogateescape error handler (PEP 383), but not from the filesystem
122# encoding in strict mode.
123TESTFN_UNDECODABLE = None
124for name in (
125    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
126    # accepts it to create a file or a directory, or don't accept to enter to
127    # such directory (when the bytes name is used). So test b'\xe7' first:
128    # it is not decodable from cp932.
129    b'\xe7w\xf0',
130    # undecodable from ASCII, UTF-8
131    b'\xff',
132    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
133    # and cp857
134    b'\xae\xd5'
135    # undecodable from UTF-8 (UNIX and Mac OS X)
136    b'\xed\xb2\x80', b'\xed\xb4\x80',
137    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
138    # cp1253, cp1254, cp1255, cp1257, cp1258
139    b'\x81\x98',
140):
141    try:
142        name.decode(sys.getfilesystemencoding())
143    except UnicodeDecodeError:
144        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
145        break
146
147if FS_NONASCII:
148    TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
149else:
150    TESTFN_NONASCII = None
151TESTFN = TESTFN_NONASCII or TESTFN_ASCII
152
153
154def make_bad_fd():
155    """
156    Create an invalid file descriptor by opening and closing a file and return
157    its fd.
158    """
159    file = open(TESTFN, "wb")
160    try:
161        return file.fileno()
162    finally:
163        file.close()
164        unlink(TESTFN)
165
166
167_can_symlink = None
168
169
170def can_symlink():
171    global _can_symlink
172    if _can_symlink is not None:
173        return _can_symlink
174    # WASI / wasmtime prevents symlinks with absolute paths, see man
175    # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
176    # paths. Skip symlink tests on WASI for now.
177    src = os.path.abspath(TESTFN)
178    symlink_path = src + "can_symlink"
179    try:
180        os.symlink(src, symlink_path)
181        can = True
182    except (OSError, NotImplementedError, AttributeError):
183        can = False
184    else:
185        os.remove(symlink_path)
186    _can_symlink = can
187    return can
188
189
190def skip_unless_symlink(test):
191    """Skip decorator for tests that require functional symlink"""
192    ok = can_symlink()
193    msg = "Requires functional symlink implementation"
194    return test if ok else unittest.skip(msg)(test)
195
196
197_can_xattr = None
198
199
200def can_xattr():
201    import tempfile
202    global _can_xattr
203    if _can_xattr is not None:
204        return _can_xattr
205    if not hasattr(os, "setxattr"):
206        can = False
207    else:
208        import platform
209        tmp_dir = tempfile.mkdtemp()
210        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
211        try:
212            with open(TESTFN, "wb") as fp:
213                try:
214                    # TESTFN & tempfile may use different file systems with
215                    # different capabilities
216                    os.setxattr(tmp_fp, b"user.test", b"")
217                    os.setxattr(tmp_name, b"trusted.foo", b"42")
218                    os.setxattr(fp.fileno(), b"user.test", b"")
219                    # Kernels < 2.6.39 don't respect setxattr flags.
220                    kernel_version = platform.release()
221                    m = re.match(r"2.6.(\d{1,2})", kernel_version)
222                    can = m is None or int(m.group(1)) >= 39
223                except OSError:
224                    can = False
225        finally:
226            unlink(TESTFN)
227            unlink(tmp_name)
228            rmdir(tmp_dir)
229    _can_xattr = can
230    return can
231
232
233def skip_unless_xattr(test):
234    """Skip decorator for tests that require functional extended attributes"""
235    ok = can_xattr()
236    msg = "no non-broken extended attribute support"
237    return test if ok else unittest.skip(msg)(test)
238
239
240_can_chmod = None
241
242def can_chmod():
243    global _can_chmod
244    if _can_chmod is not None:
245        return _can_chmod
246    if not hasattr(os, "chown"):
247        _can_chmod = False
248        return _can_chmod
249    try:
250        with open(TESTFN, "wb") as f:
251            try:
252                os.chmod(TESTFN, 0o777)
253                mode1 = os.stat(TESTFN).st_mode
254                os.chmod(TESTFN, 0o666)
255                mode2 = os.stat(TESTFN).st_mode
256            except OSError as e:
257                can = False
258            else:
259                can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
260    finally:
261        unlink(TESTFN)
262    _can_chmod = can
263    return can
264
265
266def skip_unless_working_chmod(test):
267    """Skip tests that require working os.chmod()
268
269    WASI SDK 15.0 cannot change file mode bits.
270    """
271    ok = can_chmod()
272    msg = "requires working os.chmod()"
273    return test if ok else unittest.skip(msg)(test)
274
275
276# Check whether the current effective user has the capability to override
277# DAC (discretionary access control). Typically user root is able to
278# bypass file read, write, and execute permission checks. The capability
279# is independent of the effective user. See capabilities(7).
280_can_dac_override = None
281
282def can_dac_override():
283    global _can_dac_override
284
285    if not can_chmod():
286        _can_dac_override = False
287    if _can_dac_override is not None:
288        return _can_dac_override
289
290    try:
291        with open(TESTFN, "wb") as f:
292            os.chmod(TESTFN, 0o400)
293            try:
294                with open(TESTFN, "wb"):
295                    pass
296            except OSError:
297                _can_dac_override = False
298            else:
299                _can_dac_override = True
300    finally:
301        unlink(TESTFN)
302
303    return _can_dac_override
304
305
306def skip_if_dac_override(test):
307    ok = not can_dac_override()
308    msg = "incompatible with CAP_DAC_OVERRIDE"
309    return test if ok else unittest.skip(msg)(test)
310
311
312def skip_unless_dac_override(test):
313    ok = can_dac_override()
314    msg = "requires CAP_DAC_OVERRIDE"
315    return test if ok else unittest.skip(msg)(test)
316
317
318def unlink(filename):
319    try:
320        _unlink(filename)
321    except (FileNotFoundError, NotADirectoryError):
322        pass
323
324
325if sys.platform.startswith("win"):
326    def _waitfor(func, pathname, waitall=False):
327        # Perform the operation
328        func(pathname)
329        # Now setup the wait loop
330        if waitall:
331            dirname = pathname
332        else:
333            dirname, name = os.path.split(pathname)
334            dirname = dirname or '.'
335        # Check for `pathname` to be removed from the filesystem.
336        # The exponential backoff of the timeout amounts to a total
337        # of ~1 second after which the deletion is probably an error
338        # anyway.
339        # Testing on an [email protected] shows that usually only 1 iteration is
340        # required when contention occurs.
341        timeout = 0.001
342        while timeout < 1.0:
343            # Note we are only testing for the existence of the file(s) in
344            # the contents of the directory regardless of any security or
345            # access rights.  If we have made it this far, we have sufficient
346            # permissions to do that much using Python's equivalent of the
347            # Windows API FindFirstFile.
348            # Other Windows APIs can fail or give incorrect results when
349            # dealing with files that are pending deletion.
350            L = os.listdir(dirname)
351            if not (L if waitall else name in L):
352                return
353            # Increase the timeout and try again
354            time.sleep(timeout)
355            timeout *= 2
356        warnings.warn('tests may fail, delete still pending for ' + pathname,
357                      RuntimeWarning, stacklevel=4)
358
359    def _unlink(filename):
360        _waitfor(os.unlink, filename)
361
362    def _rmdir(dirname):
363        _waitfor(os.rmdir, dirname)
364
365    def _rmtree(path):
366        from test.support import _force_run
367
368        def _rmtree_inner(path):
369            for name in _force_run(path, os.listdir, path):
370                fullname = os.path.join(path, name)
371                try:
372                    mode = os.lstat(fullname).st_mode
373                except OSError as exc:
374                    print("support.rmtree(): os.lstat(%r) failed with %s"
375                          % (fullname, exc),
376                          file=sys.__stderr__)
377                    mode = 0
378                if stat.S_ISDIR(mode):
379                    _waitfor(_rmtree_inner, fullname, waitall=True)
380                    _force_run(fullname, os.rmdir, fullname)
381                else:
382                    _force_run(fullname, os.unlink, fullname)
383        _waitfor(_rmtree_inner, path, waitall=True)
384        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
385
386    def _longpath(path):
387        try:
388            import ctypes
389        except ImportError:
390            # No ctypes means we can't expands paths.
391            pass
392        else:
393            buffer = ctypes.create_unicode_buffer(len(path) * 2)
394            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
395                                                             len(buffer))
396            if length:
397                return buffer[:length]
398        return path
399else:
400    _unlink = os.unlink
401    _rmdir = os.rmdir
402
403    def _rmtree(path):
404        import shutil
405        try:
406            shutil.rmtree(path)
407            return
408        except OSError:
409            pass
410
411        def _rmtree_inner(path):
412            from test.support import _force_run
413            for name in _force_run(path, os.listdir, path):
414                fullname = os.path.join(path, name)
415                try:
416                    mode = os.lstat(fullname).st_mode
417                except OSError:
418                    mode = 0
419                if stat.S_ISDIR(mode):
420                    _rmtree_inner(fullname)
421                    _force_run(path, os.rmdir, fullname)
422                else:
423                    _force_run(path, os.unlink, fullname)
424        _rmtree_inner(path)
425        os.rmdir(path)
426
427    def _longpath(path):
428        return path
429
430
431def rmdir(dirname):
432    try:
433        _rmdir(dirname)
434    except FileNotFoundError:
435        pass
436
437
438def rmtree(path):
439    try:
440        _rmtree(path)
441    except FileNotFoundError:
442        pass
443
444
445@contextlib.contextmanager
446def temp_dir(path=None, quiet=False):
447    """Return a context manager that creates a temporary directory.
448
449    Arguments:
450
451      path: the directory to create temporarily.  If omitted or None,
452        defaults to creating a temporary directory using tempfile.mkdtemp.
453
454      quiet: if False (the default), the context manager raises an exception
455        on error.  Otherwise, if the path is specified and cannot be
456        created, only a warning is issued.
457
458    """
459    import tempfile
460    dir_created = False
461    if path is None:
462        path = tempfile.mkdtemp()
463        dir_created = True
464        path = os.path.realpath(path)
465    else:
466        try:
467            os.mkdir(path)
468            dir_created = True
469        except OSError as exc:
470            if not quiet:
471                raise
472            warnings.warn(f'tests may fail, unable to create '
473                          f'temporary directory {path!r}: {exc}',
474                          RuntimeWarning, stacklevel=3)
475    if dir_created:
476        pid = os.getpid()
477    try:
478        yield path
479    finally:
480        # In case the process forks, let only the parent remove the
481        # directory. The child has a different process id. (bpo-30028)
482        if dir_created and pid == os.getpid():
483            rmtree(path)
484
485
486@contextlib.contextmanager
487def change_cwd(path, quiet=False):
488    """Return a context manager that changes the current working directory.
489
490    Arguments:
491
492      path: the directory to use as the temporary current working directory.
493
494      quiet: if False (the default), the context manager raises an exception
495        on error.  Otherwise, it issues only a warning and keeps the current
496        working directory the same.
497
498    """
499    saved_dir = os.getcwd()
500    try:
501        os.chdir(os.path.realpath(path))
502    except OSError as exc:
503        if not quiet:
504            raise
505        warnings.warn(f'tests may fail, unable to change the current working '
506                      f'directory to {path!r}: {exc}',
507                      RuntimeWarning, stacklevel=3)
508    try:
509        yield os.getcwd()
510    finally:
511        os.chdir(saved_dir)
512
513
514@contextlib.contextmanager
515def temp_cwd(name='tempcwd', quiet=False):
516    """
517    Context manager that temporarily creates and changes the CWD.
518
519    The function temporarily changes the current working directory
520    after creating a temporary directory in the current directory with
521    name *name*.  If *name* is None, the temporary directory is
522    created using tempfile.mkdtemp.
523
524    If *quiet* is False (default) and it is not possible to
525    create or change the CWD, an error is raised.  If *quiet* is True,
526    only a warning is raised and the original CWD is used.
527
528    """
529    with temp_dir(path=name, quiet=quiet) as temp_path:
530        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
531            yield cwd_dir
532
533
534def create_empty_file(filename):
535    """Create an empty file. If the file already exists, truncate it."""
536    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
537    os.close(fd)
538
539
540@contextlib.contextmanager
541def open_dir_fd(path):
542    """Open a file descriptor to a directory."""
543    assert os.path.isdir(path)
544    flags = os.O_RDONLY
545    if hasattr(os, "O_DIRECTORY"):
546        flags |= os.O_DIRECTORY
547    dir_fd = os.open(path, flags)
548    try:
549        yield dir_fd
550    finally:
551        os.close(dir_fd)
552
553
554def fs_is_case_insensitive(directory):
555    """Detects if the file system for the specified directory
556    is case-insensitive."""
557    import tempfile
558    with tempfile.NamedTemporaryFile(dir=directory) as base:
559        base_path = base.name
560        case_path = base_path.upper()
561        if case_path == base_path:
562            case_path = base_path.lower()
563        try:
564            return os.path.samefile(base_path, case_path)
565        except FileNotFoundError:
566            return False
567
568
569class FakePath:
570    """Simple implementing of the path protocol.
571    """
572    def __init__(self, path):
573        self.path = path
574
575    def __repr__(self):
576        return f'<FakePath {self.path!r}>'
577
578    def __fspath__(self):
579        if (isinstance(self.path, BaseException) or
580            isinstance(self.path, type) and
581                issubclass(self.path, BaseException)):
582            raise self.path
583        else:
584            return self.path
585
586
587def fd_count():
588    """Count the number of open file descriptors.
589    """
590    if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
591        try:
592            names = os.listdir("/proc/self/fd")
593            # Subtract one because listdir() internally opens a file
594            # descriptor to list the content of the /proc/self/fd/ directory.
595            return len(names) - 1
596        except FileNotFoundError:
597            pass
598
599    MAXFD = 256
600    if hasattr(os, 'sysconf'):
601        try:
602            MAXFD = os.sysconf("SC_OPEN_MAX")
603        except OSError:
604            pass
605
606    old_modes = None
607    if sys.platform == 'win32':
608        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
609        # on invalid file descriptor if Python is compiled in debug mode
610        try:
611            import msvcrt
612            msvcrt.CrtSetReportMode
613        except (AttributeError, ImportError):
614            # no msvcrt or a release build
615            pass
616        else:
617            old_modes = {}
618            for report_type in (msvcrt.CRT_WARN,
619                                msvcrt.CRT_ERROR,
620                                msvcrt.CRT_ASSERT):
621                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
622                                                                 0)
623
624    try:
625        count = 0
626        for fd in range(MAXFD):
627            try:
628                # Prefer dup() over fstat(). fstat() can require input/output
629                # whereas dup() doesn't.
630                fd2 = os.dup(fd)
631            except OSError as e:
632                if e.errno != errno.EBADF:
633                    raise
634            else:
635                os.close(fd2)
636                count += 1
637    finally:
638        if old_modes is not None:
639            for report_type in (msvcrt.CRT_WARN,
640                                msvcrt.CRT_ERROR,
641                                msvcrt.CRT_ASSERT):
642                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
643
644    return count
645
646
647if hasattr(os, "umask"):
648    @contextlib.contextmanager
649    def temp_umask(umask):
650        """Context manager that temporarily sets the process umask."""
651        oldmask = os.umask(umask)
652        try:
653            yield
654        finally:
655            os.umask(oldmask)
656else:
657    @contextlib.contextmanager
658    def temp_umask(umask):
659        """no-op on platforms without umask()"""
660        yield
661
662
663class EnvironmentVarGuard(collections.abc.MutableMapping):
664
665    """Class to help protect the environment variable properly.  Can be used as
666    a context manager."""
667
668    def __init__(self):
669        self._environ = os.environ
670        self._changed = {}
671
672    def __getitem__(self, envvar):
673        return self._environ[envvar]
674
675    def __setitem__(self, envvar, value):
676        # Remember the initial value on the first access
677        if envvar not in self._changed:
678            self._changed[envvar] = self._environ.get(envvar)
679        self._environ[envvar] = value
680
681    def __delitem__(self, envvar):
682        # Remember the initial value on the first access
683        if envvar not in self._changed:
684            self._changed[envvar] = self._environ.get(envvar)
685        if envvar in self._environ:
686            del self._environ[envvar]
687
688    def keys(self):
689        return self._environ.keys()
690
691    def __iter__(self):
692        return iter(self._environ)
693
694    def __len__(self):
695        return len(self._environ)
696
697    def set(self, envvar, value):
698        self[envvar] = value
699
700    def unset(self, envvar):
701        del self[envvar]
702
703    def copy(self):
704        # We do what os.environ.copy() does.
705        return dict(self)
706
707    def __enter__(self):
708        return self
709
710    def __exit__(self, *ignore_exc):
711        for (k, v) in self._changed.items():
712            if v is None:
713                if k in self._environ:
714                    del self._environ[k]
715            else:
716                self._environ[k] = v
717        os.environ = self._environ
718