1"Test posix functions"
2
3from test import support
4from test.support import import_helper
5from test.support import os_helper
6from test.support import warnings_helper
7from test.support.script_helper import assert_python_ok
8
9# Skip these tests if there is no posix module.
10posix = import_helper.import_module('posix')
11
12import errno
13import sys
14import signal
15import time
16import os
17import platform
18import stat
19import tempfile
20import unittest
21import warnings
22import textwrap
23from contextlib import contextmanager
24
25try:
26    import pwd
27except ImportError:
28    pwd = None
29
30_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
31                              os_helper.TESTFN + '-dummy-symlink')
32
requires_32b = unittest.skipUnless(
    # Pointer width alone does not decide this: Emscripten and WASI have
    # 32-bit pointers but support 64-bit syscall arguments, so both are
    # excluded.
    sys.maxsize < 2**32
    and not support.is_emscripten
    and not support.is_wasi,
    'test is only meaningful on 32-bit builds'
)
38
def _supports_sched():
    # Report whether the scheduler API is usable: the function has to
    # exist, and probing it must not fail with ENOSYS.  Any other OSError
    # from the probe still counts as "supported".
    getscheduler = getattr(posix, 'sched_getscheduler', None)
    if getscheduler is None:
        return False
    try:
        getscheduler(0)
    except OSError as exc:
        return exc.errno != errno.ENOSYS
    return True
48
# Skip decorator driven by the _supports_sched() probe, which runs once at
# import time.
requires_sched = unittest.skipUnless(
    _supports_sched(),
    'requires POSIX scheduler API',
)
50
51
52class PosixTester(unittest.TestCase):
53
54    def setUp(self):
55        # create empty file
56        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
57        with open(os_helper.TESTFN, "wb"):
58            pass
59        self.enterContext(warnings_helper.check_warnings())
60        warnings.filterwarnings('ignore', '.* potential security risk .*',
61                                RuntimeWarning)
62
63    def testNoArgFunctions(self):
64        # test posix functions which take no arguments and have
65        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
66        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
67                             "times", "getloadavg",
68                             "getegid", "geteuid", "getgid", "getgroups",
69                             "getpid", "getpgrp", "getppid", "getuid", "sync",
70                           ]
71
72        for name in NO_ARG_FUNCTIONS:
73            posix_func = getattr(posix, name, None)
74            if posix_func is not None:
75                with self.subTest(name):
76                    posix_func()
77                    self.assertRaises(TypeError, posix_func, 1)
78
79    @unittest.skipUnless(hasattr(posix, 'getresuid'),
80                         'test needs posix.getresuid()')
81    def test_getresuid(self):
82        user_ids = posix.getresuid()
83        self.assertEqual(len(user_ids), 3)
84        for val in user_ids:
85            self.assertGreaterEqual(val, 0)
86
87    @unittest.skipUnless(hasattr(posix, 'getresgid'),
88                         'test needs posix.getresgid()')
89    def test_getresgid(self):
90        group_ids = posix.getresgid()
91        self.assertEqual(len(group_ids), 3)
92        for val in group_ids:
93            self.assertGreaterEqual(val, 0)
94
95    @unittest.skipUnless(hasattr(posix, 'setresuid'),
96                         'test needs posix.setresuid()')
97    def test_setresuid(self):
98        current_user_ids = posix.getresuid()
99        self.assertIsNone(posix.setresuid(*current_user_ids))
100        # -1 means don't change that value.
101        self.assertIsNone(posix.setresuid(-1, -1, -1))
102
103    @unittest.skipUnless(hasattr(posix, 'setresuid'),
104                         'test needs posix.setresuid()')
105    def test_setresuid_exception(self):
106        # Don't do this test if someone is silly enough to run us as root.
107        current_user_ids = posix.getresuid()
108        if 0 not in current_user_ids:
109            new_user_ids = (current_user_ids[0]+1, -1, -1)
110            self.assertRaises(OSError, posix.setresuid, *new_user_ids)
111
112    @unittest.skipUnless(hasattr(posix, 'setresgid'),
113                         'test needs posix.setresgid()')
114    def test_setresgid(self):
115        current_group_ids = posix.getresgid()
116        self.assertIsNone(posix.setresgid(*current_group_ids))
117        # -1 means don't change that value.
118        self.assertIsNone(posix.setresgid(-1, -1, -1))
119
120    @unittest.skipUnless(hasattr(posix, 'setresgid'),
121                         'test needs posix.setresgid()')
122    def test_setresgid_exception(self):
123        # Don't do this test if someone is silly enough to run us as root.
124        current_group_ids = posix.getresgid()
125        if 0 not in current_group_ids:
126            new_group_ids = (current_group_ids[0]+1, -1, -1)
127            self.assertRaises(OSError, posix.setresgid, *new_group_ids)
128
129    @unittest.skipUnless(hasattr(posix, 'initgroups'),
130                         "test needs os.initgroups()")
131    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
132    def test_initgroups(self):
133        # It takes a string and an integer; check that it raises a TypeError
134        # for other argument lists.
135        self.assertRaises(TypeError, posix.initgroups)
136        self.assertRaises(TypeError, posix.initgroups, None)
137        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
138        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
139
140        # If a non-privileged user invokes it, it should fail with OSError
141        # EPERM.
142        if os.getuid() != 0:
143            try:
144                name = pwd.getpwuid(posix.getuid()).pw_name
145            except KeyError:
146                # the current UID may not have a pwd entry
147                raise unittest.SkipTest("need a pwd entry")
148            try:
149                posix.initgroups(name, 13)
150            except OSError as e:
151                self.assertEqual(e.errno, errno.EPERM)
152            else:
153                self.fail("Expected OSError to be raised by initgroups")
154
155    @unittest.skipUnless(hasattr(posix, 'statvfs'),
156                         'test needs posix.statvfs()')
157    def test_statvfs(self):
158        self.assertTrue(posix.statvfs(os.curdir))
159
160    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
161                         'test needs posix.fstatvfs()')
162    def test_fstatvfs(self):
163        fp = open(os_helper.TESTFN)
164        try:
165            self.assertTrue(posix.fstatvfs(fp.fileno()))
166            self.assertTrue(posix.statvfs(fp.fileno()))
167        finally:
168            fp.close()
169
170    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
171                         'test needs posix.ftruncate()')
172    def test_ftruncate(self):
173        fp = open(os_helper.TESTFN, 'w+')
174        try:
175            # we need to have some data to truncate
176            fp.write('test')
177            fp.flush()
178            posix.ftruncate(fp.fileno(), 0)
179        finally:
180            fp.close()
181
182    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
183    def test_truncate(self):
184        with open(os_helper.TESTFN, 'w') as fp:
185            fp.write('test')
186            fp.flush()
187        posix.truncate(os_helper.TESTFN, 0)
188
189    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
190    @support.requires_fork()
191    def test_fexecve(self):
192        fp = os.open(sys.executable, os.O_RDONLY)
193        try:
194            pid = os.fork()
195            if pid == 0:
196                os.chdir(os.path.split(sys.executable)[0])
197                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
198            else:
199                support.wait_process(pid, exitcode=0)
200        finally:
201            os.close(fp)
202
203
204    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
205    @support.requires_fork()
206    def test_waitid(self):
207        pid = os.fork()
208        if pid == 0:
209            os.chdir(os.path.split(sys.executable)[0])
210            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
211        else:
212            res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
213            self.assertEqual(pid, res.si_pid)
214
215    @support.requires_fork()
216    def test_register_at_fork(self):
217        with self.assertRaises(TypeError, msg="Positional args not allowed"):
218            os.register_at_fork(lambda: None)
219        with self.assertRaises(TypeError, msg="Args must be callable"):
220            os.register_at_fork(before=2)
221        with self.assertRaises(TypeError, msg="Args must be callable"):
222            os.register_at_fork(after_in_child="three")
223        with self.assertRaises(TypeError, msg="Args must be callable"):
224            os.register_at_fork(after_in_parent=b"Five")
225        with self.assertRaises(TypeError, msg="Args must not be None"):
226            os.register_at_fork(before=None)
227        with self.assertRaises(TypeError, msg="Args must not be None"):
228            os.register_at_fork(after_in_child=None)
229        with self.assertRaises(TypeError, msg="Args must not be None"):
230            os.register_at_fork(after_in_parent=None)
231        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
232            # Ensure a combination of valid and invalid is an error.
233            os.register_at_fork(before=None, after_in_parent=lambda: 3)
234        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
235            # Ensure a combination of valid and invalid is an error.
236            os.register_at_fork(before=lambda: None, after_in_child='')
237        # We test actual registrations in their own process so as not to
238        # pollute this one.  There is no way to unregister for cleanup.
239        code = """if 1:
240            import os
241
242            r, w = os.pipe()
243            fin_r, fin_w = os.pipe()
244
245            os.register_at_fork(before=lambda: os.write(w, b'A'))
246            os.register_at_fork(after_in_parent=lambda: os.write(w, b'C'))
247            os.register_at_fork(after_in_child=lambda: os.write(w, b'E'))
248            os.register_at_fork(before=lambda: os.write(w, b'B'),
249                                after_in_parent=lambda: os.write(w, b'D'),
250                                after_in_child=lambda: os.write(w, b'F'))
251
252            pid = os.fork()
253            if pid == 0:
254                # At this point, after-forkers have already been executed
255                os.close(w)
256                # Wait for parent to tell us to exit
257                os.read(fin_r, 1)
258                os._exit(0)
259            else:
260                try:
261                    os.close(w)
262                    with open(r, "rb") as f:
263                        data = f.read()
264                        assert len(data) == 6, data
265                        # Check before-fork callbacks
266                        assert data[:2] == b'BA', data
267                        # Check after-fork callbacks
268                        assert sorted(data[2:]) == list(b'CDEF'), data
269                        assert data.index(b'C') < data.index(b'D'), data
270                        assert data.index(b'E') < data.index(b'F'), data
271                finally:
272                    os.write(fin_w, b'!')
273            """
274        assert_python_ok('-c', code)
275
276    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
277    def test_lockf(self):
278        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
279        try:
280            os.write(fd, b'test')
281            os.lseek(fd, 0, os.SEEK_SET)
282            posix.lockf(fd, posix.F_LOCK, 4)
283            # section is locked
284            posix.lockf(fd, posix.F_ULOCK, 4)
285        finally:
286            os.close(fd)
287
288    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
289    def test_pread(self):
290        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
291        try:
292            os.write(fd, b'test')
293            os.lseek(fd, 0, os.SEEK_SET)
294            self.assertEqual(b'es', posix.pread(fd, 2, 1))
295            # the first pread() shouldn't disturb the file offset
296            self.assertEqual(b'te', posix.read(fd, 2))
297        finally:
298            os.close(fd)
299
300    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
301    def test_preadv(self):
302        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
303        try:
304            os.write(fd, b'test1tt2t3t5t6t6t8')
305            buf = [bytearray(i) for i in [5, 3, 2]]
306            self.assertEqual(posix.preadv(fd, buf, 3), 10)
307            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
308        finally:
309            os.close(fd)
310
311    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
312    @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI")
313    def test_preadv_flags(self):
314        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
315        try:
316            os.write(fd, b'test1tt2t3t5t6t6t8')
317            buf = [bytearray(i) for i in [5, 3, 2]]
318            self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10)
319            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
320        except NotImplementedError:
321            self.skipTest("preadv2 not available")
322        except OSError as inst:
323            # Is possible that the macro RWF_HIPRI was defined at compilation time
324            # but the option is not supported by the kernel or the runtime libc shared
325            # library.
326            if inst.errno in {errno.EINVAL, errno.ENOTSUP}:
327                raise unittest.SkipTest("RWF_HIPRI is not supported by the current system")
328            else:
329                raise
330        finally:
331            os.close(fd)
332
333    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
334    @requires_32b
335    def test_preadv_overflow_32bits(self):
336        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
337        try:
338            buf = [bytearray(2**16)] * 2**15
339            with self.assertRaises(OSError) as cm:
340                os.preadv(fd, buf, 0)
341            self.assertEqual(cm.exception.errno, errno.EINVAL)
342            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
343        finally:
344            os.close(fd)
345
346    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
347    def test_pwrite(self):
348        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
349        try:
350            os.write(fd, b'test')
351            os.lseek(fd, 0, os.SEEK_SET)
352            posix.pwrite(fd, b'xx', 1)
353            self.assertEqual(b'txxt', posix.read(fd, 4))
354        finally:
355            os.close(fd)
356
357    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
358    def test_pwritev(self):
359        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
360        try:
361            os.write(fd, b"xx")
362            os.lseek(fd, 0, os.SEEK_SET)
363            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2)
364            self.assertEqual(n, 10)
365
366            os.lseek(fd, 0, os.SEEK_SET)
367            self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100))
368        finally:
369            os.close(fd)
370
371    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
372    @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC")
373    def test_pwritev_flags(self):
374        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
375        try:
376            os.write(fd,b"xx")
377            os.lseek(fd, 0, os.SEEK_SET)
378            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC)
379            self.assertEqual(n, 10)
380
381            os.lseek(fd, 0, os.SEEK_SET)
382            self.assertEqual(b'xxtest1tt2', posix.read(fd, 100))
383        finally:
384            os.close(fd)
385
386    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
387    @requires_32b
388    def test_pwritev_overflow_32bits(self):
389        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
390        try:
391            with self.assertRaises(OSError) as cm:
392                os.pwritev(fd, [b"x" * 2**16] * 2**15, 0)
393            self.assertEqual(cm.exception.errno, errno.EINVAL)
394        finally:
395            os.close(fd)
396
397    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
398        "test needs posix.posix_fallocate()")
399    def test_posix_fallocate(self):
400        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT)
401        try:
402            posix.posix_fallocate(fd, 0, 10)
403        except OSError as inst:
404            # issue10812, ZFS doesn't appear to support posix_fallocate,
405            # so skip Solaris-based since they are likely to have ZFS.
406            # issue33655: Also ignore EINVAL on *BSD since ZFS is also
407            # often used there.
408            if inst.errno == errno.EINVAL and sys.platform.startswith(
409                ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')):
410                raise unittest.SkipTest("test may fail on ZFS filesystems")
411            else:
412                raise
413        finally:
414            os.close(fd)
415
416    # issue31106 - posix_fallocate() does not set error in errno.
417    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
418        "test needs posix.posix_fallocate()")
419    def test_posix_fallocate_errno(self):
420        try:
421            posix.posix_fallocate(-42, 0, 10)
422        except OSError as inst:
423            if inst.errno != errno.EBADF:
424                raise
425
426    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
427        "test needs posix.posix_fadvise()")
428    def test_posix_fadvise(self):
429        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
430        try:
431            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
432        finally:
433            os.close(fd)
434
435    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
436        "test needs posix.posix_fadvise()")
437    def test_posix_fadvise_errno(self):
438        try:
439            posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED)
440        except OSError as inst:
441            if inst.errno != errno.EBADF:
442                raise
443
444    @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
445    def test_utime_with_fd(self):
446        now = time.time()
447        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
448        try:
449            posix.utime(fd)
450            posix.utime(fd, None)
451            self.assertRaises(TypeError, posix.utime, fd, (None, None))
452            self.assertRaises(TypeError, posix.utime, fd, (now, None))
453            self.assertRaises(TypeError, posix.utime, fd, (None, now))
454            posix.utime(fd, (int(now), int(now)))
455            posix.utime(fd, (now, now))
456            self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
457            self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
458            self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
459            posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
460            posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
461
462        finally:
463            os.close(fd)
464
465    @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
466    def test_utime_nofollow_symlinks(self):
467        now = time.time()
468        posix.utime(os_helper.TESTFN, None, follow_symlinks=False)
469        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
470                          (None, None), follow_symlinks=False)
471        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
472                          (now, None), follow_symlinks=False)
473        self.assertRaises(TypeError, posix.utime, os_helper.TESTFN,
474                          (None, now), follow_symlinks=False)
475        posix.utime(os_helper.TESTFN, (int(now), int(now)),
476                    follow_symlinks=False)
477        posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False)
478        posix.utime(os_helper.TESTFN, follow_symlinks=False)
479
480    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
481    def test_writev(self):
482        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
483        try:
484            n = os.writev(fd, (b'test1', b'tt2', b't3'))
485            self.assertEqual(n, 10)
486
487            os.lseek(fd, 0, os.SEEK_SET)
488            self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
489
490            # Issue #20113: empty list of buffers should not crash
491            try:
492                size = posix.writev(fd, [])
493            except OSError:
494                # writev(fd, []) raises OSError(22, "Invalid argument")
495                # on OpenIndiana
496                pass
497            else:
498                self.assertEqual(size, 0)
499        finally:
500            os.close(fd)
501
502    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
503    @requires_32b
504    def test_writev_overflow_32bits(self):
505        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
506        try:
507            with self.assertRaises(OSError) as cm:
508                os.writev(fd, [b"x" * 2**16] * 2**15)
509            self.assertEqual(cm.exception.errno, errno.EINVAL)
510        finally:
511            os.close(fd)
512
513    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
514    def test_readv(self):
515        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
516        try:
517            os.write(fd, b'test1tt2t3')
518            os.lseek(fd, 0, os.SEEK_SET)
519            buf = [bytearray(i) for i in [5, 3, 2]]
520            self.assertEqual(posix.readv(fd, buf), 10)
521            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
522
523            # Issue #20113: empty list of buffers should not crash
524            try:
525                size = posix.readv(fd, [])
526            except OSError:
527                # readv(fd, []) raises OSError(22, "Invalid argument")
528                # on OpenIndiana
529                pass
530            else:
531                self.assertEqual(size, 0)
532        finally:
533            os.close(fd)
534
535    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
536    @requires_32b
537    def test_readv_overflow_32bits(self):
538        fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT)
539        try:
540            buf = [bytearray(2**16)] * 2**15
541            with self.assertRaises(OSError) as cm:
542                os.readv(fd, buf)
543            self.assertEqual(cm.exception.errno, errno.EINVAL)
544            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
545        finally:
546            os.close(fd)
547
548    @unittest.skipUnless(hasattr(posix, 'dup'),
549                         'test needs posix.dup()')
550    @unittest.skipIf(support.is_wasi, "WASI does not have dup()")
551    def test_dup(self):
552        fp = open(os_helper.TESTFN)
553        try:
554            fd = posix.dup(fp.fileno())
555            self.assertIsInstance(fd, int)
556            os.close(fd)
557        finally:
558            fp.close()
559
560    @unittest.skipUnless(hasattr(posix, 'confstr'),
561                         'test needs posix.confstr()')
562    def test_confstr(self):
563        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
564        self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
565
566    @unittest.skipUnless(hasattr(posix, 'dup2'),
567                         'test needs posix.dup2()')
568    @unittest.skipIf(support.is_wasi, "WASI does not have dup2()")
569    def test_dup2(self):
570        fp1 = open(os_helper.TESTFN)
571        fp2 = open(os_helper.TESTFN)
572        try:
573            posix.dup2(fp1.fileno(), fp2.fileno())
574        finally:
575            fp1.close()
576            fp2.close()
577
578    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
579    @support.requires_linux_version(2, 6, 23)
580    @support.requires_subprocess()
581    def test_oscloexec(self):
582        fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
583        self.addCleanup(os.close, fd)
584        self.assertFalse(os.get_inheritable(fd))
585
586    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
587                         'test needs posix.O_EXLOCK')
588    def test_osexlock(self):
589        fd = os.open(os_helper.TESTFN,
590                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
591        self.assertRaises(OSError, os.open, os_helper.TESTFN,
592                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
593        os.close(fd)
594
595        if hasattr(posix, "O_SHLOCK"):
596            fd = os.open(os_helper.TESTFN,
597                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
598            self.assertRaises(OSError, os.open, os_helper.TESTFN,
599                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
600            os.close(fd)
601
602    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
603                         'test needs posix.O_SHLOCK')
604    def test_osshlock(self):
605        fd1 = os.open(os_helper.TESTFN,
606                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
607        fd2 = os.open(os_helper.TESTFN,
608                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
609        os.close(fd2)
610        os.close(fd1)
611
612        if hasattr(posix, "O_EXLOCK"):
613            fd = os.open(os_helper.TESTFN,
614                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
615            self.assertRaises(OSError, os.open, os_helper.TESTFN,
616                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
617            os.close(fd)
618
619    @unittest.skipUnless(hasattr(posix, 'fstat'),
620                         'test needs posix.fstat()')
621    def test_fstat(self):
622        fp = open(os_helper.TESTFN)
623        try:
624            self.assertTrue(posix.fstat(fp.fileno()))
625            self.assertTrue(posix.stat(fp.fileno()))
626
627            self.assertRaisesRegex(TypeError,
628                    'should be string, bytes, os.PathLike or integer, not',
629                    posix.stat, float(fp.fileno()))
630        finally:
631            fp.close()
632
633    def test_stat(self):
634        self.assertTrue(posix.stat(os_helper.TESTFN))
635        self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN)))
636
637        self.assertWarnsRegex(DeprecationWarning,
638                'should be string, bytes, os.PathLike or integer, not',
639                posix.stat, bytearray(os.fsencode(os_helper.TESTFN)))
640        self.assertRaisesRegex(TypeError,
641                'should be string, bytes, os.PathLike or integer, not',
642                posix.stat, None)
643        self.assertRaisesRegex(TypeError,
644                'should be string, bytes, os.PathLike or integer, not',
645                posix.stat, list(os_helper.TESTFN))
646        self.assertRaisesRegex(TypeError,
647                'should be string, bytes, os.PathLike or integer, not',
648                posix.stat, list(os.fsencode(os_helper.TESTFN)))
649
650    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
651    def test_mkfifo(self):
652        if sys.platform == "vxworks":
653            fifo_path = os.path.join("/fifos/", os_helper.TESTFN)
654        else:
655            fifo_path = os_helper.TESTFN
656        os_helper.unlink(fifo_path)
657        self.addCleanup(os_helper.unlink, fifo_path)
658        try:
659            posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR)
660        except PermissionError as e:
661            self.skipTest('posix.mkfifo(): %s' % e)
662        self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode))
663
664    @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
665                         "don't have mknod()/S_IFIFO")
666    def test_mknod(self):
667        # Test using mknod() to create a FIFO (the only use specified
668        # by POSIX).
669        os_helper.unlink(os_helper.TESTFN)
670        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
671        try:
672            posix.mknod(os_helper.TESTFN, mode, 0)
673        except OSError as e:
674            # Some old systems don't allow unprivileged users to use
675            # mknod(), or only support creating device nodes.
676            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
677        else:
678            self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode))
679
680        # Keyword arguments are also supported
681        os_helper.unlink(os_helper.TESTFN)
682        try:
683            posix.mknod(path=os_helper.TESTFN, mode=mode, device=0,
684                dir_fd=None)
685        except OSError as e:
686            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
687
688    @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')
689    def test_makedev(self):
690        st = posix.stat(os_helper.TESTFN)
691        dev = st.st_dev
692        self.assertIsInstance(dev, int)
693        self.assertGreaterEqual(dev, 0)
694
695        major = posix.major(dev)
696        self.assertIsInstance(major, int)
697        self.assertGreaterEqual(major, 0)
698        self.assertEqual(posix.major(dev), major)
699        self.assertRaises(TypeError, posix.major, float(dev))
700        self.assertRaises(TypeError, posix.major)
701        self.assertRaises((ValueError, OverflowError), posix.major, -1)
702
703        minor = posix.minor(dev)
704        self.assertIsInstance(minor, int)
705        self.assertGreaterEqual(minor, 0)
706        self.assertEqual(posix.minor(dev), minor)
707        self.assertRaises(TypeError, posix.minor, float(dev))
708        self.assertRaises(TypeError, posix.minor)
709        self.assertRaises((ValueError, OverflowError), posix.minor, -1)
710
711        self.assertEqual(posix.makedev(major, minor), dev)
712        self.assertRaises(TypeError, posix.makedev, float(major), minor)
713        self.assertRaises(TypeError, posix.makedev, major, float(minor))
714        self.assertRaises(TypeError, posix.makedev, major)
715        self.assertRaises(TypeError, posix.makedev)
716
717    def _test_all_chown_common(self, chown_func, first_param, stat_func):
718        """Common code for chown, fchown and lchown tests."""
719        def check_stat(uid, gid):
720            if stat_func is not None:
721                stat = stat_func(first_param)
722                self.assertEqual(stat.st_uid, uid)
723                self.assertEqual(stat.st_gid, gid)
724        uid = os.getuid()
725        gid = os.getgid()
726        # test a successful chown call
727        chown_func(first_param, uid, gid)
728        check_stat(uid, gid)
729        chown_func(first_param, -1, gid)
730        check_stat(uid, gid)
731        chown_func(first_param, uid, -1)
732        check_stat(uid, gid)
733
734        if sys.platform == "vxworks":
735            # On VxWorks, root user id is 1 and 0 means no login user:
736            # both are super users.
737            is_root = (uid in (0, 1))
738        else:
739            is_root = (uid == 0)
740        if support.is_emscripten:
741            # Emscripten getuid() / geteuid() always return 0 (root), but
742            # cannot chown uid/gid to random value.
743            pass
744        elif is_root:
745            # Try an amusingly large uid/gid to make sure we handle
746            # large unsigned values.  (chown lets you use any
747            # uid/gid you like, even if they aren't defined.)
748            #
749            # On VxWorks uid_t is defined as unsigned short. A big
750            # value greater than 65535 will result in underflow error.
751            #
752            # This problem keeps coming up:
753            #   http://bugs.python.org/issue1747858
754            #   http://bugs.python.org/issue4591
755            #   http://bugs.python.org/issue15301
756            # Hopefully the fix in 4591 fixes it for good!
757            #
758            # This part of the test only runs when run as root.
759            # Only scary people run their tests as root.
760
761            big_value = (2**31 if sys.platform != "vxworks" else 2**15)
762            chown_func(first_param, big_value, big_value)
763            check_stat(big_value, big_value)
764            chown_func(first_param, -1, -1)
765            check_stat(big_value, big_value)
766            chown_func(first_param, uid, gid)
767            check_stat(uid, gid)
768        elif platform.system() in ('HP-UX', 'SunOS'):
769            # HP-UX and Solaris can allow a non-root user to chown() to root
770            # (issue #5113)
771            raise unittest.SkipTest("Skipping because of non-standard chown() "
772                                    "behavior")
773        else:
774            # non-root cannot chown to root, raises OSError
775            self.assertRaises(OSError, chown_func, first_param, 0, 0)
776            check_stat(uid, gid)
777            self.assertRaises(OSError, chown_func, first_param, 0, -1)
778            check_stat(uid, gid)
779            if 0 not in os.getgroups():
780                self.assertRaises(OSError, chown_func, first_param, -1, 0)
781                check_stat(uid, gid)
782        # test illegal types
783        for t in str, float:
784            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
785            check_stat(uid, gid)
786            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
787            check_stat(uid, gid)
788
789    @os_helper.skip_unless_working_chmod
790    @unittest.skipIf(support.is_emscripten, "getgid() is a stub")
791    def test_chown(self):
792        # raise an OSError if the file does not exist
793        os.unlink(os_helper.TESTFN)
794        self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1)
795
796        # re-create the file
797        os_helper.create_empty_file(os_helper.TESTFN)
798        self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat)
799
800    @os_helper.skip_unless_working_chmod
801    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
802    @unittest.skipIf(support.is_emscripten, "getgid() is a stub")
803    def test_fchown(self):
804        os.unlink(os_helper.TESTFN)
805
806        # re-create the file
807        test_file = open(os_helper.TESTFN, 'w')
808        try:
809            fd = test_file.fileno()
810            self._test_all_chown_common(posix.fchown, fd,
811                                        getattr(posix, 'fstat', None))
812        finally:
813            test_file.close()
814
815    @os_helper.skip_unless_working_chmod
816    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
817    def test_lchown(self):
818        os.unlink(os_helper.TESTFN)
819        # create a symlink
820        os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN)
821        self._test_all_chown_common(posix.lchown, os_helper.TESTFN,
822                                    getattr(posix, 'lstat', None))
823
824    @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
825    def test_chdir(self):
826        posix.chdir(os.curdir)
827        self.assertRaises(OSError, posix.chdir, os_helper.TESTFN)
828
829    def test_listdir(self):
830        self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir))
831
832    def test_listdir_default(self):
833        # When listdir is called without argument,
834        # it's the same as listdir(os.curdir).
835        self.assertIn(os_helper.TESTFN, posix.listdir())
836
837    def test_listdir_bytes(self):
838        # When listdir is called with a bytes object,
839        # the returned strings are of type bytes.
840        self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.'))
841
842    def test_listdir_bytes_like(self):
843        for cls in bytearray, memoryview:
844            with self.assertWarns(DeprecationWarning):
845                names = posix.listdir(cls(b'.'))
846            self.assertIn(os.fsencode(os_helper.TESTFN), names)
847            for name in names:
848                self.assertIs(type(name), bytes)
849
850    @unittest.skipUnless(posix.listdir in os.supports_fd,
851                         "test needs fd support for posix.listdir()")
852    def test_listdir_fd(self):
853        f = posix.open(posix.getcwd(), posix.O_RDONLY)
854        self.addCleanup(posix.close, f)
855        self.assertEqual(
856            sorted(posix.listdir('.')),
857            sorted(posix.listdir(f))
858            )
859        # Check that the fd offset was reset (issue #13739)
860        self.assertEqual(
861            sorted(posix.listdir('.')),
862            sorted(posix.listdir(f))
863            )
864
865    @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
866    def test_access(self):
867        self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK))
868
869    @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
870    def test_umask(self):
871        old_mask = posix.umask(0)
872        self.assertIsInstance(old_mask, int)
873        posix.umask(old_mask)
874
875    @unittest.skipUnless(hasattr(posix, 'strerror'),
876                         'test needs posix.strerror()')
877    def test_strerror(self):
878        self.assertTrue(posix.strerror(0))
879
880    @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
881    def test_pipe(self):
882        reader, writer = posix.pipe()
883        os.close(reader)
884        os.close(writer)
885
886    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
887    @support.requires_linux_version(2, 6, 27)
888    def test_pipe2(self):
889        self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
890        self.assertRaises(TypeError, os.pipe2, 0, 0)
891
892        # try calling with flags = 0, like os.pipe()
893        r, w = os.pipe2(0)
894        os.close(r)
895        os.close(w)
896
897        # test flags
898        r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
899        self.addCleanup(os.close, r)
900        self.addCleanup(os.close, w)
901        self.assertFalse(os.get_inheritable(r))
902        self.assertFalse(os.get_inheritable(w))
903        self.assertFalse(os.get_blocking(r))
904        self.assertFalse(os.get_blocking(w))
905        # try reading from an empty pipe: this should fail, not block
906        self.assertRaises(OSError, os.read, r, 1)
907        # try a write big enough to fill-up the pipe: this should either
908        # fail or perform a partial write, not block
909        try:
910            os.write(w, b'x' * support.PIPE_MAX_SIZE)
911        except OSError:
912            pass
913
914    @support.cpython_only
915    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
916    @support.requires_linux_version(2, 6, 27)
917    def test_pipe2_c_limits(self):
918        # Issue 15989
919        import _testcapi
920        self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
921        self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
922
923    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
924    def test_utime(self):
925        now = time.time()
926        posix.utime(os_helper.TESTFN, None)
927        self.assertRaises(TypeError, posix.utime,
928                          os_helper.TESTFN, (None, None))
929        self.assertRaises(TypeError, posix.utime,
930                          os_helper.TESTFN, (now, None))
931        self.assertRaises(TypeError, posix.utime,
932                          os_helper.TESTFN, (None, now))
933        posix.utime(os_helper.TESTFN, (int(now), int(now)))
934        posix.utime(os_helper.TESTFN, (now, now))
935
936    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
937        st = os.stat(target_file)
938        self.assertTrue(hasattr(st, 'st_flags'))
939
940        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
941        flags = st.st_flags | stat.UF_IMMUTABLE
942        try:
943            chflags_func(target_file, flags, **kwargs)
944        except OSError as err:
945            if err.errno != errno.EOPNOTSUPP:
946                raise
947            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
948            self.skipTest(msg)
949
950        try:
951            new_st = os.stat(target_file)
952            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
953            try:
954                fd = open(target_file, 'w+')
955            except OSError as e:
956                self.assertEqual(e.errno, errno.EPERM)
957        finally:
958            posix.chflags(target_file, st.st_flags)
959
960    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
961    def test_chflags(self):
962        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN)
963
964    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
965    def test_lchflags_regular_file(self):
966        self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN)
967        self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN,
968                                        follow_symlinks=False)
969
970    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
971    def test_lchflags_symlink(self):
972        testfn_st = os.stat(os_helper.TESTFN)
973
974        self.assertTrue(hasattr(testfn_st, 'st_flags'))
975
976        self.addCleanup(os_helper.unlink, _DUMMY_SYMLINK)
977        os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK)
978        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
979
980        def chflags_nofollow(path, flags):
981            return posix.chflags(path, flags, follow_symlinks=False)
982
983        for fn in (posix.lchflags, chflags_nofollow):
984            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
985            flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
986            try:
987                fn(_DUMMY_SYMLINK, flags)
988            except OSError as err:
989                if err.errno != errno.EOPNOTSUPP:
990                    raise
991                msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
992                self.skipTest(msg)
993            try:
994                new_testfn_st = os.stat(os_helper.TESTFN)
995                new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
996
997                self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
998                self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
999                                 new_dummy_symlink_st.st_flags)
1000            finally:
1001                fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
1002
1003    def test_environ(self):
1004        if os.name == "nt":
1005            item_type = str
1006        else:
1007            item_type = bytes
1008        for k, v in posix.environ.items():
1009            self.assertEqual(type(k), item_type)
1010            self.assertEqual(type(v), item_type)
1011
1012    def test_putenv(self):
1013        with self.assertRaises(ValueError):
1014            os.putenv('FRUIT\0VEGETABLE', 'cabbage')
1015        with self.assertRaises(ValueError):
1016            os.putenv(b'FRUIT\0VEGETABLE', b'cabbage')
1017        with self.assertRaises(ValueError):
1018            os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage')
1019        with self.assertRaises(ValueError):
1020            os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage')
1021        with self.assertRaises(ValueError):
1022            os.putenv('FRUIT=ORANGE', 'lemon')
1023        with self.assertRaises(ValueError):
1024            os.putenv(b'FRUIT=ORANGE', b'lemon')
1025
1026    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
1027    def test_getcwd_long_pathnames(self):
1028        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
1029        curdir = os.getcwd()
1030        base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd'
1031
1032        try:
1033            os.mkdir(base_path)
1034            os.chdir(base_path)
1035        except:
1036            #  Just returning nothing instead of the SkipTest exception, because
1037            #  the test results in Error in that case.  Is that ok?
1038            #  raise unittest.SkipTest("cannot create directory for testing")
1039            return
1040
1041            def _create_and_do_getcwd(dirname, current_path_length = 0):
1042                try:
1043                    os.mkdir(dirname)
1044                except:
1045                    raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
1046
1047                os.chdir(dirname)
1048                try:
1049                    os.getcwd()
1050                    if current_path_length < 1027:
1051                        _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
1052                finally:
1053                    os.chdir('..')
1054                    os.rmdir(dirname)
1055
1056            _create_and_do_getcwd(dirname)
1057
1058        finally:
1059            os.chdir(curdir)
1060            os_helper.rmtree(base_path)
1061
1062    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
1063    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
1064    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
1065    def test_getgrouplist(self):
1066        user = pwd.getpwuid(os.getuid())[0]
1067        group = pwd.getpwuid(os.getuid())[3]
1068        self.assertIn(group, posix.getgrouplist(user, group))
1069
1070
1071    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
1072    @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()")
1073    @support.requires_subprocess()
1074    def test_getgroups(self):
1075        with os.popen('id -G 2>/dev/null') as idg:
1076            groups = idg.read().strip()
1077            ret = idg.close()
1078
1079        try:
1080            idg_groups = set(int(g) for g in groups.split())
1081        except ValueError:
1082            idg_groups = set()
1083        if ret is not None or not idg_groups:
1084            raise unittest.SkipTest("need working 'id -G'")
1085
1086        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
1087        if sys.platform == 'darwin':
1088            import sysconfig
1089            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3'
1090            if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6):
1091                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
1092
1093        # 'id -G' and 'os.getgroups()' should return the same
1094        # groups, ignoring order, duplicates, and the effective gid.
1095        # #10822/#26944 - It is implementation defined whether
1096        # posix.getgroups() includes the effective gid.
1097        symdiff = idg_groups.symmetric_difference(posix.getgroups())
1098        self.assertTrue(not symdiff or symdiff == {posix.getegid()})
1099
1100    @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal')
1101    @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result")
1102    def test_cld_xxxx_constants(self):
1103        os.CLD_EXITED
1104        os.CLD_KILLED
1105        os.CLD_DUMPED
1106        os.CLD_TRAPPED
1107        os.CLD_STOPPED
1108        os.CLD_CONTINUED
1109
1110    requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
1111                                           "don't have scheduling support")
1112    requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
1113                                                  "don't have sched affinity support")
1114
1115    @requires_sched_h
1116    def test_sched_yield(self):
1117        # This has no error conditions (at least on Linux).
1118        posix.sched_yield()
1119
1120    @requires_sched_h
1121    @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
1122                         "requires sched_get_priority_max()")
1123    def test_sched_priority(self):
1124        # Round-robin usually has interesting priorities.
1125        pol = posix.SCHED_RR
1126        lo = posix.sched_get_priority_min(pol)
1127        hi = posix.sched_get_priority_max(pol)
1128        self.assertIsInstance(lo, int)
1129        self.assertIsInstance(hi, int)
1130        self.assertGreaterEqual(hi, lo)
1131        # OSX evidently just returns 15 without checking the argument.
1132        if sys.platform != "darwin":
1133            self.assertRaises(OSError, posix.sched_get_priority_min, -23)
1134            self.assertRaises(OSError, posix.sched_get_priority_max, -23)
1135
1136    @requires_sched
1137    def test_get_and_set_scheduler_and_param(self):
1138        possible_schedulers = [sched for name, sched in posix.__dict__.items()
1139                               if name.startswith("SCHED_")]
1140        mine = posix.sched_getscheduler(0)
1141        self.assertIn(mine, possible_schedulers)
1142        try:
1143            parent = posix.sched_getscheduler(os.getppid())
1144        except OSError as e:
1145            if e.errno != errno.EPERM:
1146                raise
1147        else:
1148            self.assertIn(parent, possible_schedulers)
1149        self.assertRaises(OSError, posix.sched_getscheduler, -1)
1150        self.assertRaises(OSError, posix.sched_getparam, -1)
1151        param = posix.sched_getparam(0)
1152        self.assertIsInstance(param.sched_priority, int)
1153
1154        # POSIX states that calling sched_setparam() or sched_setscheduler() on
1155        # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
1156        # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
1157        if not sys.platform.startswith(('freebsd', 'netbsd')):
1158            try:
1159                posix.sched_setscheduler(0, mine, param)
1160                posix.sched_setparam(0, param)
1161            except OSError as e:
1162                if e.errno != errno.EPERM:
1163                    raise
1164            self.assertRaises(OSError, posix.sched_setparam, -1, param)
1165
1166        self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
1167        self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
1168        self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
1169        param = posix.sched_param(None)
1170        self.assertRaises(TypeError, posix.sched_setparam, 0, param)
1171        large = 214748364700
1172        param = posix.sched_param(large)
1173        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1174        param = posix.sched_param(sched_priority=-large)
1175        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1176
1177    @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
1178    def test_sched_rr_get_interval(self):
1179        try:
1180            interval = posix.sched_rr_get_interval(0)
1181        except OSError as e:
1182            # This likely means that sched_rr_get_interval is only valid for
1183            # processes with the SCHED_RR scheduler in effect.
1184            if e.errno != errno.EINVAL:
1185                raise
1186            self.skipTest("only works on SCHED_RR processes")
1187        self.assertIsInstance(interval, float)
1188        # Reasonable constraints, I think.
1189        self.assertGreaterEqual(interval, 0.)
1190        self.assertLess(interval, 1.)
1191
1192    @requires_sched_affinity
1193    def test_sched_getaffinity(self):
1194        mask = posix.sched_getaffinity(0)
1195        self.assertIsInstance(mask, set)
1196        self.assertGreaterEqual(len(mask), 1)
1197        if not sys.platform.startswith("freebsd"):
1198            # bpo-47205: does not raise OSError on FreeBSD
1199            self.assertRaises(OSError, posix.sched_getaffinity, -1)
1200        for cpu in mask:
1201            self.assertIsInstance(cpu, int)
1202            self.assertGreaterEqual(cpu, 0)
1203            self.assertLess(cpu, 1 << 32)
1204
1205    @requires_sched_affinity
1206    def test_sched_setaffinity(self):
1207        mask = posix.sched_getaffinity(0)
1208        if len(mask) > 1:
1209            # Empty masks are forbidden
1210            mask.pop()
1211        posix.sched_setaffinity(0, mask)
1212        self.assertEqual(posix.sched_getaffinity(0), mask)
1213        self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
1214        self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
1215        self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X"))
1216        self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
1217        if not sys.platform.startswith("freebsd"):
1218            # bpo-47205: does not raise OSError on FreeBSD
1219            self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
1220
1221    @unittest.skipIf(support.is_wasi, "No dynamic linking on WASI")
1222    def test_rtld_constants(self):
1223        # check presence of major RTLD_* constants
1224        posix.RTLD_LAZY
1225        posix.RTLD_NOW
1226        posix.RTLD_GLOBAL
1227        posix.RTLD_LOCAL
1228
1229    @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
1230                         "test needs an OS that reports file holes")
1231    def test_fs_holes(self):
1232        # Even if the filesystem doesn't report holes,
1233        # if the OS supports it the SEEK_* constants
1234        # will be defined and will have a consistent
1235        # behaviour:
1236        # os.SEEK_DATA = current position
1237        # os.SEEK_HOLE = end of file position
1238        with open(os_helper.TESTFN, 'r+b') as fp:
1239            fp.write(b"hello")
1240            fp.flush()
1241            size = fp.tell()
1242            fno = fp.fileno()
1243            try :
1244                for i in range(size):
1245                    self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
1246                    self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
1247                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
1248                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
1249            except OSError :
1250                # Some OSs claim to support SEEK_HOLE/SEEK_DATA
1251                # but it is not true.
1252                # For instance:
1253                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
1254                raise unittest.SkipTest("OSError raised!")
1255
1256    def test_path_error2(self):
1257        """
1258        Test functions that call path_error2(), providing two filenames in their exceptions.
1259        """
1260        for name in ("rename", "replace", "link"):
1261            function = getattr(os, name, None)
1262            if function is None:
1263                continue
1264
1265            for dst in ("noodly2", os_helper.TESTFN):
1266                try:
1267                    function('doesnotexistfilename', dst)
1268                except OSError as e:
1269                    self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
1270                    break
1271            else:
1272                self.fail("No valid path_error2() test for os." + name)
1273
1274    def test_path_with_null_character(self):
1275        fn = os_helper.TESTFN
1276        fn_with_NUL = fn + '\0'
1277        self.addCleanup(os_helper.unlink, fn)
1278        os_helper.unlink(fn)
1279        fd = None
1280        try:
1281            with self.assertRaises(ValueError):
1282                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1283        finally:
1284            if fd is not None:
1285                os.close(fd)
1286        self.assertFalse(os.path.exists(fn))
1287        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1288        self.assertFalse(os.path.exists(fn))
1289        open(fn, 'wb').close()
1290        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1291
1292    def test_path_with_null_byte(self):
1293        fn = os.fsencode(os_helper.TESTFN)
1294        fn_with_NUL = fn + b'\0'
1295        self.addCleanup(os_helper.unlink, fn)
1296        os_helper.unlink(fn)
1297        fd = None
1298        try:
1299            with self.assertRaises(ValueError):
1300                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1301        finally:
1302            if fd is not None:
1303                os.close(fd)
1304        self.assertFalse(os.path.exists(fn))
1305        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1306        self.assertFalse(os.path.exists(fn))
1307        open(fn, 'wb').close()
1308        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1309
1310    @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable")
1311    def test_pidfd_open(self):
1312        with self.assertRaises(OSError) as cm:
1313            os.pidfd_open(-1)
1314        if cm.exception.errno == errno.ENOSYS:
1315            self.skipTest("system does not support pidfd_open")
1316        if isinstance(cm.exception, PermissionError):
1317            self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}")
1318        self.assertEqual(cm.exception.errno, errno.EINVAL)
1319        os.close(os.pidfd_open(os.getpid(), 0))
1320
1321
1322# tests for the posix *at functions follow
1323class TestPosixDirFd(unittest.TestCase):
1324    count = 0
1325
1326    @contextmanager
1327    def prepare(self):
1328        TestPosixDirFd.count += 1
1329        name = f'{os_helper.TESTFN}_{self.count}'
1330        base_dir = f'{os_helper.TESTFN}_{self.count}base'
1331        posix.mkdir(base_dir)
1332        self.addCleanup(posix.rmdir, base_dir)
1333        fullname = os.path.join(base_dir, name)
1334        assert not os.path.exists(fullname)
1335        with os_helper.open_dir_fd(base_dir) as dir_fd:
1336            yield (dir_fd, name, fullname)
1337
1338    @contextmanager
1339    def prepare_file(self):
1340        with self.prepare() as (dir_fd, name, fullname):
1341            os_helper.create_empty_file(fullname)
1342            self.addCleanup(posix.unlink, fullname)
1343            yield (dir_fd, name, fullname)
1344
1345    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
1346    def test_access_dir_fd(self):
1347        with self.prepare_file() as (dir_fd, name, fullname):
1348            self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd))
1349
1350    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
1351    def test_chmod_dir_fd(self):
1352        with self.prepare_file() as (dir_fd, name, fullname):
1353            posix.chmod(fullname, stat.S_IRUSR)
1354            posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
1355            s = posix.stat(fullname)
1356            self.assertEqual(s.st_mode & stat.S_IRWXU,
1357                             stat.S_IRUSR | stat.S_IWUSR)
1358
1359    @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd),
1360                         "test needs dir_fd support in os.chown()")
1361    @unittest.skipIf(support.is_emscripten, "getgid() is a stub")
1362    def test_chown_dir_fd(self):
1363        with self.prepare_file() as (dir_fd, name, fullname):
1364            posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd)
1365
1366    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
1367    def test_stat_dir_fd(self):
1368        with self.prepare() as (dir_fd, name, fullname):
1369            with open(fullname, 'w') as outfile:
1370                outfile.write("testline\n")
1371            self.addCleanup(posix.unlink, fullname)
1372
1373            s1 = posix.stat(fullname)
1374            s2 = posix.stat(name, dir_fd=dir_fd)
1375            self.assertEqual(s1, s2)
1376            s2 = posix.stat(fullname, dir_fd=None)
1377            self.assertEqual(s1, s2)
1378
1379            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1380                    posix.stat, name, dir_fd=posix.getcwd())
1381            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1382                    posix.stat, name, dir_fd=float(dir_fd))
1383            self.assertRaises(OverflowError,
1384                    posix.stat, name, dir_fd=10**20)
1385
1386    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
1387    def test_utime_dir_fd(self):
1388        with self.prepare_file() as (dir_fd, name, fullname):
1389            now = time.time()
1390            posix.utime(name, None, dir_fd=dir_fd)
1391            posix.utime(name, dir_fd=dir_fd)
1392            self.assertRaises(TypeError, posix.utime, name,
1393                              now, dir_fd=dir_fd)
1394            self.assertRaises(TypeError, posix.utime, name,
1395                              (None, None), dir_fd=dir_fd)
1396            self.assertRaises(TypeError, posix.utime, name,
1397                              (now, None), dir_fd=dir_fd)
1398            self.assertRaises(TypeError, posix.utime, name,
1399                              (None, now), dir_fd=dir_fd)
1400            self.assertRaises(TypeError, posix.utime, name,
1401                              (now, "x"), dir_fd=dir_fd)
1402            posix.utime(name, (int(now), int(now)), dir_fd=dir_fd)
1403            posix.utime(name, (now, now), dir_fd=dir_fd)
1404            posix.utime(name,
1405                    (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd)
1406            posix.utime(name, dir_fd=dir_fd,
1407                            times=(int(now), int((now - int(now)) * 1e9)))
1408
1409            # try dir_fd and follow_symlinks together
1410            if os.utime in os.supports_follow_symlinks:
1411                try:
1412                    posix.utime(name, follow_symlinks=False, dir_fd=dir_fd)
1413                except ValueError:
1414                    # whoops!  using both together not supported on this platform.
1415                    pass
1416
1417    @unittest.skipIf(
1418        support.is_wasi,
1419        "WASI: symlink following on path_link is not supported"
1420    )
1421    @unittest.skipUnless(
1422        hasattr(os, "link") and os.link in os.supports_dir_fd,
1423        "test needs dir_fd support in os.link()"
1424    )
1425    def test_link_dir_fd(self):
1426        with self.prepare_file() as (dir_fd, name, fullname), \
1427             self.prepare() as (dir_fd2, linkname, fulllinkname):
1428            try:
1429                posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
1430            except PermissionError as e:
1431                self.skipTest('posix.link(): %s' % e)
1432            self.addCleanup(posix.unlink, fulllinkname)
1433            # should have same inodes
1434            self.assertEqual(posix.stat(fullname)[1],
1435                posix.stat(fulllinkname)[1])
1436
1437    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
1438    def test_mkdir_dir_fd(self):
1439        with self.prepare() as (dir_fd, name, fullname):
1440            posix.mkdir(name, dir_fd=dir_fd)
1441            self.addCleanup(posix.rmdir, fullname)
1442            posix.stat(fullname) # should not raise exception
1443
1444    @unittest.skipUnless(hasattr(os, 'mknod')
1445                         and (os.mknod in os.supports_dir_fd)
1446                         and hasattr(stat, 'S_IFIFO'),
1447                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
1448    def test_mknod_dir_fd(self):
1449        # Test using mknodat() to create a FIFO (the only use specified
1450        # by POSIX).
1451        with self.prepare() as (dir_fd, name, fullname):
1452            mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
1453            try:
1454                posix.mknod(name, mode, 0, dir_fd=dir_fd)
1455            except OSError as e:
1456                # Some old systems don't allow unprivileged users to use
1457                # mknod(), or only support creating device nodes.
1458                self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
1459            else:
1460                self.addCleanup(posix.unlink, fullname)
1461                self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
1462
1463    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
1464    def test_open_dir_fd(self):
1465        with self.prepare() as (dir_fd, name, fullname):
1466            with open(fullname, 'wb') as outfile:
1467                outfile.write(b"testline\n")
1468            self.addCleanup(posix.unlink, fullname)
1469            fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd)
1470            try:
1471                res = posix.read(fd, 9)
1472                self.assertEqual(b"testline\n", res)
1473            finally:
1474                posix.close(fd)
1475
1476    @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd),
1477                         "test needs dir_fd support in os.readlink()")
1478    def test_readlink_dir_fd(self):
1479        with self.prepare() as (dir_fd, name, fullname):
1480            os.symlink('symlink', fullname)
1481            self.addCleanup(posix.unlink, fullname)
1482            self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink')
1483
1484    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
1485    def test_rename_dir_fd(self):
1486        with self.prepare_file() as (dir_fd, name, fullname), \
1487             self.prepare() as (dir_fd2, name2, fullname2):
1488            posix.rename(name, name2,
1489                         src_dir_fd=dir_fd, dst_dir_fd=dir_fd2)
1490            posix.stat(fullname2) # should not raise exception
1491            posix.rename(fullname2, fullname)
1492
1493    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
1494    def test_symlink_dir_fd(self):
1495        with self.prepare() as (dir_fd, name, fullname):
1496            posix.symlink('symlink', name, dir_fd=dir_fd)
1497            self.addCleanup(posix.unlink, fullname)
1498            self.assertEqual(posix.readlink(fullname), 'symlink')
1499
1500    @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
1501    def test_unlink_dir_fd(self):
1502        with self.prepare() as (dir_fd, name, fullname):
1503            os_helper.create_empty_file(fullname)
1504            posix.stat(fullname) # should not raise exception
1505            try:
1506                posix.unlink(name, dir_fd=dir_fd)
1507                self.assertRaises(OSError, posix.stat, fullname)
1508            except:
1509                self.addCleanup(posix.unlink, fullname)
1510                raise
1511
1512    @unittest.skipUnless(hasattr(os, 'mkfifo') and os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
1513    def test_mkfifo_dir_fd(self):
1514        with self.prepare() as (dir_fd, name, fullname):
1515            try:
1516                posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd)
1517            except PermissionError as e:
1518                self.skipTest('posix.mkfifo(): %s' % e)
1519            self.addCleanup(posix.unlink, fullname)
1520            self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode))
1521
1522
class PosixGroupsTester(unittest.TestCase):
    # Tests which change the supplementary group list of the process.
    # setUp() skips them unless they run as root and saves the current
    # group list; tearDown() restores it.

    def setUp(self):
        if posix.getuid() != 0:
            raise unittest.SkipTest("not enough privileges")
        if not hasattr(posix, 'getgroups'):
            raise unittest.SkipTest("need posix.getgroups")
        if sys.platform == 'darwin':
            raise unittest.SkipTest("getgroups(2) is broken on OSX")
        self.saved_groups = posix.getgroups()

    def tearDown(self):
        if hasattr(posix, 'setgroups'):
            posix.setgroups(self.saved_groups)
        # The pwd module is optional (the module-level import falls back to
        # None), so only use it when getpwuid() really exists.  Without
        # setgroups() and pwd no test of this class changes the groups.
        elif hasattr(posix, 'initgroups') and hasattr(pwd, 'getpwuid'):
            name = pwd.getpwuid(posix.getuid()).pw_name
            posix.initgroups(name, self.saved_groups[0])

    @unittest.skipUnless(hasattr(posix, 'initgroups'),
                         "test needs posix.initgroups()")
    @unittest.skipUnless(hasattr(pwd, 'getpwuid'),
                         "test needs pwd.getpwuid()")
    def test_initgroups(self):
        # find missing group: one past the largest saved group id (or 1
        # when the saved list is empty)
        g = max(self.saved_groups or [0]) + 1
        name = pwd.getpwuid(posix.getuid()).pw_name
        posix.initgroups(name, g)
        self.assertIn(g, posix.getgroups())

    @unittest.skipUnless(hasattr(posix, 'setgroups'),
                         "test needs posix.setgroups()")
    def test_setgroups(self):
        # Whatever list is installed must be reported back unchanged.
        for groups in [[0], list(range(16))]:
            posix.setgroups(groups)
            self.assertListEqual(groups, posix.getgroups())
1557
1558
class _PosixSpawnMixin:
    # Tests shared by the posix_spawn() and posix_spawnp() test cases.
    # A concrete test case sets spawn_func to the function under test.

    # Program which does nothing and exits with status 0 (success)
    NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass')
    spawn_func = None

    def python_args(self, *args):
        # Disable site module to avoid side effects. For example,
        # on Fedora 28, if the HOME environment variable is not set,
        # site._getuserbase() calls pwd.getpwuid() which opens
        # /var/lib/sss/mc/passwd but then leaves the file open which makes
        # test_close_file() to fail.
        return (sys.executable, '-I', '-S', *args)

    def test_returns_pid(self):
        # The child records its own pid in a file; it must match the value
        # returned by the spawn function.
        pidfile = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, pidfile)
        # The child writes with the same explicit encoding the parent uses
        # to read the file back, like the other tests of this class do.
        script = f"""if 1:
            import os
            with open({pidfile!r}, "w", encoding="utf-8") as pidfile:
                pidfile.write(str(os.getpid()))
            """
        args = self.python_args('-c', script)
        pid = self.spawn_func(args[0], args, os.environ)
        support.wait_process(pid, exitcode=0)
        with open(pidfile, encoding="utf-8") as f:
            self.assertEqual(f.read(), str(pid))

    def test_no_such_executable(self):
        # Either the spawn function raises, or it returns a child which
        # exits with a non-zero status.
        no_such_executable = 'no_such_executable'
        try:
            pid = self.spawn_func(no_such_executable,
                                  [no_such_executable],
                                  os.environ)
        # bpo-35794: PermissionError can be raised if there are
        # directories in the $PATH that are not accessible.
        except (FileNotFoundError, PermissionError) as exc:
            self.assertEqual(exc.filename, no_such_executable)
        else:
            pid2, status = os.waitpid(pid, 0)
            self.assertEqual(pid2, pid)
            self.assertNotEqual(status, 0)

    def test_specify_environment(self):
        # The child must see the variable added to the environment mapping
        # which is passed to the spawn function.
        envfile = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, envfile)
        script = f"""if 1:
            import os
            with open({envfile!r}, "w", encoding="utf-8") as envfile:
                envfile.write(os.environ['foo'])
        """
        args = self.python_args('-c', script)
        pid = self.spawn_func(args[0], args,
                              {**os.environ, 'foo': 'bar'})
        support.wait_process(pid, exitcode=0)
        with open(envfile, encoding="utf-8") as f:
            self.assertEqual(f.read(), 'bar')

    def test_none_file_actions(self):
        # file_actions=None is accepted.
        pid = self.spawn_func(
            self.NOOP_PROGRAM[0],
            self.NOOP_PROGRAM,
            os.environ,
            file_actions=None
        )
        support.wait_process(pid, exitcode=0)

    def test_empty_file_actions(self):
        # An empty list of file actions is accepted.
        pid = self.spawn_func(
            self.NOOP_PROGRAM[0],
            self.NOOP_PROGRAM,
            os.environ,
            file_actions=[]
        )
        support.wait_process(pid, exitcode=0)

    def test_resetids_explicit_default(self):
        # resetids=False can be passed explicitly.
        pid = self.spawn_func(
            sys.executable,
            [sys.executable, '-c', 'pass'],
            os.environ,
            resetids=False
        )
        support.wait_process(pid, exitcode=0)

    def test_resetids(self):
        pid = self.spawn_func(
            sys.executable,
            [sys.executable, '-c', 'pass'],
            os.environ,
            resetids=True
        )
        support.wait_process(pid, exitcode=0)

    def test_resetids_wrong_type(self):
        # None is rejected as a value for resetids.
        with self.assertRaises(TypeError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, resetids=None)

    def test_setpgroup(self):
        # Ask for the process group the test process is already in.
        pid = self.spawn_func(
            sys.executable,
            [sys.executable, '-c', 'pass'],
            os.environ,
            setpgroup=os.getpgrp()
        )
        support.wait_process(pid, exitcode=0)

    def test_setpgroup_wrong_type(self):
        # A string is rejected as a process group.
        with self.assertRaises(TypeError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, setpgroup="023")

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_setsigmask(self):
        # The child raises SIGUSR1 while the signal is in the requested
        # mask, and is still expected to exit with status 0.
        code = textwrap.dedent("""\
            import signal
            signal.raise_signal(signal.SIGUSR1)""")

        pid = self.spawn_func(
            sys.executable,
            [sys.executable, '-c', code],
            os.environ,
            setsigmask=[signal.SIGUSR1]
        )
        support.wait_process(pid, exitcode=0)

    def test_setsigmask_wrong_type(self):
        # Not an iterable of signals.
        with self.assertRaises(TypeError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, setsigmask=34)
        # An item which is not an integer.
        with self.assertRaises(TypeError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, setsigmask=["j"])
        # Signal numbers starting at signal.NSIG are rejected.
        with self.assertRaises(ValueError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, setsigmask=[signal.NSIG,
                                                    signal.NSIG+1])

    def test_setsid(self):
        # The child reports its session id through a pipe; it must differ
        # from the session id of the parent.
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        try:
            # The child writes to the inherited descriptor number.
            os.set_inheritable(wfd, True)

            code = textwrap.dedent(f"""
                import os
                fd = {wfd}
                sid = os.getsid(0)
                os.write(fd, str(sid).encode())
            """)

            try:
                pid = self.spawn_func(sys.executable,
                                      [sys.executable, "-c", code],
                                      os.environ, setsid=True)
            except NotImplementedError as exc:
                self.skipTest(f"setsid is not supported: {exc!r}")
            except PermissionError as exc:
                self.skipTest(f"setsid failed with: {exc!r}")
        finally:
            # The parent's copy of the write end is closed in every case.
            os.close(wfd)

        support.wait_process(pid, exitcode=0)

        output = os.read(rfd, 100)
        child_sid = int(output)
        parent_sid = os.getsid(os.getpid())
        self.assertNotEqual(parent_sid, child_sid)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_setsigdef(self):
        # SIGUSR1 is ignored in the parent while the child is spawned;
        # with setsigdef the child is nevertheless expected to be
        # terminated by the signal it raises.
        original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN)
        code = textwrap.dedent("""\
            import signal
            signal.raise_signal(signal.SIGUSR1)""")
        try:
            pid = self.spawn_func(
                sys.executable,
                [sys.executable, '-c', code],
                os.environ,
                setsigdef=[signal.SIGUSR1]
            )
        finally:
            # Put the previous handler back even if the spawn failed.
            signal.signal(signal.SIGUSR1, original_handler)

        support.wait_process(pid, exitcode=-signal.SIGUSR1)

    def test_setsigdef_wrong_type(self):
        # Same invalid values as test_setsigmask_wrong_type().
        with self.assertRaises(TypeError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, setsigdef=34)
        with self.assertRaises(TypeError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, setsigdef=["j"])
        with self.assertRaises(ValueError):
            self.spawn_func(sys.executable,
                            [sys.executable, "-c", "pass"],
                            os.environ, setsigdef=[signal.NSIG, signal.NSIG+1])

    @requires_sched
    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
                     "bpo-34685: test can fail on BSD")
    def test_setscheduler_only_param(self):
        # Pass only scheduling parameters (policy is None); the child exits
        # with 101 or 102 if its policy or priority is not the expected one.
        policy = os.sched_getscheduler(0)
        priority = os.sched_get_priority_min(policy)
        code = textwrap.dedent(f"""\
            import os, sys
            if os.sched_getscheduler(0) != {policy}:
                sys.exit(101)
            if os.sched_getparam(0).sched_priority != {priority}:
                sys.exit(102)""")
        pid = self.spawn_func(
            sys.executable,
            [sys.executable, '-c', code],
            os.environ,
            scheduler=(None, os.sched_param(priority))
        )
        support.wait_process(pid, exitcode=0)

    @requires_sched
    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
                     "bpo-34685: test can fail on BSD")
    def test_setscheduler_with_policy(self):
        # Same check as test_setscheduler_only_param(), but the current
        # policy is passed explicitly.
        policy = os.sched_getscheduler(0)
        priority = os.sched_get_priority_min(policy)
        code = textwrap.dedent(f"""\
            import os, sys
            if os.sched_getscheduler(0) != {policy}:
                sys.exit(101)
            if os.sched_getparam(0).sched_priority != {priority}:
                sys.exit(102)""")
        pid = self.spawn_func(
            sys.executable,
            [sys.executable, '-c', code],
            os.environ,
            scheduler=(policy, os.sched_param(priority))
        )
        support.wait_process(pid, exitcode=0)

    def test_multiple_file_actions(self):
        # One action of each kind in a single call.
        file_actions = [
            (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0),
            (os.POSIX_SPAWN_CLOSE, 0),
            (os.POSIX_SPAWN_DUP2, 1, 4),
        ]
        pid = self.spawn_func(self.NOOP_PROGRAM[0],
                              self.NOOP_PROGRAM,
                              os.environ,
                              file_actions=file_actions)
        support.wait_process(pid, exitcode=0)

    def test_bad_file_actions(self):
        args = self.NOOP_PROGRAM
        # Items which are not well-formed action tuples.
        with self.assertRaises(TypeError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[None])
        with self.assertRaises(TypeError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[()])
        with self.assertRaises(TypeError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[(None,)])
        with self.assertRaises(TypeError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[(12345,)])
        # POSIX_SPAWN_CLOSE with too few, too many or mistyped arguments.
        with self.assertRaises(TypeError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[(os.POSIX_SPAWN_CLOSE,)])
        with self.assertRaises(TypeError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)])
        with self.assertRaises(TypeError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[(os.POSIX_SPAWN_CLOSE, None)])
        # A path containing a null character.
        with self.assertRaises(ValueError):
            self.spawn_func(args[0], args, os.environ,
                            file_actions=[(os.POSIX_SPAWN_OPEN,
                                           3, __file__ + '\0',
                                           os.O_RDONLY, 0)])

    def test_open_file(self):
        # Open a file as descriptor 1 of the child: what the child writes
        # to stdout must end up in that file.
        outfile = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, outfile)
        script = """if 1:
            import sys
            sys.stdout.write("hello")
            """
        file_actions = [
            (os.POSIX_SPAWN_OPEN, 1, outfile,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                stat.S_IRUSR | stat.S_IWUSR),
        ]
        args = self.python_args('-c', script)
        pid = self.spawn_func(args[0], args, os.environ,
                              file_actions=file_actions)

        support.wait_process(pid, exitcode=0)
        with open(outfile, encoding="utf-8") as f:
            self.assertEqual(f.read(), 'hello')

    def test_close_file(self):
        # Close descriptor 0 in the child: fstat(0) must fail there with
        # EBADF, which the child reports through a file.
        closefile = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, closefile)
        script = f"""if 1:
            import os
            try:
                os.fstat(0)
            except OSError as e:
                with open({closefile!r}, 'w', encoding='utf-8') as closefile:
                    closefile.write('is closed %d' % e.errno)
            """
        args = self.python_args('-c', script)
        pid = self.spawn_func(args[0], args, os.environ,
                              file_actions=[(os.POSIX_SPAWN_CLOSE, 0)])

        support.wait_process(pid, exitcode=0)
        with open(closefile, encoding="utf-8") as f:
            self.assertEqual(f.read(), 'is closed %d' % errno.EBADF)

    def test_dup2(self):
        # Duplicate an open file onto descriptor 1 of the child: what the
        # child writes to stdout must end up in that file.
        dupfile = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, dupfile)
        script = """if 1:
            import sys
            sys.stdout.write("hello")
            """
        with open(dupfile, "wb") as childfile:
            file_actions = [
                (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1),
            ]
            args = self.python_args('-c', script)
            pid = self.spawn_func(args[0], args, os.environ,
                                  file_actions=file_actions)
            support.wait_process(pid, exitcode=0)
        with open(dupfile, encoding="utf-8") as f:
            self.assertEqual(f.read(), 'hello')
1904
1905
@unittest.skipUnless(
    hasattr(os, 'posix_spawn'),
    "test needs os.posix_spawn"
)
class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin):
    # Runs the _PosixSpawnMixin tests against posix_spawn().  getattr()
    # falls back to None so that the class body can still be evaluated
    # when the posix module has no such function.
    spawn_func = getattr(posix, 'posix_spawn', None)
1909
1910
@unittest.skipUnless(
    hasattr(os, 'posix_spawnp'),
    "test needs os.posix_spawnp"
)
class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin):
    # Runs the _PosixSpawnMixin tests against posix_spawnp() and adds a
    # test for its PATH lookup.  getattr() falls back to None so that the
    # class body can still be evaluated when the function is missing.
    spawn_func = getattr(posix, 'posix_spawnp', None)

    @os_helper.skip_unless_symlink
    def test_posix_spawnp(self):
        # Make the interpreter available under a distinctive name through
        # a symlink placed in its own temporary directory.
        temp_dir = tempfile.mkdtemp()
        self.addCleanup(os_helper.rmtree, temp_dir)

        program = 'posix_spawnp_test_program.exe'
        os.symlink(sys.executable, os.path.join(temp_dir, program))

        # Search that directory first; if PATH is not set, it becomes the
        # whole search path.
        inherited_path = os.environ.get('PATH')
        if inherited_path is None:
            path = temp_dir
        else:
            path = os.pathsep.join((temp_dir, inherited_path))

        spawn_args = (program, '-I', '-S', '-c', 'pass')
        code = textwrap.dedent("""
            import os
            from test import support

            args = %a
            pid = os.posix_spawnp(args[0], args, os.environ)

            support.wait_process(pid, exitcode=0)
        """ % (spawn_args,))

        # Use a subprocess to test os.posix_spawnp() with a modified PATH
        # environment variable: posix_spawnp() uses the current environment
        # to locate the program, not its environment argument.
        assert_python_ok('-c', code, PATH=path)
1946
1947
@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS")
class TestPosixWeaklinking(unittest.TestCase):
    # These test cases verify that weak linking support on macOS works
    # as expected. These cases only test new behaviour introduced by weak linking,
    # regular behaviour is tested by the normal test cases.
    #
    # See the section on Weak Linking in Mac/README.txt for more information.
    #
    # Every test follows the same pattern: it is skipped unless the build
    # configuration has the HAVE_* symbol set; on a new enough macOS the
    # symbol must be listed in posix._have_functions, on an older one it
    # must be absent and the matching os function must reject the arguments
    # that depend on it.
    def setUp(self):
        import sysconfig

        config_vars = sysconfig.get_config_vars()
        # Names of the HAVE_* configuration variables with a true value.
        self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] }
        # Running macOS version as a tuple of ints, for tuple comparisons.
        self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split("."))

    def _verify_available(self, name):
        # Skip the running test unless *name* is in self.available.
        if name not in self.available:
            raise unittest.SkipTest(f"{name} not weak-linked")

    def test_pwritev(self):
        self._verify_available("HAVE_PWRITEV")
        if self.mac_ver >= (10, 16):
            self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available")
            self.assertTrue(hasattr(os, "preadv"), "os.preadv is not available")

        else:
            self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available")
            self.assertFalse(hasattr(os, "preadv"), "os.preadv is available")

    def test_stat(self):
        self._verify_available("HAVE_FSTATAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FSTATAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FSTATAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.stat("file", dir_fd=0)

    def test_access(self):
        self._verify_available("HAVE_FACCESSAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FACCESSAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FACCESSAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.access("file", os.R_OK, dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"):
                os.access("file", os.R_OK, follow_symlinks=False)

            with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"):
                os.access("file", os.R_OK, effective_ids=True)

    def test_chmod(self):
        self._verify_available("HAVE_FCHMODAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FCHMODAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FCHMODAT", posix._have_functions)
            self.assertIn("HAVE_LCHMOD", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.chmod("file", 0o644, dir_fd=0)

    def test_chown(self):
        self._verify_available("HAVE_FCHOWNAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FCHOWNAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions)
            self.assertIn("HAVE_LCHOWN", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.chown("file", 0, 0, dir_fd=0)

    def test_link(self):
        self._verify_available("HAVE_LINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_LINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_LINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
                os.link("source", "target", src_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"):
                os.link("source", "target", dst_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
                os.link("source", "target", src_dir_fd=0, dst_dir_fd=0)

            # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag
            with os_helper.temp_dir() as base_path:
                link_path = os.path.join(base_path, "link")
                target_path = os.path.join(base_path, "target")
                source_path = os.path.join(base_path, "source")

                # Explicit encoding: do not depend on the locale default.
                with open(source_path, "w", encoding="utf-8") as fp:
                    fp.write("data")

                os.symlink("target", link_path)

                # Calling os.link should fail in the link(2) call, and
                # should not reject *follow_symlinks* (to match the
                # behaviour you'd get when building on a platform without
                # linkat)
                with self.assertRaises(FileExistsError):
                    os.link(source_path, link_path, follow_symlinks=True)

                with self.assertRaises(FileExistsError):
                    os.link(source_path, link_path, follow_symlinks=False)

    def test_listdir_scandir(self):
        self._verify_available("HAVE_FDOPENDIR")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FDOPENDIR", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions)

            with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"):
                os.listdir(0)

            with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"):
                os.scandir(0)

    def test_mkdir(self):
        self._verify_available("HAVE_MKDIRAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_MKDIRAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_MKDIRAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.mkdir("dir", dir_fd=0)

    def test_mkfifo(self):
        self._verify_available("HAVE_MKFIFOAT")
        if self.mac_ver >= (13, 0):
            self.assertIn("HAVE_MKFIFOAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_MKFIFOAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.mkfifo("path", dir_fd=0)

    def test_mknod(self):
        self._verify_available("HAVE_MKNODAT")
        if self.mac_ver >= (13, 0):
            self.assertIn("HAVE_MKNODAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_MKNODAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.mknod("path", dir_fd=0)

    def test_rename_replace(self):
        self._verify_available("HAVE_RENAMEAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_RENAMEAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_RENAMEAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.rename("a", "b", src_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.rename("a", "b", dst_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.replace("a", "b", src_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.replace("a", "b", dst_dir_fd=0)

    def test_unlink_rmdir(self):
        self._verify_available("HAVE_UNLINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_UNLINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_UNLINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.unlink("path", dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.rmdir("path", dir_fd=0)

    def test_open(self):
        self._verify_available("HAVE_OPENAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_OPENAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_OPENAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.open("path", os.O_RDONLY, dir_fd=0)

    def test_readlink(self):
        self._verify_available("HAVE_READLINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_READLINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_READLINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.readlink("path", dir_fd=0)

    def test_symlink(self):
        self._verify_available("HAVE_SYMLINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_SYMLINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.symlink("a", "b", dir_fd=0)

    def test_utime(self):
        # Both symbols must be configured for this test to run.
        self._verify_available("HAVE_FUTIMENS")
        self._verify_available("HAVE_UTIMENSAT")
        if self.mac_ver >= (10, 13):
            self.assertIn("HAVE_FUTIMENS", posix._have_functions)
            self.assertIn("HAVE_UTIMENSAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FUTIMENS", posix._have_functions)
            self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.utime("path", dir_fd=0)
2195
2196
def tearDownModule():
    # Module-level unittest fixture, run once after the tests of this
    # module: reap the child processes which the tests may have left.
    support.reap_children()
2199
2200
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
2203