1"Test posix functions" 2 3from test import support 4from test.support import import_helper 5from test.support import os_helper 6from test.support import warnings_helper 7from test.support.script_helper import assert_python_ok 8 9# Skip these tests if there is no posix module. 10posix = import_helper.import_module('posix') 11 12import errno 13import sys 14import signal 15import time 16import os 17import platform 18import stat 19import tempfile 20import unittest 21import warnings 22import textwrap 23from contextlib import contextmanager 24 25try: 26 import pwd 27except ImportError: 28 pwd = None 29 30_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 31 os_helper.TESTFN + '-dummy-symlink') 32 33requires_32b = unittest.skipUnless( 34 # Emscripten/WASI have 32 bits pointers, but support 64 bits syscall args. 35 sys.maxsize < 2**32 and not (support.is_emscripten or support.is_wasi), 36 'test is only meaningful on 32-bit builds' 37) 38 39def _supports_sched(): 40 if not hasattr(posix, 'sched_getscheduler'): 41 return False 42 try: 43 posix.sched_getscheduler(0) 44 except OSError as e: 45 if e.errno == errno.ENOSYS: 46 return False 47 return True 48 49requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API') 50 51 52class PosixTester(unittest.TestCase): 53 54 def setUp(self): 55 # create empty file 56 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 57 with open(os_helper.TESTFN, "wb"): 58 pass 59 self.enterContext(warnings_helper.check_warnings()) 60 warnings.filterwarnings('ignore', '.* potential security risk .*', 61 RuntimeWarning) 62 63 def testNoArgFunctions(self): 64 # test posix functions which take no arguments and have 65 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 66 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", 67 "times", "getloadavg", 68 "getegid", "geteuid", "getgid", "getgroups", 69 "getpid", "getpgrp", "getppid", "getuid", "sync", 70 ] 71 72 for name in NO_ARG_FUNCTIONS: 73 
posix_func = getattr(posix, name, None) 74 if posix_func is not None: 75 with self.subTest(name): 76 posix_func() 77 self.assertRaises(TypeError, posix_func, 1) 78 79 @unittest.skipUnless(hasattr(posix, 'getresuid'), 80 'test needs posix.getresuid()') 81 def test_getresuid(self): 82 user_ids = posix.getresuid() 83 self.assertEqual(len(user_ids), 3) 84 for val in user_ids: 85 self.assertGreaterEqual(val, 0) 86 87 @unittest.skipUnless(hasattr(posix, 'getresgid'), 88 'test needs posix.getresgid()') 89 def test_getresgid(self): 90 group_ids = posix.getresgid() 91 self.assertEqual(len(group_ids), 3) 92 for val in group_ids: 93 self.assertGreaterEqual(val, 0) 94 95 @unittest.skipUnless(hasattr(posix, 'setresuid'), 96 'test needs posix.setresuid()') 97 def test_setresuid(self): 98 current_user_ids = posix.getresuid() 99 self.assertIsNone(posix.setresuid(*current_user_ids)) 100 # -1 means don't change that value. 101 self.assertIsNone(posix.setresuid(-1, -1, -1)) 102 103 @unittest.skipUnless(hasattr(posix, 'setresuid'), 104 'test needs posix.setresuid()') 105 def test_setresuid_exception(self): 106 # Don't do this test if someone is silly enough to run us as root. 107 current_user_ids = posix.getresuid() 108 if 0 not in current_user_ids: 109 new_user_ids = (current_user_ids[0]+1, -1, -1) 110 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 111 112 @unittest.skipUnless(hasattr(posix, 'setresgid'), 113 'test needs posix.setresgid()') 114 def test_setresgid(self): 115 current_group_ids = posix.getresgid() 116 self.assertIsNone(posix.setresgid(*current_group_ids)) 117 # -1 means don't change that value. 118 self.assertIsNone(posix.setresgid(-1, -1, -1)) 119 120 @unittest.skipUnless(hasattr(posix, 'setresgid'), 121 'test needs posix.setresgid()') 122 def test_setresgid_exception(self): 123 # Don't do this test if someone is silly enough to run us as root. 
124 current_group_ids = posix.getresgid() 125 if 0 not in current_group_ids: 126 new_group_ids = (current_group_ids[0]+1, -1, -1) 127 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 128 129 @unittest.skipUnless(hasattr(posix, 'initgroups'), 130 "test needs os.initgroups()") 131 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 132 def test_initgroups(self): 133 # It takes a string and an integer; check that it raises a TypeError 134 # for other argument lists. 135 self.assertRaises(TypeError, posix.initgroups) 136 self.assertRaises(TypeError, posix.initgroups, None) 137 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 138 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 139 140 # If a non-privileged user invokes it, it should fail with OSError 141 # EPERM. 142 if os.getuid() != 0: 143 try: 144 name = pwd.getpwuid(posix.getuid()).pw_name 145 except KeyError: 146 # the current UID may not have a pwd entry 147 raise unittest.SkipTest("need a pwd entry") 148 try: 149 posix.initgroups(name, 13) 150 except OSError as e: 151 self.assertEqual(e.errno, errno.EPERM) 152 else: 153 self.fail("Expected OSError to be raised by initgroups") 154 155 @unittest.skipUnless(hasattr(posix, 'statvfs'), 156 'test needs posix.statvfs()') 157 def test_statvfs(self): 158 self.assertTrue(posix.statvfs(os.curdir)) 159 160 @unittest.skipUnless(hasattr(posix, 'fstatvfs'), 161 'test needs posix.fstatvfs()') 162 def test_fstatvfs(self): 163 fp = open(os_helper.TESTFN) 164 try: 165 self.assertTrue(posix.fstatvfs(fp.fileno())) 166 self.assertTrue(posix.statvfs(fp.fileno())) 167 finally: 168 fp.close() 169 170 @unittest.skipUnless(hasattr(posix, 'ftruncate'), 171 'test needs posix.ftruncate()') 172 def test_ftruncate(self): 173 fp = open(os_helper.TESTFN, 'w+') 174 try: 175 # we need to have some data to truncate 176 fp.write('test') 177 fp.flush() 178 posix.ftruncate(fp.fileno(), 0) 179 finally: 180 fp.close() 181 182 
@unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") 183 def test_truncate(self): 184 with open(os_helper.TESTFN, 'w') as fp: 185 fp.write('test') 186 fp.flush() 187 posix.truncate(os_helper.TESTFN, 0) 188 189 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") 190 @support.requires_fork() 191 def test_fexecve(self): 192 fp = os.open(sys.executable, os.O_RDONLY) 193 try: 194 pid = os.fork() 195 if pid == 0: 196 os.chdir(os.path.split(sys.executable)[0]) 197 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) 198 else: 199 support.wait_process(pid, exitcode=0) 200 finally: 201 os.close(fp) 202 203 204 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") 205 @support.requires_fork() 206 def test_waitid(self): 207 pid = os.fork() 208 if pid == 0: 209 os.chdir(os.path.split(sys.executable)[0]) 210 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) 211 else: 212 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) 213 self.assertEqual(pid, res.si_pid) 214 215 @support.requires_fork() 216 def test_register_at_fork(self): 217 with self.assertRaises(TypeError, msg="Positional args not allowed"): 218 os.register_at_fork(lambda: None) 219 with self.assertRaises(TypeError, msg="Args must be callable"): 220 os.register_at_fork(before=2) 221 with self.assertRaises(TypeError, msg="Args must be callable"): 222 os.register_at_fork(after_in_child="three") 223 with self.assertRaises(TypeError, msg="Args must be callable"): 224 os.register_at_fork(after_in_parent=b"Five") 225 with self.assertRaises(TypeError, msg="Args must not be None"): 226 os.register_at_fork(before=None) 227 with self.assertRaises(TypeError, msg="Args must not be None"): 228 os.register_at_fork(after_in_child=None) 229 with self.assertRaises(TypeError, msg="Args must not be None"): 230 os.register_at_fork(after_in_parent=None) 231 with self.assertRaises(TypeError, 
msg="Invalid arg was allowed"): 232 # Ensure a combination of valid and invalid is an error. 233 os.register_at_fork(before=None, after_in_parent=lambda: 3) 234 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 235 # Ensure a combination of valid and invalid is an error. 236 os.register_at_fork(before=lambda: None, after_in_child='') 237 # We test actual registrations in their own process so as not to 238 # pollute this one. There is no way to unregister for cleanup. 239 code = """if 1: 240 import os 241 242 r, w = os.pipe() 243 fin_r, fin_w = os.pipe() 244 245 os.register_at_fork(before=lambda: os.write(w, b'A')) 246 os.register_at_fork(after_in_parent=lambda: os.write(w, b'C')) 247 os.register_at_fork(after_in_child=lambda: os.write(w, b'E')) 248 os.register_at_fork(before=lambda: os.write(w, b'B'), 249 after_in_parent=lambda: os.write(w, b'D'), 250 after_in_child=lambda: os.write(w, b'F')) 251 252 pid = os.fork() 253 if pid == 0: 254 # At this point, after-forkers have already been executed 255 os.close(w) 256 # Wait for parent to tell us to exit 257 os.read(fin_r, 1) 258 os._exit(0) 259 else: 260 try: 261 os.close(w) 262 with open(r, "rb") as f: 263 data = f.read() 264 assert len(data) == 6, data 265 # Check before-fork callbacks 266 assert data[:2] == b'BA', data 267 # Check after-fork callbacks 268 assert sorted(data[2:]) == list(b'CDEF'), data 269 assert data.index(b'C') < data.index(b'D'), data 270 assert data.index(b'E') < data.index(b'F'), data 271 finally: 272 os.write(fin_w, b'!') 273 """ 274 assert_python_ok('-c', code) 275 276 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") 277 def test_lockf(self): 278 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 279 try: 280 os.write(fd, b'test') 281 os.lseek(fd, 0, os.SEEK_SET) 282 posix.lockf(fd, posix.F_LOCK, 4) 283 # section is locked 284 posix.lockf(fd, posix.F_ULOCK, 4) 285 finally: 286 os.close(fd) 287 288 @unittest.skipUnless(hasattr(posix, 'pread'), 
"test needs posix.pread()") 289 def test_pread(self): 290 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 291 try: 292 os.write(fd, b'test') 293 os.lseek(fd, 0, os.SEEK_SET) 294 self.assertEqual(b'es', posix.pread(fd, 2, 1)) 295 # the first pread() shouldn't disturb the file offset 296 self.assertEqual(b'te', posix.read(fd, 2)) 297 finally: 298 os.close(fd) 299 300 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 301 def test_preadv(self): 302 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 303 try: 304 os.write(fd, b'test1tt2t3t5t6t6t8') 305 buf = [bytearray(i) for i in [5, 3, 2]] 306 self.assertEqual(posix.preadv(fd, buf, 3), 10) 307 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 308 finally: 309 os.close(fd) 310 311 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 312 @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI") 313 def test_preadv_flags(self): 314 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 315 try: 316 os.write(fd, b'test1tt2t3t5t6t6t8') 317 buf = [bytearray(i) for i in [5, 3, 2]] 318 self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10) 319 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 320 except NotImplementedError: 321 self.skipTest("preadv2 not available") 322 except OSError as inst: 323 # Is possible that the macro RWF_HIPRI was defined at compilation time 324 # but the option is not supported by the kernel or the runtime libc shared 325 # library. 
326 if inst.errno in {errno.EINVAL, errno.ENOTSUP}: 327 raise unittest.SkipTest("RWF_HIPRI is not supported by the current system") 328 else: 329 raise 330 finally: 331 os.close(fd) 332 333 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 334 @requires_32b 335 def test_preadv_overflow_32bits(self): 336 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 337 try: 338 buf = [bytearray(2**16)] * 2**15 339 with self.assertRaises(OSError) as cm: 340 os.preadv(fd, buf, 0) 341 self.assertEqual(cm.exception.errno, errno.EINVAL) 342 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 343 finally: 344 os.close(fd) 345 346 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") 347 def test_pwrite(self): 348 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 349 try: 350 os.write(fd, b'test') 351 os.lseek(fd, 0, os.SEEK_SET) 352 posix.pwrite(fd, b'xx', 1) 353 self.assertEqual(b'txxt', posix.read(fd, 4)) 354 finally: 355 os.close(fd) 356 357 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 358 def test_pwritev(self): 359 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 360 try: 361 os.write(fd, b"xx") 362 os.lseek(fd, 0, os.SEEK_SET) 363 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2) 364 self.assertEqual(n, 10) 365 366 os.lseek(fd, 0, os.SEEK_SET) 367 self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100)) 368 finally: 369 os.close(fd) 370 371 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 372 @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC") 373 def test_pwritev_flags(self): 374 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 375 try: 376 os.write(fd,b"xx") 377 os.lseek(fd, 0, os.SEEK_SET) 378 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC) 379 self.assertEqual(n, 10) 380 381 os.lseek(fd, 0, os.SEEK_SET) 382 self.assertEqual(b'xxtest1tt2', posix.read(fd, 100)) 383 finally: 384 os.close(fd) 385 386 
@unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 387 @requires_32b 388 def test_pwritev_overflow_32bits(self): 389 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 390 try: 391 with self.assertRaises(OSError) as cm: 392 os.pwritev(fd, [b"x" * 2**16] * 2**15, 0) 393 self.assertEqual(cm.exception.errno, errno.EINVAL) 394 finally: 395 os.close(fd) 396 397 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 398 "test needs posix.posix_fallocate()") 399 def test_posix_fallocate(self): 400 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 401 try: 402 posix.posix_fallocate(fd, 0, 10) 403 except OSError as inst: 404 # issue10812, ZFS doesn't appear to support posix_fallocate, 405 # so skip Solaris-based since they are likely to have ZFS. 406 # issue33655: Also ignore EINVAL on *BSD since ZFS is also 407 # often used there. 408 if inst.errno == errno.EINVAL and sys.platform.startswith( 409 ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')): 410 raise unittest.SkipTest("test may fail on ZFS filesystems") 411 else: 412 raise 413 finally: 414 os.close(fd) 415 416 # issue31106 - posix_fallocate() does not set error in errno. 
417 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 418 "test needs posix.posix_fallocate()") 419 def test_posix_fallocate_errno(self): 420 try: 421 posix.posix_fallocate(-42, 0, 10) 422 except OSError as inst: 423 if inst.errno != errno.EBADF: 424 raise 425 426 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 427 "test needs posix.posix_fadvise()") 428 def test_posix_fadvise(self): 429 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 430 try: 431 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) 432 finally: 433 os.close(fd) 434 435 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 436 "test needs posix.posix_fadvise()") 437 def test_posix_fadvise_errno(self): 438 try: 439 posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED) 440 except OSError as inst: 441 if inst.errno != errno.EBADF: 442 raise 443 444 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") 445 def test_utime_with_fd(self): 446 now = time.time() 447 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 448 try: 449 posix.utime(fd) 450 posix.utime(fd, None) 451 self.assertRaises(TypeError, posix.utime, fd, (None, None)) 452 self.assertRaises(TypeError, posix.utime, fd, (now, None)) 453 self.assertRaises(TypeError, posix.utime, fd, (None, now)) 454 posix.utime(fd, (int(now), int(now))) 455 posix.utime(fd, (now, now)) 456 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) 457 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) 458 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) 459 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) 460 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) 461 462 finally: 463 os.close(fd) 464 465 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") 466 def test_utime_nofollow_symlinks(self): 467 now = time.time() 468 posix.utime(os_helper.TESTFN, None, follow_symlinks=False) 469 
self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 470 (None, None), follow_symlinks=False) 471 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 472 (now, None), follow_symlinks=False) 473 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 474 (None, now), follow_symlinks=False) 475 posix.utime(os_helper.TESTFN, (int(now), int(now)), 476 follow_symlinks=False) 477 posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False) 478 posix.utime(os_helper.TESTFN, follow_symlinks=False) 479 480 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 481 def test_writev(self): 482 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 483 try: 484 n = os.writev(fd, (b'test1', b'tt2', b't3')) 485 self.assertEqual(n, 10) 486 487 os.lseek(fd, 0, os.SEEK_SET) 488 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) 489 490 # Issue #20113: empty list of buffers should not crash 491 try: 492 size = posix.writev(fd, []) 493 except OSError: 494 # writev(fd, []) raises OSError(22, "Invalid argument") 495 # on OpenIndiana 496 pass 497 else: 498 self.assertEqual(size, 0) 499 finally: 500 os.close(fd) 501 502 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 503 @requires_32b 504 def test_writev_overflow_32bits(self): 505 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 506 try: 507 with self.assertRaises(OSError) as cm: 508 os.writev(fd, [b"x" * 2**16] * 2**15) 509 self.assertEqual(cm.exception.errno, errno.EINVAL) 510 finally: 511 os.close(fd) 512 513 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 514 def test_readv(self): 515 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 516 try: 517 os.write(fd, b'test1tt2t3') 518 os.lseek(fd, 0, os.SEEK_SET) 519 buf = [bytearray(i) for i in [5, 3, 2]] 520 self.assertEqual(posix.readv(fd, buf), 10) 521 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) 522 523 # Issue #20113: empty list of buffers should not 
crash 524 try: 525 size = posix.readv(fd, []) 526 except OSError: 527 # readv(fd, []) raises OSError(22, "Invalid argument") 528 # on OpenIndiana 529 pass 530 else: 531 self.assertEqual(size, 0) 532 finally: 533 os.close(fd) 534 535 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 536 @requires_32b 537 def test_readv_overflow_32bits(self): 538 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 539 try: 540 buf = [bytearray(2**16)] * 2**15 541 with self.assertRaises(OSError) as cm: 542 os.readv(fd, buf) 543 self.assertEqual(cm.exception.errno, errno.EINVAL) 544 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 545 finally: 546 os.close(fd) 547 548 @unittest.skipUnless(hasattr(posix, 'dup'), 549 'test needs posix.dup()') 550 @unittest.skipIf(support.is_wasi, "WASI does not have dup()") 551 def test_dup(self): 552 fp = open(os_helper.TESTFN) 553 try: 554 fd = posix.dup(fp.fileno()) 555 self.assertIsInstance(fd, int) 556 os.close(fd) 557 finally: 558 fp.close() 559 560 @unittest.skipUnless(hasattr(posix, 'confstr'), 561 'test needs posix.confstr()') 562 def test_confstr(self): 563 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 564 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 565 566 @unittest.skipUnless(hasattr(posix, 'dup2'), 567 'test needs posix.dup2()') 568 @unittest.skipIf(support.is_wasi, "WASI does not have dup2()") 569 def test_dup2(self): 570 fp1 = open(os_helper.TESTFN) 571 fp2 = open(os_helper.TESTFN) 572 try: 573 posix.dup2(fp1.fileno(), fp2.fileno()) 574 finally: 575 fp1.close() 576 fp2.close() 577 578 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") 579 @support.requires_linux_version(2, 6, 23) 580 @support.requires_subprocess() 581 def test_oscloexec(self): 582 fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC) 583 self.addCleanup(os.close, fd) 584 self.assertFalse(os.get_inheritable(fd)) 585 586 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), 587 'test needs posix.O_EXLOCK') 
588 def test_osexlock(self): 589 fd = os.open(os_helper.TESTFN, 590 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 591 self.assertRaises(OSError, os.open, os_helper.TESTFN, 592 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 593 os.close(fd) 594 595 if hasattr(posix, "O_SHLOCK"): 596 fd = os.open(os_helper.TESTFN, 597 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 598 self.assertRaises(OSError, os.open, os_helper.TESTFN, 599 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 600 os.close(fd) 601 602 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), 603 'test needs posix.O_SHLOCK') 604 def test_osshlock(self): 605 fd1 = os.open(os_helper.TESTFN, 606 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 607 fd2 = os.open(os_helper.TESTFN, 608 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 609 os.close(fd2) 610 os.close(fd1) 611 612 if hasattr(posix, "O_EXLOCK"): 613 fd = os.open(os_helper.TESTFN, 614 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 615 self.assertRaises(OSError, os.open, os_helper.TESTFN, 616 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 617 os.close(fd) 618 619 @unittest.skipUnless(hasattr(posix, 'fstat'), 620 'test needs posix.fstat()') 621 def test_fstat(self): 622 fp = open(os_helper.TESTFN) 623 try: 624 self.assertTrue(posix.fstat(fp.fileno())) 625 self.assertTrue(posix.stat(fp.fileno())) 626 627 self.assertRaisesRegex(TypeError, 628 'should be string, bytes, os.PathLike or integer, not', 629 posix.stat, float(fp.fileno())) 630 finally: 631 fp.close() 632 633 def test_stat(self): 634 self.assertTrue(posix.stat(os_helper.TESTFN)) 635 self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN))) 636 637 self.assertWarnsRegex(DeprecationWarning, 638 'should be string, bytes, os.PathLike or integer, not', 639 posix.stat, bytearray(os.fsencode(os_helper.TESTFN))) 640 self.assertRaisesRegex(TypeError, 641 'should be string, bytes, os.PathLike or integer, not', 642 posix.stat, None) 643 self.assertRaisesRegex(TypeError, 644 'should be string, bytes, os.PathLike or integer, not', 645 posix.stat, list(os_helper.TESTFN)) 646 
self.assertRaisesRegex(TypeError, 647 'should be string, bytes, os.PathLike or integer, not', 648 posix.stat, list(os.fsencode(os_helper.TESTFN))) 649 650 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") 651 def test_mkfifo(self): 652 if sys.platform == "vxworks": 653 fifo_path = os.path.join("/fifos/", os_helper.TESTFN) 654 else: 655 fifo_path = os_helper.TESTFN 656 os_helper.unlink(fifo_path) 657 self.addCleanup(os_helper.unlink, fifo_path) 658 try: 659 posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR) 660 except PermissionError as e: 661 self.skipTest('posix.mkfifo(): %s' % e) 662 self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode)) 663 664 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'), 665 "don't have mknod()/S_IFIFO") 666 def test_mknod(self): 667 # Test using mknod() to create a FIFO (the only use specified 668 # by POSIX). 669 os_helper.unlink(os_helper.TESTFN) 670 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 671 try: 672 posix.mknod(os_helper.TESTFN, mode, 0) 673 except OSError as e: 674 # Some old systems don't allow unprivileged users to use 675 # mknod(), or only support creating device nodes. 
676 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 677 else: 678 self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) 679 680 # Keyword arguments are also supported 681 os_helper.unlink(os_helper.TESTFN) 682 try: 683 posix.mknod(path=os_helper.TESTFN, mode=mode, device=0, 684 dir_fd=None) 685 except OSError as e: 686 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 687 688 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()') 689 def test_makedev(self): 690 st = posix.stat(os_helper.TESTFN) 691 dev = st.st_dev 692 self.assertIsInstance(dev, int) 693 self.assertGreaterEqual(dev, 0) 694 695 major = posix.major(dev) 696 self.assertIsInstance(major, int) 697 self.assertGreaterEqual(major, 0) 698 self.assertEqual(posix.major(dev), major) 699 self.assertRaises(TypeError, posix.major, float(dev)) 700 self.assertRaises(TypeError, posix.major) 701 self.assertRaises((ValueError, OverflowError), posix.major, -1) 702 703 minor = posix.minor(dev) 704 self.assertIsInstance(minor, int) 705 self.assertGreaterEqual(minor, 0) 706 self.assertEqual(posix.minor(dev), minor) 707 self.assertRaises(TypeError, posix.minor, float(dev)) 708 self.assertRaises(TypeError, posix.minor) 709 self.assertRaises((ValueError, OverflowError), posix.minor, -1) 710 711 self.assertEqual(posix.makedev(major, minor), dev) 712 self.assertRaises(TypeError, posix.makedev, float(major), minor) 713 self.assertRaises(TypeError, posix.makedev, major, float(minor)) 714 self.assertRaises(TypeError, posix.makedev, major) 715 self.assertRaises(TypeError, posix.makedev) 716 717 def _test_all_chown_common(self, chown_func, first_param, stat_func): 718 """Common code for chown, fchown and lchown tests.""" 719 def check_stat(uid, gid): 720 if stat_func is not None: 721 stat = stat_func(first_param) 722 self.assertEqual(stat.st_uid, uid) 723 self.assertEqual(stat.st_gid, gid) 724 uid = os.getuid() 725 gid = os.getgid() 726 # test a successful chown 
call 727 chown_func(first_param, uid, gid) 728 check_stat(uid, gid) 729 chown_func(first_param, -1, gid) 730 check_stat(uid, gid) 731 chown_func(first_param, uid, -1) 732 check_stat(uid, gid) 733 734 if sys.platform == "vxworks": 735 # On VxWorks, root user id is 1 and 0 means no login user: 736 # both are super users. 737 is_root = (uid in (0, 1)) 738 else: 739 is_root = (uid == 0) 740 if support.is_emscripten: 741 # Emscripten getuid() / geteuid() always return 0 (root), but 742 # cannot chown uid/gid to random value. 743 pass 744 elif is_root: 745 # Try an amusingly large uid/gid to make sure we handle 746 # large unsigned values. (chown lets you use any 747 # uid/gid you like, even if they aren't defined.) 748 # 749 # On VxWorks uid_t is defined as unsigned short. A big 750 # value greater than 65535 will result in underflow error. 751 # 752 # This problem keeps coming up: 753 # http://bugs.python.org/issue1747858 754 # http://bugs.python.org/issue4591 755 # http://bugs.python.org/issue15301 756 # Hopefully the fix in 4591 fixes it for good! 757 # 758 # This part of the test only runs when run as root. 759 # Only scary people run their tests as root. 
760 761 big_value = (2**31 if sys.platform != "vxworks" else 2**15) 762 chown_func(first_param, big_value, big_value) 763 check_stat(big_value, big_value) 764 chown_func(first_param, -1, -1) 765 check_stat(big_value, big_value) 766 chown_func(first_param, uid, gid) 767 check_stat(uid, gid) 768 elif platform.system() in ('HP-UX', 'SunOS'): 769 # HP-UX and Solaris can allow a non-root user to chown() to root 770 # (issue #5113) 771 raise unittest.SkipTest("Skipping because of non-standard chown() " 772 "behavior") 773 else: 774 # non-root cannot chown to root, raises OSError 775 self.assertRaises(OSError, chown_func, first_param, 0, 0) 776 check_stat(uid, gid) 777 self.assertRaises(OSError, chown_func, first_param, 0, -1) 778 check_stat(uid, gid) 779 if 0 not in os.getgroups(): 780 self.assertRaises(OSError, chown_func, first_param, -1, 0) 781 check_stat(uid, gid) 782 # test illegal types 783 for t in str, float: 784 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 785 check_stat(uid, gid) 786 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 787 check_stat(uid, gid) 788 789 @os_helper.skip_unless_working_chmod 790 @unittest.skipIf(support.is_emscripten, "getgid() is a stub") 791 def test_chown(self): 792 # raise an OSError if the file does not exist 793 os.unlink(os_helper.TESTFN) 794 self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1) 795 796 # re-create the file 797 os_helper.create_empty_file(os_helper.TESTFN) 798 self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat) 799 800 @os_helper.skip_unless_working_chmod 801 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 802 @unittest.skipIf(support.is_emscripten, "getgid() is a stub") 803 def test_fchown(self): 804 os.unlink(os_helper.TESTFN) 805 806 # re-create the file 807 test_file = open(os_helper.TESTFN, 'w') 808 try: 809 fd = test_file.fileno() 810 self._test_all_chown_common(posix.fchown, fd, 811 getattr(posix, 'fstat', 
None)) 812 finally: 813 test_file.close() 814 815 @os_helper.skip_unless_working_chmod 816 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 817 def test_lchown(self): 818 os.unlink(os_helper.TESTFN) 819 # create a symlink 820 os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN) 821 self._test_all_chown_common(posix.lchown, os_helper.TESTFN, 822 getattr(posix, 'lstat', None)) 823 824 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') 825 def test_chdir(self): 826 posix.chdir(os.curdir) 827 self.assertRaises(OSError, posix.chdir, os_helper.TESTFN) 828 829 def test_listdir(self): 830 self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir)) 831 832 def test_listdir_default(self): 833 # When listdir is called without argument, 834 # it's the same as listdir(os.curdir). 835 self.assertIn(os_helper.TESTFN, posix.listdir()) 836 837 def test_listdir_bytes(self): 838 # When listdir is called with a bytes object, 839 # the returned strings are of type bytes. 
840 self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.')) 841 842 def test_listdir_bytes_like(self): 843 for cls in bytearray, memoryview: 844 with self.assertWarns(DeprecationWarning): 845 names = posix.listdir(cls(b'.')) 846 self.assertIn(os.fsencode(os_helper.TESTFN), names) 847 for name in names: 848 self.assertIs(type(name), bytes) 849 850 @unittest.skipUnless(posix.listdir in os.supports_fd, 851 "test needs fd support for posix.listdir()") 852 def test_listdir_fd(self): 853 f = posix.open(posix.getcwd(), posix.O_RDONLY) 854 self.addCleanup(posix.close, f) 855 self.assertEqual( 856 sorted(posix.listdir('.')), 857 sorted(posix.listdir(f)) 858 ) 859 # Check that the fd offset was reset (issue #13739) 860 self.assertEqual( 861 sorted(posix.listdir('.')), 862 sorted(posix.listdir(f)) 863 ) 864 865 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') 866 def test_access(self): 867 self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK)) 868 869 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') 870 def test_umask(self): 871 old_mask = posix.umask(0) 872 self.assertIsInstance(old_mask, int) 873 posix.umask(old_mask) 874 875 @unittest.skipUnless(hasattr(posix, 'strerror'), 876 'test needs posix.strerror()') 877 def test_strerror(self): 878 self.assertTrue(posix.strerror(0)) 879 880 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') 881 def test_pipe(self): 882 reader, writer = posix.pipe() 883 os.close(reader) 884 os.close(writer) 885 886 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 887 @support.requires_linux_version(2, 6, 27) 888 def test_pipe2(self): 889 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') 890 self.assertRaises(TypeError, os.pipe2, 0, 0) 891 892 # try calling with flags = 0, like os.pipe() 893 r, w = os.pipe2(0) 894 os.close(r) 895 os.close(w) 896 897 # test flags 898 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) 899 self.addCleanup(os.close, r) 900 
self.addCleanup(os.close, w) 901 self.assertFalse(os.get_inheritable(r)) 902 self.assertFalse(os.get_inheritable(w)) 903 self.assertFalse(os.get_blocking(r)) 904 self.assertFalse(os.get_blocking(w)) 905 # try reading from an empty pipe: this should fail, not block 906 self.assertRaises(OSError, os.read, r, 1) 907 # try a write big enough to fill-up the pipe: this should either 908 # fail or perform a partial write, not block 909 try: 910 os.write(w, b'x' * support.PIPE_MAX_SIZE) 911 except OSError: 912 pass 913 914 @support.cpython_only 915 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 916 @support.requires_linux_version(2, 6, 27) 917 def test_pipe2_c_limits(self): 918 # Issue 15989 919 import _testcapi 920 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) 921 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) 922 923 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') 924 def test_utime(self): 925 now = time.time() 926 posix.utime(os_helper.TESTFN, None) 927 self.assertRaises(TypeError, posix.utime, 928 os_helper.TESTFN, (None, None)) 929 self.assertRaises(TypeError, posix.utime, 930 os_helper.TESTFN, (now, None)) 931 self.assertRaises(TypeError, posix.utime, 932 os_helper.TESTFN, (None, now)) 933 posix.utime(os_helper.TESTFN, (int(now), int(now))) 934 posix.utime(os_helper.TESTFN, (now, now)) 935 936 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): 937 st = os.stat(target_file) 938 self.assertTrue(hasattr(st, 'st_flags')) 939 940 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 
941 flags = st.st_flags | stat.UF_IMMUTABLE 942 try: 943 chflags_func(target_file, flags, **kwargs) 944 except OSError as err: 945 if err.errno != errno.EOPNOTSUPP: 946 raise 947 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 948 self.skipTest(msg) 949 950 try: 951 new_st = os.stat(target_file) 952 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 953 try: 954 fd = open(target_file, 'w+') 955 except OSError as e: 956 self.assertEqual(e.errno, errno.EPERM) 957 finally: 958 posix.chflags(target_file, st.st_flags) 959 960 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 961 def test_chflags(self): 962 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN) 963 964 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 965 def test_lchflags_regular_file(self): 966 self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN) 967 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN, 968 follow_symlinks=False) 969 970 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 971 def test_lchflags_symlink(self): 972 testfn_st = os.stat(os_helper.TESTFN) 973 974 self.assertTrue(hasattr(testfn_st, 'st_flags')) 975 976 self.addCleanup(os_helper.unlink, _DUMMY_SYMLINK) 977 os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK) 978 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 979 980 def chflags_nofollow(path, flags): 981 return posix.chflags(path, flags, follow_symlinks=False) 982 983 for fn in (posix.lchflags, chflags_nofollow): 984 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 
985 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE 986 try: 987 fn(_DUMMY_SYMLINK, flags) 988 except OSError as err: 989 if err.errno != errno.EOPNOTSUPP: 990 raise 991 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 992 self.skipTest(msg) 993 try: 994 new_testfn_st = os.stat(os_helper.TESTFN) 995 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 996 997 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 998 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 999 new_dummy_symlink_st.st_flags) 1000 finally: 1001 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 1002 1003 def test_environ(self): 1004 if os.name == "nt": 1005 item_type = str 1006 else: 1007 item_type = bytes 1008 for k, v in posix.environ.items(): 1009 self.assertEqual(type(k), item_type) 1010 self.assertEqual(type(v), item_type) 1011 1012 def test_putenv(self): 1013 with self.assertRaises(ValueError): 1014 os.putenv('FRUIT\0VEGETABLE', 'cabbage') 1015 with self.assertRaises(ValueError): 1016 os.putenv(b'FRUIT\0VEGETABLE', b'cabbage') 1017 with self.assertRaises(ValueError): 1018 os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage') 1019 with self.assertRaises(ValueError): 1020 os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage') 1021 with self.assertRaises(ValueError): 1022 os.putenv('FRUIT=ORANGE', 'lemon') 1023 with self.assertRaises(ValueError): 1024 os.putenv(b'FRUIT=ORANGE', b'lemon') 1025 1026 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') 1027 def test_getcwd_long_pathnames(self): 1028 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 1029 curdir = os.getcwd() 1030 base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd' 1031 1032 try: 1033 os.mkdir(base_path) 1034 os.chdir(base_path) 1035 except: 1036 # Just returning nothing instead of the SkipTest exception, because 1037 # the test results in Error in that case. Is that ok? 
1038 # raise unittest.SkipTest("cannot create directory for testing") 1039 return 1040 1041 def _create_and_do_getcwd(dirname, current_path_length = 0): 1042 try: 1043 os.mkdir(dirname) 1044 except: 1045 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test") 1046 1047 os.chdir(dirname) 1048 try: 1049 os.getcwd() 1050 if current_path_length < 1027: 1051 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 1052 finally: 1053 os.chdir('..') 1054 os.rmdir(dirname) 1055 1056 _create_and_do_getcwd(dirname) 1057 1058 finally: 1059 os.chdir(curdir) 1060 os_helper.rmtree(base_path) 1061 1062 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") 1063 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 1064 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") 1065 def test_getgrouplist(self): 1066 user = pwd.getpwuid(os.getuid())[0] 1067 group = pwd.getpwuid(os.getuid())[3] 1068 self.assertIn(group, posix.getgrouplist(user, group)) 1069 1070 1071 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 1072 @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()") 1073 @support.requires_subprocess() 1074 def test_getgroups(self): 1075 with os.popen('id -G 2>/dev/null') as idg: 1076 groups = idg.read().strip() 1077 ret = idg.close() 1078 1079 try: 1080 idg_groups = set(int(g) for g in groups.split()) 1081 except ValueError: 1082 idg_groups = set() 1083 if ret is not None or not idg_groups: 1084 raise unittest.SkipTest("need working 'id -G'") 1085 1086 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 1087 if sys.platform == 'darwin': 1088 import sysconfig 1089 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3' 1090 if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6): 1091 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 1092 1093 # 'id -G' and 'os.getgroups()' should return 
the same 1094 # groups, ignoring order, duplicates, and the effective gid. 1095 # #10822/#26944 - It is implementation defined whether 1096 # posix.getgroups() includes the effective gid. 1097 symdiff = idg_groups.symmetric_difference(posix.getgroups()) 1098 self.assertTrue(not symdiff or symdiff == {posix.getegid()}) 1099 1100 @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal') 1101 @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result") 1102 def test_cld_xxxx_constants(self): 1103 os.CLD_EXITED 1104 os.CLD_KILLED 1105 os.CLD_DUMPED 1106 os.CLD_TRAPPED 1107 os.CLD_STOPPED 1108 os.CLD_CONTINUED 1109 1110 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), 1111 "don't have scheduling support") 1112 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), 1113 "don't have sched affinity support") 1114 1115 @requires_sched_h 1116 def test_sched_yield(self): 1117 # This has no error conditions (at least on Linux). 1118 posix.sched_yield() 1119 1120 @requires_sched_h 1121 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), 1122 "requires sched_get_priority_max()") 1123 def test_sched_priority(self): 1124 # Round-robin usually has interesting priorities. 1125 pol = posix.SCHED_RR 1126 lo = posix.sched_get_priority_min(pol) 1127 hi = posix.sched_get_priority_max(pol) 1128 self.assertIsInstance(lo, int) 1129 self.assertIsInstance(hi, int) 1130 self.assertGreaterEqual(hi, lo) 1131 # OSX evidently just returns 15 without checking the argument. 
1132 if sys.platform != "darwin": 1133 self.assertRaises(OSError, posix.sched_get_priority_min, -23) 1134 self.assertRaises(OSError, posix.sched_get_priority_max, -23) 1135 1136 @requires_sched 1137 def test_get_and_set_scheduler_and_param(self): 1138 possible_schedulers = [sched for name, sched in posix.__dict__.items() 1139 if name.startswith("SCHED_")] 1140 mine = posix.sched_getscheduler(0) 1141 self.assertIn(mine, possible_schedulers) 1142 try: 1143 parent = posix.sched_getscheduler(os.getppid()) 1144 except OSError as e: 1145 if e.errno != errno.EPERM: 1146 raise 1147 else: 1148 self.assertIn(parent, possible_schedulers) 1149 self.assertRaises(OSError, posix.sched_getscheduler, -1) 1150 self.assertRaises(OSError, posix.sched_getparam, -1) 1151 param = posix.sched_getparam(0) 1152 self.assertIsInstance(param.sched_priority, int) 1153 1154 # POSIX states that calling sched_setparam() or sched_setscheduler() on 1155 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR 1156 # is implementation-defined: NetBSD and FreeBSD can return EINVAL. 
1157 if not sys.platform.startswith(('freebsd', 'netbsd')): 1158 try: 1159 posix.sched_setscheduler(0, mine, param) 1160 posix.sched_setparam(0, param) 1161 except OSError as e: 1162 if e.errno != errno.EPERM: 1163 raise 1164 self.assertRaises(OSError, posix.sched_setparam, -1, param) 1165 1166 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) 1167 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) 1168 self.assertRaises(TypeError, posix.sched_setparam, 0, 43) 1169 param = posix.sched_param(None) 1170 self.assertRaises(TypeError, posix.sched_setparam, 0, param) 1171 large = 214748364700 1172 param = posix.sched_param(large) 1173 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1174 param = posix.sched_param(sched_priority=-large) 1175 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1176 1177 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") 1178 def test_sched_rr_get_interval(self): 1179 try: 1180 interval = posix.sched_rr_get_interval(0) 1181 except OSError as e: 1182 # This likely means that sched_rr_get_interval is only valid for 1183 # processes with the SCHED_RR scheduler in effect. 1184 if e.errno != errno.EINVAL: 1185 raise 1186 self.skipTest("only works on SCHED_RR processes") 1187 self.assertIsInstance(interval, float) 1188 # Reasonable constraints, I think. 1189 self.assertGreaterEqual(interval, 0.) 1190 self.assertLess(interval, 1.) 
1191 1192 @requires_sched_affinity 1193 def test_sched_getaffinity(self): 1194 mask = posix.sched_getaffinity(0) 1195 self.assertIsInstance(mask, set) 1196 self.assertGreaterEqual(len(mask), 1) 1197 if not sys.platform.startswith("freebsd"): 1198 # bpo-47205: does not raise OSError on FreeBSD 1199 self.assertRaises(OSError, posix.sched_getaffinity, -1) 1200 for cpu in mask: 1201 self.assertIsInstance(cpu, int) 1202 self.assertGreaterEqual(cpu, 0) 1203 self.assertLess(cpu, 1 << 32) 1204 1205 @requires_sched_affinity 1206 def test_sched_setaffinity(self): 1207 mask = posix.sched_getaffinity(0) 1208 if len(mask) > 1: 1209 # Empty masks are forbidden 1210 mask.pop() 1211 posix.sched_setaffinity(0, mask) 1212 self.assertEqual(posix.sched_getaffinity(0), mask) 1213 self.assertRaises(OSError, posix.sched_setaffinity, 0, []) 1214 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) 1215 self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X")) 1216 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) 1217 if not sys.platform.startswith("freebsd"): 1218 # bpo-47205: does not raise OSError on FreeBSD 1219 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) 1220 1221 @unittest.skipIf(support.is_wasi, "No dynamic linking on WASI") 1222 def test_rtld_constants(self): 1223 # check presence of major RTLD_* constants 1224 posix.RTLD_LAZY 1225 posix.RTLD_NOW 1226 posix.RTLD_GLOBAL 1227 posix.RTLD_LOCAL 1228 1229 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), 1230 "test needs an OS that reports file holes") 1231 def test_fs_holes(self): 1232 # Even if the filesystem doesn't report holes, 1233 # if the OS supports it the SEEK_* constants 1234 # will be defined and will have a consistent 1235 # behaviour: 1236 # os.SEEK_DATA = current position 1237 # os.SEEK_HOLE = end of file position 1238 with open(os_helper.TESTFN, 'r+b') as fp: 1239 fp.write(b"hello") 1240 fp.flush() 1241 size = fp.tell() 1242 fno = fp.fileno() 1243 try : 
1244 for i in range(size): 1245 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) 1246 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) 1247 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) 1248 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) 1249 except OSError : 1250 # Some OSs claim to support SEEK_HOLE/SEEK_DATA 1251 # but it is not true. 1252 # For instance: 1253 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html 1254 raise unittest.SkipTest("OSError raised!") 1255 1256 def test_path_error2(self): 1257 """ 1258 Test functions that call path_error2(), providing two filenames in their exceptions. 1259 """ 1260 for name in ("rename", "replace", "link"): 1261 function = getattr(os, name, None) 1262 if function is None: 1263 continue 1264 1265 for dst in ("noodly2", os_helper.TESTFN): 1266 try: 1267 function('doesnotexistfilename', dst) 1268 except OSError as e: 1269 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) 1270 break 1271 else: 1272 self.fail("No valid path_error2() test for os." 
+ name) 1273 1274 def test_path_with_null_character(self): 1275 fn = os_helper.TESTFN 1276 fn_with_NUL = fn + '\0' 1277 self.addCleanup(os_helper.unlink, fn) 1278 os_helper.unlink(fn) 1279 fd = None 1280 try: 1281 with self.assertRaises(ValueError): 1282 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1283 finally: 1284 if fd is not None: 1285 os.close(fd) 1286 self.assertFalse(os.path.exists(fn)) 1287 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1288 self.assertFalse(os.path.exists(fn)) 1289 open(fn, 'wb').close() 1290 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1291 1292 def test_path_with_null_byte(self): 1293 fn = os.fsencode(os_helper.TESTFN) 1294 fn_with_NUL = fn + b'\0' 1295 self.addCleanup(os_helper.unlink, fn) 1296 os_helper.unlink(fn) 1297 fd = None 1298 try: 1299 with self.assertRaises(ValueError): 1300 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1301 finally: 1302 if fd is not None: 1303 os.close(fd) 1304 self.assertFalse(os.path.exists(fn)) 1305 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1306 self.assertFalse(os.path.exists(fn)) 1307 open(fn, 'wb').close() 1308 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1309 1310 @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable") 1311 def test_pidfd_open(self): 1312 with self.assertRaises(OSError) as cm: 1313 os.pidfd_open(-1) 1314 if cm.exception.errno == errno.ENOSYS: 1315 self.skipTest("system does not support pidfd_open") 1316 if isinstance(cm.exception, PermissionError): 1317 self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}") 1318 self.assertEqual(cm.exception.errno, errno.EINVAL) 1319 os.close(os.pidfd_open(os.getpid(), 0)) 1320 1321 1322# tests for the posix *at functions follow 1323class TestPosixDirFd(unittest.TestCase): 1324 count = 0 1325 1326 @contextmanager 1327 def prepare(self): 1328 TestPosixDirFd.count += 1 1329 name = f'{os_helper.TESTFN}_{self.count}' 1330 base_dir = 
f'{os_helper.TESTFN}_{self.count}base' 1331 posix.mkdir(base_dir) 1332 self.addCleanup(posix.rmdir, base_dir) 1333 fullname = os.path.join(base_dir, name) 1334 assert not os.path.exists(fullname) 1335 with os_helper.open_dir_fd(base_dir) as dir_fd: 1336 yield (dir_fd, name, fullname) 1337 1338 @contextmanager 1339 def prepare_file(self): 1340 with self.prepare() as (dir_fd, name, fullname): 1341 os_helper.create_empty_file(fullname) 1342 self.addCleanup(posix.unlink, fullname) 1343 yield (dir_fd, name, fullname) 1344 1345 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") 1346 def test_access_dir_fd(self): 1347 with self.prepare_file() as (dir_fd, name, fullname): 1348 self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd)) 1349 1350 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") 1351 def test_chmod_dir_fd(self): 1352 with self.prepare_file() as (dir_fd, name, fullname): 1353 posix.chmod(fullname, stat.S_IRUSR) 1354 posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) 1355 s = posix.stat(fullname) 1356 self.assertEqual(s.st_mode & stat.S_IRWXU, 1357 stat.S_IRUSR | stat.S_IWUSR) 1358 1359 @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd), 1360 "test needs dir_fd support in os.chown()") 1361 @unittest.skipIf(support.is_emscripten, "getgid() is a stub") 1362 def test_chown_dir_fd(self): 1363 with self.prepare_file() as (dir_fd, name, fullname): 1364 posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd) 1365 1366 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") 1367 def test_stat_dir_fd(self): 1368 with self.prepare() as (dir_fd, name, fullname): 1369 with open(fullname, 'w') as outfile: 1370 outfile.write("testline\n") 1371 self.addCleanup(posix.unlink, fullname) 1372 1373 s1 = posix.stat(fullname) 1374 s2 = posix.stat(name, dir_fd=dir_fd) 1375 self.assertEqual(s1, s2) 1376 s2 = 
posix.stat(fullname, dir_fd=None) 1377 self.assertEqual(s1, s2) 1378 1379 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1380 posix.stat, name, dir_fd=posix.getcwd()) 1381 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1382 posix.stat, name, dir_fd=float(dir_fd)) 1383 self.assertRaises(OverflowError, 1384 posix.stat, name, dir_fd=10**20) 1385 1386 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") 1387 def test_utime_dir_fd(self): 1388 with self.prepare_file() as (dir_fd, name, fullname): 1389 now = time.time() 1390 posix.utime(name, None, dir_fd=dir_fd) 1391 posix.utime(name, dir_fd=dir_fd) 1392 self.assertRaises(TypeError, posix.utime, name, 1393 now, dir_fd=dir_fd) 1394 self.assertRaises(TypeError, posix.utime, name, 1395 (None, None), dir_fd=dir_fd) 1396 self.assertRaises(TypeError, posix.utime, name, 1397 (now, None), dir_fd=dir_fd) 1398 self.assertRaises(TypeError, posix.utime, name, 1399 (None, now), dir_fd=dir_fd) 1400 self.assertRaises(TypeError, posix.utime, name, 1401 (now, "x"), dir_fd=dir_fd) 1402 posix.utime(name, (int(now), int(now)), dir_fd=dir_fd) 1403 posix.utime(name, (now, now), dir_fd=dir_fd) 1404 posix.utime(name, 1405 (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd) 1406 posix.utime(name, dir_fd=dir_fd, 1407 times=(int(now), int((now - int(now)) * 1e9))) 1408 1409 # try dir_fd and follow_symlinks together 1410 if os.utime in os.supports_follow_symlinks: 1411 try: 1412 posix.utime(name, follow_symlinks=False, dir_fd=dir_fd) 1413 except ValueError: 1414 # whoops! using both together not supported on this platform. 
1415 pass 1416 1417 @unittest.skipIf( 1418 support.is_wasi, 1419 "WASI: symlink following on path_link is not supported" 1420 ) 1421 @unittest.skipUnless( 1422 hasattr(os, "link") and os.link in os.supports_dir_fd, 1423 "test needs dir_fd support in os.link()" 1424 ) 1425 def test_link_dir_fd(self): 1426 with self.prepare_file() as (dir_fd, name, fullname), \ 1427 self.prepare() as (dir_fd2, linkname, fulllinkname): 1428 try: 1429 posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) 1430 except PermissionError as e: 1431 self.skipTest('posix.link(): %s' % e) 1432 self.addCleanup(posix.unlink, fulllinkname) 1433 # should have same inodes 1434 self.assertEqual(posix.stat(fullname)[1], 1435 posix.stat(fulllinkname)[1]) 1436 1437 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") 1438 def test_mkdir_dir_fd(self): 1439 with self.prepare() as (dir_fd, name, fullname): 1440 posix.mkdir(name, dir_fd=dir_fd) 1441 self.addCleanup(posix.rmdir, fullname) 1442 posix.stat(fullname) # should not raise exception 1443 1444 @unittest.skipUnless(hasattr(os, 'mknod') 1445 and (os.mknod in os.supports_dir_fd) 1446 and hasattr(stat, 'S_IFIFO'), 1447 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") 1448 def test_mknod_dir_fd(self): 1449 # Test using mknodat() to create a FIFO (the only use specified 1450 # by POSIX). 1451 with self.prepare() as (dir_fd, name, fullname): 1452 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 1453 try: 1454 posix.mknod(name, mode, 0, dir_fd=dir_fd) 1455 except OSError as e: 1456 # Some old systems don't allow unprivileged users to use 1457 # mknod(), or only support creating device nodes. 
1458 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 1459 else: 1460 self.addCleanup(posix.unlink, fullname) 1461 self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) 1462 1463 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") 1464 def test_open_dir_fd(self): 1465 with self.prepare() as (dir_fd, name, fullname): 1466 with open(fullname, 'wb') as outfile: 1467 outfile.write(b"testline\n") 1468 self.addCleanup(posix.unlink, fullname) 1469 fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd) 1470 try: 1471 res = posix.read(fd, 9) 1472 self.assertEqual(b"testline\n", res) 1473 finally: 1474 posix.close(fd) 1475 1476 @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd), 1477 "test needs dir_fd support in os.readlink()") 1478 def test_readlink_dir_fd(self): 1479 with self.prepare() as (dir_fd, name, fullname): 1480 os.symlink('symlink', fullname) 1481 self.addCleanup(posix.unlink, fullname) 1482 self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink') 1483 1484 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") 1485 def test_rename_dir_fd(self): 1486 with self.prepare_file() as (dir_fd, name, fullname), \ 1487 self.prepare() as (dir_fd2, name2, fullname2): 1488 posix.rename(name, name2, 1489 src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) 1490 posix.stat(fullname2) # should not raise exception 1491 posix.rename(fullname2, fullname) 1492 1493 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") 1494 def test_symlink_dir_fd(self): 1495 with self.prepare() as (dir_fd, name, fullname): 1496 posix.symlink('symlink', name, dir_fd=dir_fd) 1497 self.addCleanup(posix.unlink, fullname) 1498 self.assertEqual(posix.readlink(fullname), 'symlink') 1499 1500 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") 1501 def test_unlink_dir_fd(self): 1502 
with self.prepare() as (dir_fd, name, fullname): 1503 os_helper.create_empty_file(fullname) 1504 posix.stat(fullname) # should not raise exception 1505 try: 1506 posix.unlink(name, dir_fd=dir_fd) 1507 self.assertRaises(OSError, posix.stat, fullname) 1508 except: 1509 self.addCleanup(posix.unlink, fullname) 1510 raise 1511 1512 @unittest.skipUnless(hasattr(os, 'mkfifo') and os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") 1513 def test_mkfifo_dir_fd(self): 1514 with self.prepare() as (dir_fd, name, fullname): 1515 try: 1516 posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) 1517 except PermissionError as e: 1518 self.skipTest('posix.mkfifo(): %s' % e) 1519 self.addCleanup(posix.unlink, fullname) 1520 self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) 1521 1522 1523class PosixGroupsTester(unittest.TestCase): 1524 1525 def setUp(self): 1526 if posix.getuid() != 0: 1527 raise unittest.SkipTest("not enough privileges") 1528 if not hasattr(posix, 'getgroups'): 1529 raise unittest.SkipTest("need posix.getgroups") 1530 if sys.platform == 'darwin': 1531 raise unittest.SkipTest("getgroups(2) is broken on OSX") 1532 self.saved_groups = posix.getgroups() 1533 1534 def tearDown(self): 1535 if hasattr(posix, 'setgroups'): 1536 posix.setgroups(self.saved_groups) 1537 elif hasattr(posix, 'initgroups'): 1538 name = pwd.getpwuid(posix.getuid()).pw_name 1539 posix.initgroups(name, self.saved_groups[0]) 1540 1541 @unittest.skipUnless(hasattr(posix, 'initgroups'), 1542 "test needs posix.initgroups()") 1543 def test_initgroups(self): 1544 # find missing group 1545 1546 g = max(self.saved_groups or [0]) + 1 1547 name = pwd.getpwuid(posix.getuid()).pw_name 1548 posix.initgroups(name, g) 1549 self.assertIn(g, posix.getgroups()) 1550 1551 @unittest.skipUnless(hasattr(posix, 'setgroups'), 1552 "test needs posix.setgroups()") 1553 def test_setgroups(self): 1554 for groups in [[0], list(range(16))]: 1555 posix.setgroups(groups) 1556 
self.assertListEqual(groups, posix.getgroups()) 1557 1558 1559class _PosixSpawnMixin: 1560 # Program which does nothing and exits with status 0 (success) 1561 NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass') 1562 spawn_func = None 1563 1564 def python_args(self, *args): 1565 # Disable site module to avoid side effects. For example, 1566 # on Fedora 28, if the HOME environment variable is not set, 1567 # site._getuserbase() calls pwd.getpwuid() which opens 1568 # /var/lib/sss/mc/passwd but then leaves the file open which makes 1569 # test_close_file() to fail. 1570 return (sys.executable, '-I', '-S', *args) 1571 1572 def test_returns_pid(self): 1573 pidfile = os_helper.TESTFN 1574 self.addCleanup(os_helper.unlink, pidfile) 1575 script = f"""if 1: 1576 import os 1577 with open({pidfile!r}, "w") as pidfile: 1578 pidfile.write(str(os.getpid())) 1579 """ 1580 args = self.python_args('-c', script) 1581 pid = self.spawn_func(args[0], args, os.environ) 1582 support.wait_process(pid, exitcode=0) 1583 with open(pidfile, encoding="utf-8") as f: 1584 self.assertEqual(f.read(), str(pid)) 1585 1586 def test_no_such_executable(self): 1587 no_such_executable = 'no_such_executable' 1588 try: 1589 pid = self.spawn_func(no_such_executable, 1590 [no_such_executable], 1591 os.environ) 1592 # bpo-35794: PermissionError can be raised if there are 1593 # directories in the $PATH that are not accessible. 
1594 except (FileNotFoundError, PermissionError) as exc: 1595 self.assertEqual(exc.filename, no_such_executable) 1596 else: 1597 pid2, status = os.waitpid(pid, 0) 1598 self.assertEqual(pid2, pid) 1599 self.assertNotEqual(status, 0) 1600 1601 def test_specify_environment(self): 1602 envfile = os_helper.TESTFN 1603 self.addCleanup(os_helper.unlink, envfile) 1604 script = f"""if 1: 1605 import os 1606 with open({envfile!r}, "w", encoding="utf-8") as envfile: 1607 envfile.write(os.environ['foo']) 1608 """ 1609 args = self.python_args('-c', script) 1610 pid = self.spawn_func(args[0], args, 1611 {**os.environ, 'foo': 'bar'}) 1612 support.wait_process(pid, exitcode=0) 1613 with open(envfile, encoding="utf-8") as f: 1614 self.assertEqual(f.read(), 'bar') 1615 1616 def test_none_file_actions(self): 1617 pid = self.spawn_func( 1618 self.NOOP_PROGRAM[0], 1619 self.NOOP_PROGRAM, 1620 os.environ, 1621 file_actions=None 1622 ) 1623 support.wait_process(pid, exitcode=0) 1624 1625 def test_empty_file_actions(self): 1626 pid = self.spawn_func( 1627 self.NOOP_PROGRAM[0], 1628 self.NOOP_PROGRAM, 1629 os.environ, 1630 file_actions=[] 1631 ) 1632 support.wait_process(pid, exitcode=0) 1633 1634 def test_resetids_explicit_default(self): 1635 pid = self.spawn_func( 1636 sys.executable, 1637 [sys.executable, '-c', 'pass'], 1638 os.environ, 1639 resetids=False 1640 ) 1641 support.wait_process(pid, exitcode=0) 1642 1643 def test_resetids(self): 1644 pid = self.spawn_func( 1645 sys.executable, 1646 [sys.executable, '-c', 'pass'], 1647 os.environ, 1648 resetids=True 1649 ) 1650 support.wait_process(pid, exitcode=0) 1651 1652 def test_resetids_wrong_type(self): 1653 with self.assertRaises(TypeError): 1654 self.spawn_func(sys.executable, 1655 [sys.executable, "-c", "pass"], 1656 os.environ, resetids=None) 1657 1658 def test_setpgroup(self): 1659 pid = self.spawn_func( 1660 sys.executable, 1661 [sys.executable, '-c', 'pass'], 1662 os.environ, 1663 setpgroup=os.getpgrp() 1664 ) 1665 
support.wait_process(pid, exitcode=0) 1666 1667 def test_setpgroup_wrong_type(self): 1668 with self.assertRaises(TypeError): 1669 self.spawn_func(sys.executable, 1670 [sys.executable, "-c", "pass"], 1671 os.environ, setpgroup="023") 1672 1673 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1674 'need signal.pthread_sigmask()') 1675 def test_setsigmask(self): 1676 code = textwrap.dedent("""\ 1677 import signal 1678 signal.raise_signal(signal.SIGUSR1)""") 1679 1680 pid = self.spawn_func( 1681 sys.executable, 1682 [sys.executable, '-c', code], 1683 os.environ, 1684 setsigmask=[signal.SIGUSR1] 1685 ) 1686 support.wait_process(pid, exitcode=0) 1687 1688 def test_setsigmask_wrong_type(self): 1689 with self.assertRaises(TypeError): 1690 self.spawn_func(sys.executable, 1691 [sys.executable, "-c", "pass"], 1692 os.environ, setsigmask=34) 1693 with self.assertRaises(TypeError): 1694 self.spawn_func(sys.executable, 1695 [sys.executable, "-c", "pass"], 1696 os.environ, setsigmask=["j"]) 1697 with self.assertRaises(ValueError): 1698 self.spawn_func(sys.executable, 1699 [sys.executable, "-c", "pass"], 1700 os.environ, setsigmask=[signal.NSIG, 1701 signal.NSIG+1]) 1702 1703 def test_setsid(self): 1704 rfd, wfd = os.pipe() 1705 self.addCleanup(os.close, rfd) 1706 try: 1707 os.set_inheritable(wfd, True) 1708 1709 code = textwrap.dedent(f""" 1710 import os 1711 fd = {wfd} 1712 sid = os.getsid(0) 1713 os.write(fd, str(sid).encode()) 1714 """) 1715 1716 try: 1717 pid = self.spawn_func(sys.executable, 1718 [sys.executable, "-c", code], 1719 os.environ, setsid=True) 1720 except NotImplementedError as exc: 1721 self.skipTest(f"setsid is not supported: {exc!r}") 1722 except PermissionError as exc: 1723 self.skipTest(f"setsid failed with: {exc!r}") 1724 finally: 1725 os.close(wfd) 1726 1727 support.wait_process(pid, exitcode=0) 1728 1729 output = os.read(rfd, 100) 1730 child_sid = int(output) 1731 parent_sid = os.getsid(os.getpid()) 1732 self.assertNotEqual(parent_sid, child_sid) 
1733 1734 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1735 'need signal.pthread_sigmask()') 1736 def test_setsigdef(self): 1737 original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN) 1738 code = textwrap.dedent("""\ 1739 import signal 1740 signal.raise_signal(signal.SIGUSR1)""") 1741 try: 1742 pid = self.spawn_func( 1743 sys.executable, 1744 [sys.executable, '-c', code], 1745 os.environ, 1746 setsigdef=[signal.SIGUSR1] 1747 ) 1748 finally: 1749 signal.signal(signal.SIGUSR1, original_handler) 1750 1751 support.wait_process(pid, exitcode=-signal.SIGUSR1) 1752 1753 def test_setsigdef_wrong_type(self): 1754 with self.assertRaises(TypeError): 1755 self.spawn_func(sys.executable, 1756 [sys.executable, "-c", "pass"], 1757 os.environ, setsigdef=34) 1758 with self.assertRaises(TypeError): 1759 self.spawn_func(sys.executable, 1760 [sys.executable, "-c", "pass"], 1761 os.environ, setsigdef=["j"]) 1762 with self.assertRaises(ValueError): 1763 self.spawn_func(sys.executable, 1764 [sys.executable, "-c", "pass"], 1765 os.environ, setsigdef=[signal.NSIG, signal.NSIG+1]) 1766 1767 @requires_sched 1768 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1769 "bpo-34685: test can fail on BSD") 1770 def test_setscheduler_only_param(self): 1771 policy = os.sched_getscheduler(0) 1772 priority = os.sched_get_priority_min(policy) 1773 code = textwrap.dedent(f"""\ 1774 import os, sys 1775 if os.sched_getscheduler(0) != {policy}: 1776 sys.exit(101) 1777 if os.sched_getparam(0).sched_priority != {priority}: 1778 sys.exit(102)""") 1779 pid = self.spawn_func( 1780 sys.executable, 1781 [sys.executable, '-c', code], 1782 os.environ, 1783 scheduler=(None, os.sched_param(priority)) 1784 ) 1785 support.wait_process(pid, exitcode=0) 1786 1787 @requires_sched 1788 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1789 "bpo-34685: test can fail on BSD") 1790 def test_setscheduler_with_policy(self): 1791 policy = os.sched_getscheduler(0) 1792 priority 
= os.sched_get_priority_min(policy) 1793 code = textwrap.dedent(f"""\ 1794 import os, sys 1795 if os.sched_getscheduler(0) != {policy}: 1796 sys.exit(101) 1797 if os.sched_getparam(0).sched_priority != {priority}: 1798 sys.exit(102)""") 1799 pid = self.spawn_func( 1800 sys.executable, 1801 [sys.executable, '-c', code], 1802 os.environ, 1803 scheduler=(policy, os.sched_param(priority)) 1804 ) 1805 support.wait_process(pid, exitcode=0) 1806 1807 def test_multiple_file_actions(self): 1808 file_actions = [ 1809 (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0), 1810 (os.POSIX_SPAWN_CLOSE, 0), 1811 (os.POSIX_SPAWN_DUP2, 1, 4), 1812 ] 1813 pid = self.spawn_func(self.NOOP_PROGRAM[0], 1814 self.NOOP_PROGRAM, 1815 os.environ, 1816 file_actions=file_actions) 1817 support.wait_process(pid, exitcode=0) 1818 1819 def test_bad_file_actions(self): 1820 args = self.NOOP_PROGRAM 1821 with self.assertRaises(TypeError): 1822 self.spawn_func(args[0], args, os.environ, 1823 file_actions=[None]) 1824 with self.assertRaises(TypeError): 1825 self.spawn_func(args[0], args, os.environ, 1826 file_actions=[()]) 1827 with self.assertRaises(TypeError): 1828 self.spawn_func(args[0], args, os.environ, 1829 file_actions=[(None,)]) 1830 with self.assertRaises(TypeError): 1831 self.spawn_func(args[0], args, os.environ, 1832 file_actions=[(12345,)]) 1833 with self.assertRaises(TypeError): 1834 self.spawn_func(args[0], args, os.environ, 1835 file_actions=[(os.POSIX_SPAWN_CLOSE,)]) 1836 with self.assertRaises(TypeError): 1837 self.spawn_func(args[0], args, os.environ, 1838 file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)]) 1839 with self.assertRaises(TypeError): 1840 self.spawn_func(args[0], args, os.environ, 1841 file_actions=[(os.POSIX_SPAWN_CLOSE, None)]) 1842 with self.assertRaises(ValueError): 1843 self.spawn_func(args[0], args, os.environ, 1844 file_actions=[(os.POSIX_SPAWN_OPEN, 1845 3, __file__ + '\0', 1846 os.O_RDONLY, 0)]) 1847 1848 def test_open_file(self): 1849 outfile = 
os_helper.TESTFN 1850 self.addCleanup(os_helper.unlink, outfile) 1851 script = """if 1: 1852 import sys 1853 sys.stdout.write("hello") 1854 """ 1855 file_actions = [ 1856 (os.POSIX_SPAWN_OPEN, 1, outfile, 1857 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 1858 stat.S_IRUSR | stat.S_IWUSR), 1859 ] 1860 args = self.python_args('-c', script) 1861 pid = self.spawn_func(args[0], args, os.environ, 1862 file_actions=file_actions) 1863 1864 support.wait_process(pid, exitcode=0) 1865 with open(outfile, encoding="utf-8") as f: 1866 self.assertEqual(f.read(), 'hello') 1867 1868 def test_close_file(self): 1869 closefile = os_helper.TESTFN 1870 self.addCleanup(os_helper.unlink, closefile) 1871 script = f"""if 1: 1872 import os 1873 try: 1874 os.fstat(0) 1875 except OSError as e: 1876 with open({closefile!r}, 'w', encoding='utf-8') as closefile: 1877 closefile.write('is closed %d' % e.errno) 1878 """ 1879 args = self.python_args('-c', script) 1880 pid = self.spawn_func(args[0], args, os.environ, 1881 file_actions=[(os.POSIX_SPAWN_CLOSE, 0)]) 1882 1883 support.wait_process(pid, exitcode=0) 1884 with open(closefile, encoding="utf-8") as f: 1885 self.assertEqual(f.read(), 'is closed %d' % errno.EBADF) 1886 1887 def test_dup2(self): 1888 dupfile = os_helper.TESTFN 1889 self.addCleanup(os_helper.unlink, dupfile) 1890 script = """if 1: 1891 import sys 1892 sys.stdout.write("hello") 1893 """ 1894 with open(dupfile, "wb") as childfile: 1895 file_actions = [ 1896 (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1), 1897 ] 1898 args = self.python_args('-c', script) 1899 pid = self.spawn_func(args[0], args, os.environ, 1900 file_actions=file_actions) 1901 support.wait_process(pid, exitcode=0) 1902 with open(dupfile, encoding="utf-8") as f: 1903 self.assertEqual(f.read(), 'hello') 1904 1905 1906@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn") 1907class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin): 1908 spawn_func = getattr(posix, 'posix_spawn', None) 1909 1910 
# Runs the shared spawn tests against posix.posix_spawnp, plus a
# PATH-lookup test that only applies to the "p" variant.
@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp")
class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin):
    spawn_func = getattr(posix, 'posix_spawnp', None)

    @os_helper.skip_unless_symlink
    def test_posix_spawnp(self):
        # Use a symlink to create a program in its own temporary directory
        temp_dir = tempfile.mkdtemp()
        self.addCleanup(os_helper.rmtree, temp_dir)

        program = 'posix_spawnp_test_program.exe'
        program_fullpath = os.path.join(temp_dir, program)
        os.symlink(sys.executable, program_fullpath)

        # Put the temporary directory first on PATH so the bare program
        # name resolves to the symlink created above.
        try:
            path = os.pathsep.join((temp_dir, os.environ['PATH']))
        except KeyError:
            path = temp_dir   # PATH is not set

        spawn_args = (program, '-I', '-S', '-c', 'pass')
        code = textwrap.dedent("""
            import os
            from test import support

            args = %a
            pid = os.posix_spawnp(args[0], args, os.environ)

            support.wait_process(pid, exitcode=0)
        """ % (spawn_args,))

        # Use a subprocess to test os.posix_spawnp() with a modified PATH
        # environment variable: posix_spawnp() uses the current environment
        # to locate the program, not its environment argument.
        args = ('-c', code)
        assert_python_ok(*args, PATH=path)


@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS")
class TestPosixWeaklinking(unittest.TestCase):
    # These test cases verify that weak linking support on macOS works
    # as expected. These cases only test new behaviour introduced by weak linking,
    # regular behaviour is tested by the normal test cases.
    #
    # See the section on Weak Linking in Mac/README.txt for more information.
    #
    # Every test follows the same shape: skip unless the HAVE_* macro was set
    # at build time, then check that the feature is reported as present on
    # macOS versions at or above a threshold and as absent (with the related
    # keyword argument raising) below it.
    def setUp(self):
        import sysconfig

        config_vars = sysconfig.get_config_vars()
        # Names of all HAVE_* build-time config variables with a truthy value.
        self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] }
        # Running macOS version as a tuple of ints, comparable to (10, 10).
        self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split("."))

    def _verify_available(self, name):
        # Skip the calling test when *name* is not among the truthy HAVE_*
        # build config variables collected in setUp().
        if name not in self.available:
            raise unittest.SkipTest(f"{name} not weak-linked")

    def test_pwritev(self):
        self._verify_available("HAVE_PWRITEV")
        if self.mac_ver >= (10, 16):
            self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available")
            self.assertTrue(hasattr(os, "preadv"), "os.preadv is not available")

        else:
            self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available")
            self.assertFalse(hasattr(os, "preadv"), "os.preadv is available")

    def test_stat(self):
        self._verify_available("HAVE_FSTATAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FSTATAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FSTATAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.stat("file", dir_fd=0)

    def test_access(self):
        self._verify_available("HAVE_FACCESSAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FACCESSAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FACCESSAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.access("file", os.R_OK, dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"):
                os.access("file", os.R_OK, follow_symlinks=False)

            with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"):
                os.access("file", os.R_OK, effective_ids=True)

    def test_chmod(self):
        self._verify_available("HAVE_FCHMODAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FCHMODAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FCHMODAT", posix._have_functions)
            self.assertIn("HAVE_LCHMOD", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.chmod("file", 0o644, dir_fd=0)

    def test_chown(self):
        self._verify_available("HAVE_FCHOWNAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FCHOWNAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions)
            self.assertIn("HAVE_LCHOWN", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.chown("file", 0, 0, dir_fd=0)

    def test_link(self):
        self._verify_available("HAVE_LINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_LINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_LINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
                os.link("source", "target", src_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"):
                os.link("source", "target", dst_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
                os.link("source", "target", src_dir_fd=0, dst_dir_fd=0)

            # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag
            with os_helper.temp_dir() as base_path:
                link_path = os.path.join(base_path, "link")
                source_path = os.path.join(base_path, "source")

                # Explicit encoding, consistent with the other tests in this
                # file and independent of the locale's default encoding.
                with open(source_path, "w", encoding="utf-8") as fp:
                    fp.write("data")

                os.symlink("target", link_path)

                # Calling os.link should fail in the link(2) call, and
                # should not reject *follow_symlinks* (to match the
                # behaviour you'd get when building on a platform without
                # linkat)
                with self.assertRaises(FileExistsError):
                    os.link(source_path, link_path, follow_symlinks=True)

                with self.assertRaises(FileExistsError):
                    os.link(source_path, link_path, follow_symlinks=False)

    def test_listdir_scandir(self):
        self._verify_available("HAVE_FDOPENDIR")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_FDOPENDIR", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions)

            with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"):
                os.listdir(0)

            with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"):
                os.scandir(0)

    def test_mkdir(self):
        self._verify_available("HAVE_MKDIRAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_MKDIRAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_MKDIRAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.mkdir("dir", dir_fd=0)

    def test_mkfifo(self):
        self._verify_available("HAVE_MKFIFOAT")
        if self.mac_ver >= (13, 0):
            self.assertIn("HAVE_MKFIFOAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_MKFIFOAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.mkfifo("path", dir_fd=0)

    def test_mknod(self):
        self._verify_available("HAVE_MKNODAT")
        if self.mac_ver >= (13, 0):
            self.assertIn("HAVE_MKNODAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_MKNODAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.mknod("path", dir_fd=0)

    def test_rename_replace(self):
        self._verify_available("HAVE_RENAMEAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_RENAMEAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_RENAMEAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.rename("a", "b", src_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.rename("a", "b", dst_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.replace("a", "b", src_dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
                os.replace("a", "b", dst_dir_fd=0)

    def test_unlink_rmdir(self):
        self._verify_available("HAVE_UNLINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_UNLINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_UNLINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.unlink("path", dir_fd=0)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.rmdir("path", dir_fd=0)

    def test_open(self):
        self._verify_available("HAVE_OPENAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_OPENAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_OPENAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.open("path", os.O_RDONLY, dir_fd=0)

    def test_readlink(self):
        self._verify_available("HAVE_READLINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_READLINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_READLINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.readlink("path", dir_fd=0)

    def test_symlink(self):
        self._verify_available("HAVE_SYMLINKAT")
        if self.mac_ver >= (10, 10):
            self.assertIn("HAVE_SYMLINKAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.symlink("a", "b", dir_fd=0)

    def test_utime(self):
        # Both macros must have been set at build time for this test to run.
        self._verify_available("HAVE_FUTIMENS")
        self._verify_available("HAVE_UTIMENSAT")
        if self.mac_ver >= (10, 13):
            self.assertIn("HAVE_FUTIMENS", posix._have_functions)
            self.assertIn("HAVE_UTIMENSAT", posix._have_functions)

        else:
            self.assertNotIn("HAVE_FUTIMENS", posix._have_functions)
            self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions)

            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
                os.utime("path", dir_fd=0)


def tearDownModule():
    # Module-level unittest hook: delegates to support.reap_children() once
    # the tests in this module have run.
    support.reap_children()


if __name__ == '__main__':
    unittest.main()