1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import pathlib 7import sys 8import re 9import warnings 10import contextlib 11import stat 12import types 13import weakref 14from unittest import mock 15 16import unittest 17from test import support 18from test.support import os_helper 19from test.support import script_helper 20from test.support import warnings_helper 21 22 23has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 24has_spawnl = hasattr(os, 'spawnl') 25 26# TEST_FILES may need to be tweaked for systems depending on the maximum 27# number of files that can be opened at one time (see ulimit -n) 28if sys.platform.startswith('openbsd'): 29 TEST_FILES = 48 30else: 31 TEST_FILES = 100 32 33# This is organized as one test for each chunk of code in tempfile.py, 34# in order of their appearance in the file. Testing which requires 35# threads is not done here. 36 37class TestLowLevelInternals(unittest.TestCase): 38 def test_infer_return_type_singles(self): 39 self.assertIs(str, tempfile._infer_return_type('')) 40 self.assertIs(bytes, tempfile._infer_return_type(b'')) 41 self.assertIs(str, tempfile._infer_return_type(None)) 42 43 def test_infer_return_type_multiples(self): 44 self.assertIs(str, tempfile._infer_return_type('', '')) 45 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 46 with self.assertRaises(TypeError): 47 tempfile._infer_return_type('', b'') 48 with self.assertRaises(TypeError): 49 tempfile._infer_return_type(b'', '') 50 51 def test_infer_return_type_multiples_and_none(self): 52 self.assertIs(str, tempfile._infer_return_type(None, '')) 53 self.assertIs(str, tempfile._infer_return_type('', None)) 54 self.assertIs(str, tempfile._infer_return_type(None, None)) 55 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 56 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 57 with self.assertRaises(TypeError): 58 tempfile._infer_return_type('', None, b'') 59 with self.assertRaises(TypeError): 60 tempfile._infer_return_type(b'', None, '') 61 62 def test_infer_return_type_pathlib(self): 63 self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/'))) 64 65 def test_infer_return_type_pathlike(self): 66 class Path: 67 def __init__(self, path): 68 self.path = path 69 70 def __fspath__(self): 71 return self.path 72 73 self.assertIs(str, tempfile._infer_return_type(Path('/'))) 74 self.assertIs(bytes, tempfile._infer_return_type(Path(b'/'))) 75 self.assertIs(str, tempfile._infer_return_type('', Path(''))) 76 self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b''))) 77 self.assertIs(bytes, tempfile._infer_return_type(None, Path(b''))) 78 self.assertIs(str, tempfile._infer_return_type(None, Path(''))) 79 80 with self.assertRaises(TypeError): 81 tempfile._infer_return_type('', Path(b'')) 82 with self.assertRaises(TypeError): 83 tempfile._infer_return_type(b'', Path('')) 84 85# Common functionality. 86 87class BaseTestCase(unittest.TestCase): 88 89 str_check = re.compile(r"^[a-z0-9_-]{8}$") 90 b_check = re.compile(br"^[a-z0-9_-]{8}$") 91 92 def setUp(self): 93 self.enterContext(warnings_helper.check_warnings()) 94 warnings.filterwarnings("ignore", category=RuntimeWarning, 95 message="mktemp", module=__name__) 96 97 def nameCheck(self, name, dir, pre, suf): 98 (ndir, nbase) = os.path.split(name) 99 npre = nbase[:len(pre)] 100 nsuf = nbase[len(nbase)-len(suf):] 101 102 if dir is not None: 103 self.assertIs( 104 type(name), 105 str 106 if type(dir) is str or isinstance(dir, os.PathLike) else 107 bytes, 108 "unexpected return type", 109 ) 110 if pre is not None: 111 self.assertIs(type(name), str if type(pre) is str else bytes, 112 "unexpected return type") 113 if suf is not None: 114 self.assertIs(type(name), str if type(suf) is str else bytes, 115 "unexpected return type") 116 if (dir, pre, suf) == (None, None, None): 117 self.assertIs(type(name), str, "default return type must be str") 118 119 # check for equality of the absolute paths! 120 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 121 "file %r not in directory %r" % (name, dir)) 122 self.assertEqual(npre, pre, 123 "file %r does not begin with %r" % (nbase, pre)) 124 self.assertEqual(nsuf, suf, 125 "file %r does not end with %r" % (nbase, suf)) 126 127 nbase = nbase[len(pre):len(nbase)-len(suf)] 128 check = self.str_check if isinstance(nbase, str) else self.b_check 129 self.assertTrue(check.match(nbase), 130 "random characters %r do not match %r" 131 % (nbase, check.pattern)) 132 133 134class TestExports(BaseTestCase): 135 def test_exports(self): 136 # There are no surprising symbols in the tempfile module 137 dict = tempfile.__dict__ 138 139 expected = { 140 "NamedTemporaryFile" : 1, 141 "TemporaryFile" : 1, 142 "mkstemp" : 1, 143 "mkdtemp" : 1, 144 "mktemp" : 1, 145 "TMP_MAX" : 1, 146 "gettempprefix" : 1, 147 "gettempprefixb" : 1, 148 "gettempdir" : 1, 149 "gettempdirb" : 1, 150 "tempdir" : 1, 151 "template" : 1, 152 "SpooledTemporaryFile" : 1, 153 "TemporaryDirectory" : 1, 154 } 155 156 unexp = [] 157 for key in dict: 158 if key[0] != '_' and key not in expected: 159 unexp.append(key) 160 self.assertTrue(len(unexp) == 0, 161 "unexpected keys: %s" % unexp) 162 163 164class TestRandomNameSequence(BaseTestCase): 165 """Test the internal iterator object _RandomNameSequence.""" 166 167 def setUp(self): 168 self.r = tempfile._RandomNameSequence() 169 super().setUp() 170 171 def test_get_eight_char_str(self): 172 # _RandomNameSequence returns a eight-character string 173 s = next(self.r) 174 self.nameCheck(s, '', '', '') 175 176 def test_many(self): 177 # _RandomNameSequence returns no duplicate strings (stochastic) 178 179 dict = {} 180 r = self.r 181 for i in range(TEST_FILES): 182 s = next(r) 183 self.nameCheck(s, '', '', '') 184 self.assertNotIn(s, dict) 185 dict[s] = 1 186 187 def supports_iter(self): 188 # _RandomNameSequence supports the iterator protocol 189 190 i = 0 191 r = self.r 192 for s in r: 193 i += 1 194 if i == 20: 195 break 196 197 @support.requires_fork() 198 def test_process_awareness(self): 199 # ensure that the random source differs between 200 # child and parent. 201 read_fd, write_fd = os.pipe() 202 pid = None 203 try: 204 pid = os.fork() 205 if not pid: 206 # child process 207 os.close(read_fd) 208 os.write(write_fd, next(self.r).encode("ascii")) 209 os.close(write_fd) 210 # bypass the normal exit handlers- leave those to 211 # the parent. 212 os._exit(0) 213 214 # parent process 215 parent_value = next(self.r) 216 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 217 finally: 218 if pid: 219 support.wait_process(pid, exitcode=0) 220 221 os.close(read_fd) 222 os.close(write_fd) 223 self.assertNotEqual(child_value, parent_value) 224 225 226 227class TestCandidateTempdirList(BaseTestCase): 228 """Test the internal function _candidate_tempdir_list.""" 229 230 def test_nonempty_list(self): 231 # _candidate_tempdir_list returns a nonempty list of strings 232 233 cand = tempfile._candidate_tempdir_list() 234 235 self.assertFalse(len(cand) == 0) 236 for c in cand: 237 self.assertIsInstance(c, str) 238 239 def test_wanted_dirs(self): 240 # _candidate_tempdir_list contains the expected directories 241 242 # Make sure the interesting environment variables are all set. 243 with os_helper.EnvironmentVarGuard() as env: 244 for envname in 'TMPDIR', 'TEMP', 'TMP': 245 dirname = os.getenv(envname) 246 if not dirname: 247 env[envname] = os.path.abspath(envname) 248 249 cand = tempfile._candidate_tempdir_list() 250 251 for envname in 'TMPDIR', 'TEMP', 'TMP': 252 dirname = os.getenv(envname) 253 if not dirname: raise ValueError 254 self.assertIn(dirname, cand) 255 256 try: 257 dirname = os.getcwd() 258 except (AttributeError, OSError): 259 dirname = os.curdir 260 261 self.assertIn(dirname, cand) 262 263 # Not practical to try to verify the presence of OS-specific 264 # paths in this list. 265 266 267# We test _get_default_tempdir some more by testing gettempdir. 268 269class TestGetDefaultTempdir(BaseTestCase): 270 """Test _get_default_tempdir().""" 271 272 def test_no_files_left_behind(self): 273 # use a private empty directory 274 with tempfile.TemporaryDirectory() as our_temp_directory: 275 # force _get_default_tempdir() to consider our empty directory 276 def our_candidate_list(): 277 return [our_temp_directory] 278 279 with support.swap_attr(tempfile, "_candidate_tempdir_list", 280 our_candidate_list): 281 # verify our directory is empty after _get_default_tempdir() 282 tempfile._get_default_tempdir() 283 self.assertEqual(os.listdir(our_temp_directory), []) 284 285 def raise_OSError(*args, **kwargs): 286 raise OSError() 287 288 with support.swap_attr(os, "open", raise_OSError): 289 # test again with failing os.open() 290 with self.assertRaises(FileNotFoundError): 291 tempfile._get_default_tempdir() 292 self.assertEqual(os.listdir(our_temp_directory), []) 293 294 with support.swap_attr(os, "write", raise_OSError): 295 # test again with failing os.write() 296 with self.assertRaises(FileNotFoundError): 297 tempfile._get_default_tempdir() 298 self.assertEqual(os.listdir(our_temp_directory), []) 299 300 301class TestGetCandidateNames(BaseTestCase): 302 """Test the internal function _get_candidate_names.""" 303 304 def test_retval(self): 305 # _get_candidate_names returns a _RandomNameSequence object 306 obj = tempfile._get_candidate_names() 307 self.assertIsInstance(obj, tempfile._RandomNameSequence) 308 309 def test_same_thing(self): 310 # _get_candidate_names always returns the same object 311 a = tempfile._get_candidate_names() 312 b = tempfile._get_candidate_names() 313 314 self.assertTrue(a is b) 315 316 317@contextlib.contextmanager 318def _inside_empty_temp_dir(): 319 dir = tempfile.mkdtemp() 320 try: 321 with support.swap_attr(tempfile, 'tempdir', dir): 322 yield 323 finally: 324 os_helper.rmtree(dir) 325 326 327def _mock_candidate_names(*names): 328 return support.swap_attr(tempfile, 329 '_get_candidate_names', 330 lambda: iter(names)) 331 332 333class TestBadTempdir: 334 335 @unittest.skipIf( 336 support.is_emscripten, "Emscripten cannot remove write bits." 337 ) 338 def test_read_only_directory(self): 339 with _inside_empty_temp_dir(): 340 oldmode = mode = os.stat(tempfile.tempdir).st_mode 341 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 342 os.chmod(tempfile.tempdir, mode) 343 try: 344 if os.access(tempfile.tempdir, os.W_OK): 345 self.skipTest("can't set the directory read-only") 346 with self.assertRaises(PermissionError): 347 self.make_temp() 348 self.assertEqual(os.listdir(tempfile.tempdir), []) 349 finally: 350 os.chmod(tempfile.tempdir, oldmode) 351 352 def test_nonexisting_directory(self): 353 with _inside_empty_temp_dir(): 354 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 355 with support.swap_attr(tempfile, 'tempdir', tempdir): 356 with self.assertRaises(FileNotFoundError): 357 self.make_temp() 358 359 def test_non_directory(self): 360 with _inside_empty_temp_dir(): 361 tempdir = os.path.join(tempfile.tempdir, 'file') 362 open(tempdir, 'wb').close() 363 with support.swap_attr(tempfile, 'tempdir', tempdir): 364 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 365 self.make_temp() 366 367 368class TestMkstempInner(TestBadTempdir, BaseTestCase): 369 """Test the internal function _mkstemp_inner.""" 370 371 class mkstemped: 372 _bflags = tempfile._bin_openflags 373 _tflags = tempfile._text_openflags 374 _close = os.close 375 _unlink = os.unlink 376 377 def __init__(self, dir, pre, suf, bin): 378 if bin: flags = self._bflags 379 else: flags = self._tflags 380 381 output_type = tempfile._infer_return_type(dir, pre, suf) 382 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 383 384 def write(self, str): 385 os.write(self.fd, str) 386 387 def __del__(self): 388 self._close(self.fd) 389 self._unlink(self.name) 390 391 def do_create(self, dir=None, pre=None, suf=None, bin=1): 392 output_type = tempfile._infer_return_type(dir, pre, suf) 393 if dir is None: 394 if output_type is str: 395 dir = tempfile.gettempdir() 396 else: 397 dir = tempfile.gettempdirb() 398 if pre is None: 399 pre = output_type() 400 if suf is None: 401 suf = output_type() 402 file = self.mkstemped(dir, pre, suf, bin) 403 404 self.nameCheck(file.name, dir, pre, suf) 405 return file 406 407 def test_basic(self): 408 # _mkstemp_inner can create files 409 self.do_create().write(b"blat") 410 self.do_create(pre="a").write(b"blat") 411 self.do_create(suf="b").write(b"blat") 412 self.do_create(pre="a", suf="b").write(b"blat") 413 self.do_create(pre="aa", suf=".txt").write(b"blat") 414 415 def test_basic_with_bytes_names(self): 416 # _mkstemp_inner can create files when given name parts all 417 # specified as bytes. 418 dir_b = tempfile.gettempdirb() 419 self.do_create(dir=dir_b, suf=b"").write(b"blat") 420 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 421 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 422 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 423 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 424 # Can't mix str & binary types in the args. 425 with self.assertRaises(TypeError): 426 self.do_create(dir="", suf=b"").write(b"blat") 427 with self.assertRaises(TypeError): 428 self.do_create(dir=dir_b, pre="").write(b"blat") 429 with self.assertRaises(TypeError): 430 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 431 432 def test_basic_many(self): 433 # _mkstemp_inner can create many files (stochastic) 434 extant = list(range(TEST_FILES)) 435 for i in extant: 436 extant[i] = self.do_create(pre="aa") 437 438 def test_choose_directory(self): 439 # _mkstemp_inner can create files in a user-selected directory 440 dir = tempfile.mkdtemp() 441 try: 442 self.do_create(dir=dir).write(b"blat") 443 self.do_create(dir=pathlib.Path(dir)).write(b"blat") 444 finally: 445 support.gc_collect() # For PyPy or other GCs. 446 os.rmdir(dir) 447 448 @os_helper.skip_unless_working_chmod 449 def test_file_mode(self): 450 # _mkstemp_inner creates files with the proper mode 451 452 file = self.do_create() 453 mode = stat.S_IMODE(os.stat(file.name).st_mode) 454 expected = 0o600 455 if sys.platform == 'win32': 456 # There's no distinction among 'user', 'group' and 'world'; 457 # replicate the 'user' bits. 458 user = expected >> 6 459 expected = user * (1 + 8 + 64) 460 self.assertEqual(mode, expected) 461 462 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 463 @support.requires_subprocess() 464 def test_noinherit(self): 465 # _mkstemp_inner file handles are not inherited by child processes 466 467 if support.verbose: 468 v="v" 469 else: 470 v="q" 471 472 file = self.do_create() 473 self.assertEqual(os.get_inheritable(file.fd), False) 474 fd = "%d" % file.fd 475 476 try: 477 me = __file__ 478 except NameError: 479 me = sys.argv[0] 480 481 # We have to exec something, so that FD_CLOEXEC will take 482 # effect. The core of this test is therefore in 483 # tf_inherit_check.py, which see. 484 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 485 "tf_inherit_check.py") 486 487 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 488 # but an arg with embedded spaces should be decorated with double 489 # quotes on each end 490 if sys.platform == 'win32': 491 decorated = '"%s"' % sys.executable 492 tester = '"%s"' % tester 493 else: 494 decorated = sys.executable 495 496 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 497 self.assertFalse(retval < 0, 498 "child process caught fatal signal %d" % -retval) 499 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 500 501 @unittest.skipUnless(has_textmode, "text mode not available") 502 def test_textmode(self): 503 # _mkstemp_inner can create files in text mode 504 505 # A text file is truncated at the first Ctrl+Z byte 506 f = self.do_create(bin=0) 507 f.write(b"blat\x1a") 508 f.write(b"extra\n") 509 os.lseek(f.fd, 0, os.SEEK_SET) 510 self.assertEqual(os.read(f.fd, 20), b"blat") 511 512 def make_temp(self): 513 return tempfile._mkstemp_inner(tempfile.gettempdir(), 514 tempfile.gettempprefix(), 515 '', 516 tempfile._bin_openflags, 517 str) 518 519 def test_collision_with_existing_file(self): 520 # _mkstemp_inner tries another name when a file with 521 # the chosen name already exists 522 with _inside_empty_temp_dir(), \ 523 _mock_candidate_names('aaa', 'aaa', 'bbb'): 524 (fd1, name1) = self.make_temp() 525 os.close(fd1) 526 self.assertTrue(name1.endswith('aaa')) 527 528 (fd2, name2) = self.make_temp() 529 os.close(fd2) 530 self.assertTrue(name2.endswith('bbb')) 531 532 def test_collision_with_existing_directory(self): 533 # _mkstemp_inner tries another name when a directory with 534 # the chosen name already exists 535 with _inside_empty_temp_dir(), \ 536 _mock_candidate_names('aaa', 'aaa', 'bbb'): 537 dir = tempfile.mkdtemp() 538 self.assertTrue(dir.endswith('aaa')) 539 540 (fd, name) = self.make_temp() 541 os.close(fd) 542 self.assertTrue(name.endswith('bbb')) 543 544 545class TestGetTempPrefix(BaseTestCase): 546 """Test gettempprefix().""" 547 548 def test_sane_template(self): 549 # gettempprefix returns a nonempty prefix string 550 p = tempfile.gettempprefix() 551 552 self.assertIsInstance(p, str) 553 self.assertGreater(len(p), 0) 554 555 pb = tempfile.gettempprefixb() 556 557 self.assertIsInstance(pb, bytes) 558 self.assertGreater(len(pb), 0) 559 560 def test_usable_template(self): 561 # gettempprefix returns a usable prefix string 562 563 # Create a temp directory, avoiding use of the prefix. 564 # Then attempt to create a file whose name is 565 # prefix + 'xxxxxx.xxx' in that directory. 566 p = tempfile.gettempprefix() + "xxxxxx.xxx" 567 d = tempfile.mkdtemp(prefix="") 568 try: 569 p = os.path.join(d, p) 570 fd = os.open(p, os.O_RDWR | os.O_CREAT) 571 os.close(fd) 572 os.unlink(p) 573 finally: 574 os.rmdir(d) 575 576 577class TestGetTempDir(BaseTestCase): 578 """Test gettempdir().""" 579 580 def test_directory_exists(self): 581 # gettempdir returns a directory which exists 582 583 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 584 self.assertTrue(os.path.isabs(d) or d == os.curdir, 585 "%r is not an absolute path" % d) 586 self.assertTrue(os.path.isdir(d), 587 "%r is not a directory" % d) 588 589 def test_directory_writable(self): 590 # gettempdir returns a directory writable by the user 591 592 # sneaky: just instantiate a NamedTemporaryFile, which 593 # defaults to writing into the directory returned by 594 # gettempdir. 595 with tempfile.NamedTemporaryFile() as file: 596 file.write(b"blat") 597 598 def test_same_thing(self): 599 # gettempdir always returns the same object 600 a = tempfile.gettempdir() 601 b = tempfile.gettempdir() 602 c = tempfile.gettempdirb() 603 604 self.assertTrue(a is b) 605 self.assertNotEqual(type(a), type(c)) 606 self.assertEqual(a, os.fsdecode(c)) 607 608 def test_case_sensitive(self): 609 # gettempdir should not flatten its case 610 # even on a case-insensitive file system 611 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 612 _tempdir, tempfile.tempdir = tempfile.tempdir, None 613 try: 614 with os_helper.EnvironmentVarGuard() as env: 615 # Fake the first env var which is checked as a candidate 616 env["TMPDIR"] = case_sensitive_tempdir 617 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 618 finally: 619 tempfile.tempdir = _tempdir 620 os_helper.rmdir(case_sensitive_tempdir) 621 622 623class TestMkstemp(BaseTestCase): 624 """Test mkstemp().""" 625 626 def do_create(self, dir=None, pre=None, suf=None): 627 output_type = tempfile._infer_return_type(dir, pre, suf) 628 if dir is None: 629 if output_type is str: 630 dir = tempfile.gettempdir() 631 else: 632 dir = tempfile.gettempdirb() 633 if pre is None: 634 pre = output_type() 635 if suf is None: 636 suf = output_type() 637 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 638 (ndir, nbase) = os.path.split(name) 639 adir = os.path.abspath(dir) 640 self.assertEqual(adir, ndir, 641 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 642 643 try: 644 self.nameCheck(name, dir, pre, suf) 645 finally: 646 os.close(fd) 647 os.unlink(name) 648 649 def test_basic(self): 650 # mkstemp can create files 651 self.do_create() 652 self.do_create(pre="a") 653 self.do_create(suf="b") 654 self.do_create(pre="a", suf="b") 655 self.do_create(pre="aa", suf=".txt") 656 self.do_create(dir=".") 657 658 def test_basic_with_bytes_names(self): 659 # mkstemp can create files when given name parts all 660 # specified as bytes. 661 d = tempfile.gettempdirb() 662 self.do_create(dir=d, suf=b"") 663 self.do_create(dir=d, pre=b"a") 664 self.do_create(dir=d, suf=b"b") 665 self.do_create(dir=d, pre=b"a", suf=b"b") 666 self.do_create(dir=d, pre=b"aa", suf=b".txt") 667 self.do_create(dir=b".") 668 with self.assertRaises(TypeError): 669 self.do_create(dir=".", pre=b"aa", suf=b".txt") 670 with self.assertRaises(TypeError): 671 self.do_create(dir=b".", pre="aa", suf=b".txt") 672 with self.assertRaises(TypeError): 673 self.do_create(dir=b".", pre=b"aa", suf=".txt") 674 675 676 def test_choose_directory(self): 677 # mkstemp can create directories in a user-selected directory 678 dir = tempfile.mkdtemp() 679 try: 680 self.do_create(dir=dir) 681 self.do_create(dir=pathlib.Path(dir)) 682 finally: 683 os.rmdir(dir) 684 685 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 686 orig_tempdir = tempfile.tempdir 687 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 688 try: 689 fd, path = tempfile.mkstemp() 690 os.close(fd) 691 os.unlink(path) 692 self.assertIsInstance(path, str) 693 tempfile.tempdir = tempfile.gettempdirb() 694 self.assertIsInstance(tempfile.tempdir, bytes) 695 self.assertIsInstance(tempfile.gettempdir(), str) 696 self.assertIsInstance(tempfile.gettempdirb(), bytes) 697 fd, path = tempfile.mkstemp() 698 os.close(fd) 699 os.unlink(path) 700 self.assertIsInstance(path, bytes) 701 fd, path = tempfile.mkstemp(suffix='.txt') 702 os.close(fd) 703 os.unlink(path) 704 self.assertIsInstance(path, str) 705 fd, path = tempfile.mkstemp(prefix='test-temp-') 706 os.close(fd) 707 os.unlink(path) 708 self.assertIsInstance(path, str) 709 fd, path = tempfile.mkstemp(dir=tempfile.gettempdir()) 710 os.close(fd) 711 os.unlink(path) 712 self.assertIsInstance(path, str) 713 finally: 714 tempfile.tempdir = orig_tempdir 715 716 717class TestMkdtemp(TestBadTempdir, BaseTestCase): 718 """Test mkdtemp().""" 719 720 def make_temp(self): 721 return tempfile.mkdtemp() 722 723 def do_create(self, dir=None, pre=None, suf=None): 724 output_type = tempfile._infer_return_type(dir, pre, suf) 725 if dir is None: 726 if output_type is str: 727 dir = tempfile.gettempdir() 728 else: 729 dir = tempfile.gettempdirb() 730 if pre is None: 731 pre = output_type() 732 if suf is None: 733 suf = output_type() 734 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 735 736 try: 737 self.nameCheck(name, dir, pre, suf) 738 return name 739 except: 740 os.rmdir(name) 741 raise 742 743 def test_basic(self): 744 # mkdtemp can create directories 745 os.rmdir(self.do_create()) 746 os.rmdir(self.do_create(pre="a")) 747 os.rmdir(self.do_create(suf="b")) 748 os.rmdir(self.do_create(pre="a", suf="b")) 749 os.rmdir(self.do_create(pre="aa", suf=".txt")) 750 751 def test_basic_with_bytes_names(self): 752 # mkdtemp can create directories when given all binary parts 753 d = tempfile.gettempdirb() 754 os.rmdir(self.do_create(dir=d)) 755 os.rmdir(self.do_create(dir=d, pre=b"a")) 756 os.rmdir(self.do_create(dir=d, suf=b"b")) 757 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 758 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 759 with self.assertRaises(TypeError): 760 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 761 with self.assertRaises(TypeError): 762 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 763 with self.assertRaises(TypeError): 764 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 765 766 def test_basic_many(self): 767 # mkdtemp can create many directories (stochastic) 768 extant = list(range(TEST_FILES)) 769 try: 770 for i in extant: 771 extant[i] = self.do_create(pre="aa") 772 finally: 773 for i in extant: 774 if(isinstance(i, str)): 775 os.rmdir(i) 776 777 def test_choose_directory(self): 778 # mkdtemp can create directories in a user-selected directory 779 dir = tempfile.mkdtemp() 780 try: 781 os.rmdir(self.do_create(dir=dir)) 782 os.rmdir(self.do_create(dir=pathlib.Path(dir))) 783 finally: 784 os.rmdir(dir) 785 786 @os_helper.skip_unless_working_chmod 787 def test_mode(self): 788 # mkdtemp creates directories with the proper mode 789 790 dir = self.do_create() 791 try: 792 mode = stat.S_IMODE(os.stat(dir).st_mode) 793 mode &= 0o777 # Mask off sticky bits inherited from /tmp 794 expected = 0o700 795 if sys.platform == 'win32': 796 # There's no distinction among 'user', 'group' and 'world'; 797 # replicate the 'user' bits. 798 user = expected >> 6 799 expected = user * (1 + 8 + 64) 800 self.assertEqual(mode, expected) 801 finally: 802 os.rmdir(dir) 803 804 def test_collision_with_existing_file(self): 805 # mkdtemp tries another name when a file with 806 # the chosen name already exists 807 with _inside_empty_temp_dir(), \ 808 _mock_candidate_names('aaa', 'aaa', 'bbb'): 809 file = tempfile.NamedTemporaryFile(delete=False) 810 file.close() 811 self.assertTrue(file.name.endswith('aaa')) 812 dir = tempfile.mkdtemp() 813 self.assertTrue(dir.endswith('bbb')) 814 815 def test_collision_with_existing_directory(self): 816 # mkdtemp tries another name when a directory with 817 # the chosen name already exists 818 with _inside_empty_temp_dir(), \ 819 _mock_candidate_names('aaa', 'aaa', 'bbb'): 820 dir1 = tempfile.mkdtemp() 821 self.assertTrue(dir1.endswith('aaa')) 822 dir2 = tempfile.mkdtemp() 823 self.assertTrue(dir2.endswith('bbb')) 824 825 def test_for_tempdir_is_bytes_issue40701_api_warts(self): 826 orig_tempdir = tempfile.tempdir 827 self.assertIsInstance(tempfile.tempdir, (str, type(None))) 828 try: 829 path = tempfile.mkdtemp() 830 os.rmdir(path) 831 self.assertIsInstance(path, str) 832 tempfile.tempdir = tempfile.gettempdirb() 833 self.assertIsInstance(tempfile.tempdir, bytes) 834 self.assertIsInstance(tempfile.gettempdir(), str) 835 self.assertIsInstance(tempfile.gettempdirb(), bytes) 836 path = tempfile.mkdtemp() 837 os.rmdir(path) 838 self.assertIsInstance(path, bytes) 839 path = tempfile.mkdtemp(suffix='-dir') 840 os.rmdir(path) 841 self.assertIsInstance(path, str) 842 path = tempfile.mkdtemp(prefix='test-mkdtemp-') 843 os.rmdir(path) 844 self.assertIsInstance(path, str) 845 path = tempfile.mkdtemp(dir=tempfile.gettempdir()) 846 os.rmdir(path) 847 self.assertIsInstance(path, str) 848 finally: 849 tempfile.tempdir = orig_tempdir 850 851 852class TestMktemp(BaseTestCase): 853 """Test mktemp().""" 854 855 # For safety, all use of mktemp must occur in a private directory. 856 # We must also suppress the RuntimeWarning it generates. 857 def setUp(self): 858 self.dir = tempfile.mkdtemp() 859 super().setUp() 860 861 def tearDown(self): 862 if self.dir: 863 os.rmdir(self.dir) 864 self.dir = None 865 super().tearDown() 866 867 class mktemped: 868 _unlink = os.unlink 869 _bflags = tempfile._bin_openflags 870 871 def __init__(self, dir, pre, suf): 872 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 873 # Create the file. This will raise an exception if it's 874 # mysteriously appeared in the meanwhile. 875 os.close(os.open(self.name, self._bflags, 0o600)) 876 877 def __del__(self): 878 self._unlink(self.name) 879 880 def do_create(self, pre="", suf=""): 881 file = self.mktemped(self.dir, pre, suf) 882 883 self.nameCheck(file.name, self.dir, pre, suf) 884 return file 885 886 def test_basic(self): 887 # mktemp can choose usable file names 888 self.do_create() 889 self.do_create(pre="a") 890 self.do_create(suf="b") 891 self.do_create(pre="a", suf="b") 892 self.do_create(pre="aa", suf=".txt") 893 894 def test_many(self): 895 # mktemp can choose many usable file names (stochastic) 896 extant = list(range(TEST_FILES)) 897 for i in extant: 898 extant[i] = self.do_create(pre="aa") 899 del extant 900 support.gc_collect() # For PyPy or other GCs. 901 902## def test_warning(self): 903## # mktemp issues a warning when used 904## warnings.filterwarnings("error", 905## category=RuntimeWarning, 906## message="mktemp") 907## self.assertRaises(RuntimeWarning, 908## tempfile.mktemp, dir=self.dir) 909 910 911# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 912 913 914class TestNamedTemporaryFile(BaseTestCase): 915 """Test NamedTemporaryFile().""" 916 917 def do_create(self, dir=None, pre="", suf="", delete=True): 918 if dir is None: 919 dir = tempfile.gettempdir() 920 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 921 delete=delete) 922 923 self.nameCheck(file.name, dir, pre, suf) 924 return file 925 926 927 def test_basic(self): 928 # NamedTemporaryFile can create files 929 self.do_create() 930 self.do_create(pre="a") 931 self.do_create(suf="b") 932 self.do_create(pre="a", suf="b") 933 self.do_create(pre="aa", suf=".txt") 934 935 def test_method_lookup(self): 936 # Issue #18879: Looking up a temporary file method should keep it 937 # alive long enough. 938 f = self.do_create() 939 wr = weakref.ref(f) 940 write = f.write 941 write2 = f.write 942 del f 943 write(b'foo') 944 del write 945 write2(b'bar') 946 del write2 947 if support.check_impl_detail(cpython=True): 948 # No reference cycle was created. 949 self.assertIsNone(wr()) 950 951 def test_iter(self): 952 # Issue #23700: getting iterator from a temporary file should keep 953 # it alive as long as it's being iterated over 954 lines = [b'spam\n', b'eggs\n', b'beans\n'] 955 def make_file(): 956 f = tempfile.NamedTemporaryFile(mode='w+b') 957 f.write(b''.join(lines)) 958 f.seek(0) 959 return f 960 for i, l in enumerate(make_file()): 961 self.assertEqual(l, lines[i]) 962 self.assertEqual(i, len(lines) - 1) 963 964 def test_creates_named(self): 965 # NamedTemporaryFile creates files with names 966 f = tempfile.NamedTemporaryFile() 967 self.assertTrue(os.path.exists(f.name), 968 "NamedTemporaryFile %s does not exist" % f.name) 969 970 def test_del_on_close(self): 971 # A NamedTemporaryFile is deleted when closed 972 dir = tempfile.mkdtemp() 973 try: 974 with tempfile.NamedTemporaryFile(dir=dir) as f: 975 f.write(b'blat') 976 self.assertEqual(os.listdir(dir), []) 977 self.assertFalse(os.path.exists(f.name), 978 "NamedTemporaryFile %s exists after close" % f.name) 979 finally: 980 os.rmdir(dir) 981 982 def test_dis_del_on_close(self): 983 # Tests that delete-on-close can be disabled 984 dir = tempfile.mkdtemp() 985 tmp = None 986 try: 987 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 988 tmp = f.name 989 f.write(b'blat') 990 f.close() 991 self.assertTrue(os.path.exists(f.name), 992 "NamedTemporaryFile %s missing after close" % f.name) 993 finally: 994 if tmp is not None: 995 os.unlink(tmp) 996 os.rmdir(dir) 997 998 def test_multiple_close(self): 999 # A NamedTemporaryFile can be closed many times without error 1000 f = tempfile.NamedTemporaryFile() 1001 f.write(b'abc\n') 1002 f.close() 1003 f.close() 1004 f.close() 1005 1006 def test_context_manager(self): 1007 # A NamedTemporaryFile can be used as a context manager 1008 with tempfile.NamedTemporaryFile() as f: 1009 self.assertTrue(os.path.exists(f.name)) 1010 self.assertFalse(os.path.exists(f.name)) 1011 def use_closed(): 1012 with f: 1013 pass 1014 self.assertRaises(ValueError, use_closed) 1015 1016 def test_bad_mode(self): 1017 dir = tempfile.mkdtemp() 1018 self.addCleanup(os_helper.rmtree, dir) 1019 with self.assertRaises(ValueError): 1020 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 1021 with self.assertRaises(TypeError): 1022 tempfile.NamedTemporaryFile(mode=2, dir=dir) 1023 self.assertEqual(os.listdir(dir), []) 1024 1025 def test_bad_encoding(self): 1026 dir = tempfile.mkdtemp() 1027 self.addCleanup(os_helper.rmtree, dir) 1028 with self.assertRaises(LookupError): 1029 tempfile.NamedTemporaryFile('w', encoding='bad-encoding', dir=dir) 1030 self.assertEqual(os.listdir(dir), []) 1031 1032 def test_unexpected_error(self): 1033 dir = tempfile.mkdtemp() 1034 self.addCleanup(os_helper.rmtree, dir) 1035 with mock.patch('tempfile._TemporaryFileWrapper') as mock_ntf, \ 1036 mock.patch('io.open', mock.mock_open()) as mock_open: 1037 mock_ntf.side_effect = KeyboardInterrupt() 1038 with self.assertRaises(KeyboardInterrupt): 1039 tempfile.NamedTemporaryFile(dir=dir) 1040 mock_open().close.assert_called() 1041 self.assertEqual(os.listdir(dir), []) 1042 1043 # How to test the mode and bufsize parameters? 1044 1045class TestSpooledTemporaryFile(BaseTestCase): 1046 """Test SpooledTemporaryFile().""" 1047 1048 def do_create(self, max_size=0, dir=None, pre="", suf=""): 1049 if dir is None: 1050 dir = tempfile.gettempdir() 1051 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 1052 1053 return file 1054 1055 1056 def test_basic(self): 1057 # SpooledTemporaryFile can create files 1058 f = self.do_create() 1059 self.assertFalse(f._rolled) 1060 f = self.do_create(max_size=100, pre="a", suf=".txt") 1061 self.assertFalse(f._rolled) 1062 1063 def test_is_iobase(self): 1064 # SpooledTemporaryFile should implement io.IOBase 1065 self.assertIsInstance(self.do_create(), io.IOBase) 1066 1067 def test_iobase_interface(self): 1068 # SpooledTemporaryFile should implement the io.IOBase interface. 1069 # Ensure it has all the required methods and properties. 1070 iobase_attrs = { 1071 # From IOBase 1072 'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__', 1073 '__exit__', 'flush', 'isatty', '__iter__', '__next__', 'readable', 1074 'readline', 'readlines', 'seekable', 'tell', 'writable', 1075 'writelines', 1076 # From BufferedIOBase (binary mode) and TextIOBase (text mode) 1077 'detach', 'read', 'read1', 'write', 'readinto', 'readinto1', 1078 'encoding', 'errors', 'newlines', 1079 } 1080 spooledtempfile_attrs = set(dir(tempfile.SpooledTemporaryFile)) 1081 missing_attrs = iobase_attrs - spooledtempfile_attrs 1082 self.assertFalse( 1083 missing_attrs, 1084 'SpooledTemporaryFile missing attributes from IOBase/BufferedIOBase/TextIOBase' 1085 ) 1086 1087 def test_del_on_close(self): 1088 # A SpooledTemporaryFile is deleted when closed 1089 dir = tempfile.mkdtemp() 1090 try: 1091 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 1092 self.assertFalse(f._rolled) 1093 f.write(b'blat ' * 5) 1094 self.assertTrue(f._rolled) 1095 filename = f.name 1096 f.close() 1097 self.assertEqual(os.listdir(dir), []) 1098 if not isinstance(filename, int): 1099 self.assertFalse(os.path.exists(filename), 1100 "SpooledTemporaryFile %s exists after close" % filename) 1101 finally: 1102 os.rmdir(dir) 1103 1104 def test_del_unrolled_file(self): 1105 # The unrolled SpooledTemporaryFile should raise a ResourceWarning 1106 # when deleted since the file was not explicitly closed. 1107 f = self.do_create(max_size=10) 1108 f.write(b'foo') 1109 self.assertEqual(f.name, None) # Unrolled so no filename/fd 1110 with self.assertWarns(ResourceWarning): 1111 f.__del__() 1112 1113 @unittest.skipIf( 1114 support.is_emscripten, "Emscripten cannot fstat renamed files." 1115 ) 1116 def test_del_rolled_file(self): 1117 # The rolled file should be deleted when the SpooledTemporaryFile 1118 # object is deleted. This should raise a ResourceWarning since the file 1119 # was not explicitly closed. 1120 f = self.do_create(max_size=2) 1121 f.write(b'foo') 1122 name = f.name # This is a fd on posix+cygwin, a filename everywhere else 1123 self.assertTrue(os.path.exists(name)) 1124 with self.assertWarns(ResourceWarning): 1125 f.__del__() 1126 self.assertFalse( 1127 os.path.exists(name), 1128 "Rolled SpooledTemporaryFile (name=%s) exists after delete" % name 1129 ) 1130 1131 def test_rewrite_small(self): 1132 # A SpooledTemporaryFile can be written to multiple within the max_size 1133 f = self.do_create(max_size=30) 1134 self.assertFalse(f._rolled) 1135 for i in range(5): 1136 f.seek(0, 0) 1137 f.write(b'x' * 20) 1138 self.assertFalse(f._rolled) 1139 1140 def test_write_sequential(self): 1141 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1142 # over afterward 1143 f = self.do_create(max_size=30) 1144 self.assertFalse(f._rolled) 1145 f.write(b'x' * 20) 1146 self.assertFalse(f._rolled) 1147 f.write(b'x' * 10) 1148 self.assertFalse(f._rolled) 1149 f.write(b'x') 1150 self.assertTrue(f._rolled) 1151 1152 def test_writelines(self): 1153 # Verify writelines with a SpooledTemporaryFile 1154 f = self.do_create() 1155 f.writelines((b'x', b'y', b'z')) 1156 pos = f.seek(0) 1157 self.assertEqual(pos, 0) 1158 buf = f.read() 1159 self.assertEqual(buf, b'xyz') 1160 1161 def test_writelines_sequential(self): 1162 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1163 # over afterward 1164 f = self.do_create(max_size=35) 1165 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1166 self.assertFalse(f._rolled) 1167 f.write(b'x') 1168 self.assertTrue(f._rolled) 1169 1170 def test_sparse(self): 1171 # A SpooledTemporaryFile that is written late in the file will extend 1172 # when that occurs 1173 f = self.do_create(max_size=30) 1174 self.assertFalse(f._rolled) 1175 pos = f.seek(100, 0) 1176 self.assertEqual(pos, 100) 1177 self.assertFalse(f._rolled) 1178 f.write(b'x') 1179 self.assertTrue(f._rolled) 1180 1181 def test_fileno(self): 1182 # A SpooledTemporaryFile should roll over to a real file on fileno() 1183 f = self.do_create(max_size=30) 1184 self.assertFalse(f._rolled) 1185 self.assertTrue(f.fileno() > 0) 1186 self.assertTrue(f._rolled) 1187 1188 def test_multiple_close_before_rollover(self): 1189 # A SpooledTemporaryFile can be closed many times without error 1190 f = tempfile.SpooledTemporaryFile() 1191 f.write(b'abc\n') 1192 self.assertFalse(f._rolled) 1193 f.close() 1194 f.close() 1195 f.close() 1196 1197 def test_multiple_close_after_rollover(self): 1198 # A SpooledTemporaryFile can be closed many times without error 1199 f = tempfile.SpooledTemporaryFile(max_size=1) 1200 f.write(b'abc\n') 1201 self.assertTrue(f._rolled) 1202 f.close() 1203 f.close() 1204 f.close() 1205 1206 def test_bound_methods(self): 1207 # It should be OK to steal a bound method from a SpooledTemporaryFile 1208 # and use it independently; when the file rolls over, those bound 1209 # methods should continue to function 1210 f = self.do_create(max_size=30) 1211 read = f.read 1212 write = f.write 1213 seek = f.seek 1214 1215 write(b"a" * 35) 1216 write(b"b" * 35) 1217 seek(0, 0) 1218 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1219 1220 def test_properties(self): 1221 f = tempfile.SpooledTemporaryFile(max_size=10) 1222 f.write(b'x' * 10) 1223 self.assertFalse(f._rolled) 1224 self.assertEqual(f.mode, 'w+b') 1225 self.assertIsNone(f.name) 1226 with self.assertRaises(AttributeError): 1227 f.newlines 1228 with self.assertRaises(AttributeError): 1229 f.encoding 1230 with self.assertRaises(AttributeError): 1231 f.errors 1232 1233 f.write(b'x') 1234 self.assertTrue(f._rolled) 1235 self.assertEqual(f.mode, 'rb+') 1236 self.assertIsNotNone(f.name) 1237 with self.assertRaises(AttributeError): 1238 f.newlines 1239 with self.assertRaises(AttributeError): 1240 f.encoding 1241 with self.assertRaises(AttributeError): 1242 f.errors 1243 1244 def test_text_mode(self): 1245 # Creating a SpooledTemporaryFile with a text mode should produce 1246 # a file object reading and writing (Unicode) text strings. 1247 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1248 encoding="utf-8") 1249 f.write("abc\n") 1250 f.seek(0) 1251 self.assertEqual(f.read(), "abc\n") 1252 f.write("def\n") 1253 f.seek(0) 1254 self.assertEqual(f.read(), "abc\ndef\n") 1255 self.assertFalse(f._rolled) 1256 self.assertEqual(f.mode, 'w+') 1257 self.assertIsNone(f.name) 1258 self.assertEqual(f.newlines, os.linesep) 1259 self.assertEqual(f.encoding, "utf-8") 1260 self.assertEqual(f.errors, "strict") 1261 1262 f.write("xyzzy\n") 1263 f.seek(0) 1264 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1265 # Check that Ctrl+Z doesn't truncate the file 1266 f.write("foo\x1abar\n") 1267 f.seek(0) 1268 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1269 self.assertTrue(f._rolled) 1270 self.assertEqual(f.mode, 'w+') 1271 self.assertIsNotNone(f.name) 1272 self.assertEqual(f.newlines, os.linesep) 1273 self.assertEqual(f.encoding, "utf-8") 1274 self.assertEqual(f.errors, "strict") 1275 1276 def test_text_newline_and_encoding(self): 1277 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1278 newline='', encoding='utf-8', 1279 errors='ignore') 1280 f.write("\u039B\r\n") 1281 f.seek(0) 1282 self.assertEqual(f.read(), "\u039B\r\n") 1283 self.assertFalse(f._rolled) 1284 self.assertEqual(f.mode, 'w+') 1285 self.assertIsNone(f.name) 1286 self.assertIsNotNone(f.newlines) 1287 self.assertEqual(f.encoding, "utf-8") 1288 self.assertEqual(f.errors, "ignore") 1289 1290 f.write("\u039C" * 10 + "\r\n") 1291 f.write("\u039D" * 20) 1292 f.seek(0) 1293 self.assertEqual(f.read(), 1294 "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)) 1295 self.assertTrue(f._rolled) 1296 self.assertEqual(f.mode, 'w+') 1297 self.assertIsNotNone(f.name) 1298 self.assertIsNotNone(f.newlines) 1299 self.assertEqual(f.encoding, 'utf-8') 1300 self.assertEqual(f.errors, 'ignore') 1301 1302 def test_context_manager_before_rollover(self): 1303 # A SpooledTemporaryFile can be used as a context manager 1304 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1305 self.assertFalse(f._rolled) 1306 self.assertFalse(f.closed) 1307 self.assertTrue(f.closed) 1308 def use_closed(): 1309 with f: 1310 pass 1311 self.assertRaises(ValueError, use_closed) 1312 1313 def test_context_manager_during_rollover(self): 1314 # A SpooledTemporaryFile can be used as a context manager 1315 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1316 self.assertFalse(f._rolled) 1317 f.write(b'abc\n') 1318 f.flush() 1319 self.assertTrue(f._rolled) 1320 self.assertFalse(f.closed) 1321 self.assertTrue(f.closed) 1322 def use_closed(): 1323 with f: 1324 pass 1325 self.assertRaises(ValueError, use_closed) 1326 1327 def test_context_manager_after_rollover(self): 1328 # A SpooledTemporaryFile can be used as a context manager 1329 f = tempfile.SpooledTemporaryFile(max_size=1) 1330 f.write(b'abc\n') 1331 f.flush() 1332 self.assertTrue(f._rolled) 1333 with f: 1334 self.assertFalse(f.closed) 1335 self.assertTrue(f.closed) 1336 def use_closed(): 1337 with f: 1338 pass 1339 self.assertRaises(ValueError, use_closed) 1340 1341 @unittest.skipIf( 1342 support.is_emscripten, "Emscripten cannot fstat renamed files." 1343 ) 1344 def test_truncate_with_size_parameter(self): 1345 # A SpooledTemporaryFile can be truncated to zero size 1346 f = tempfile.SpooledTemporaryFile(max_size=10) 1347 f.write(b'abcdefg\n') 1348 f.seek(0) 1349 f.truncate() 1350 self.assertFalse(f._rolled) 1351 self.assertEqual(f._file.getvalue(), b'') 1352 # A SpooledTemporaryFile can be truncated to a specific size 1353 f = tempfile.SpooledTemporaryFile(max_size=10) 1354 f.write(b'abcdefg\n') 1355 f.truncate(4) 1356 self.assertFalse(f._rolled) 1357 self.assertEqual(f._file.getvalue(), b'abcd') 1358 # A SpooledTemporaryFile rolls over if truncated to large size 1359 f = tempfile.SpooledTemporaryFile(max_size=10) 1360 f.write(b'abcdefg\n') 1361 f.truncate(20) 1362 self.assertTrue(f._rolled) 1363 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1364 1365 def test_class_getitem(self): 1366 self.assertIsInstance(tempfile.SpooledTemporaryFile[bytes], 1367 types.GenericAlias) 1368 1369if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1370 1371 class TestTemporaryFile(BaseTestCase): 1372 """Test TemporaryFile().""" 1373 1374 def test_basic(self): 1375 # TemporaryFile can create files 1376 # No point in testing the name params - the file has no name. 1377 tempfile.TemporaryFile() 1378 1379 def test_has_no_name(self): 1380 # TemporaryFile creates files with no names (on this system) 1381 dir = tempfile.mkdtemp() 1382 f = tempfile.TemporaryFile(dir=dir) 1383 f.write(b'blat') 1384 1385 # Sneaky: because this file has no name, it should not prevent 1386 # us from removing the directory it was created in. 1387 try: 1388 os.rmdir(dir) 1389 except: 1390 # cleanup 1391 f.close() 1392 os.rmdir(dir) 1393 raise 1394 1395 def test_multiple_close(self): 1396 # A TemporaryFile can be closed many times without error 1397 f = tempfile.TemporaryFile() 1398 f.write(b'abc\n') 1399 f.close() 1400 f.close() 1401 f.close() 1402 1403 # How to test the mode and bufsize parameters? 1404 def test_mode_and_encoding(self): 1405 1406 def roundtrip(input, *args, **kwargs): 1407 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1408 fileobj.write(input) 1409 fileobj.seek(0) 1410 self.assertEqual(input, fileobj.read()) 1411 1412 roundtrip(b"1234", "w+b") 1413 roundtrip("abdc\n", "w+") 1414 roundtrip("\u039B", "w+", encoding="utf-16") 1415 roundtrip("foo\r\n", "w+", newline="") 1416 1417 def test_bad_mode(self): 1418 dir = tempfile.mkdtemp() 1419 self.addCleanup(os_helper.rmtree, dir) 1420 with self.assertRaises(ValueError): 1421 tempfile.TemporaryFile(mode='wr', dir=dir) 1422 with self.assertRaises(TypeError): 1423 tempfile.TemporaryFile(mode=2, dir=dir) 1424 self.assertEqual(os.listdir(dir), []) 1425 1426 def test_bad_encoding(self): 1427 dir = tempfile.mkdtemp() 1428 self.addCleanup(os_helper.rmtree, dir) 1429 with self.assertRaises(LookupError): 1430 tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir) 1431 self.assertEqual(os.listdir(dir), []) 1432 1433 def test_unexpected_error(self): 1434 dir = tempfile.mkdtemp() 1435 self.addCleanup(os_helper.rmtree, dir) 1436 with mock.patch('tempfile._O_TMPFILE_WORKS', False), \ 1437 mock.patch('os.unlink') as mock_unlink, \ 1438 mock.patch('os.open') as mock_open, \ 1439 mock.patch('os.close') as mock_close: 1440 mock_unlink.side_effect = KeyboardInterrupt() 1441 with self.assertRaises(KeyboardInterrupt): 1442 tempfile.TemporaryFile(dir=dir) 1443 mock_close.assert_called() 1444 self.assertEqual(os.listdir(dir), []) 1445 1446 1447# Helper for test_del_on_shutdown 1448class NulledModules: 1449 def __init__(self, *modules): 1450 self.refs = [mod.__dict__ for mod in modules] 1451 self.contents = [ref.copy() for ref in self.refs] 1452 1453 def __enter__(self): 1454 for d in self.refs: 1455 for key in d: 1456 d[key] = None 1457 1458 def __exit__(self, *exc_info): 1459 for d, c in zip(self.refs, self.contents): 1460 d.clear() 1461 d.update(c) 1462 1463 1464class TestTemporaryDirectory(BaseTestCase): 1465 """Test TemporaryDirectory().""" 1466 1467 def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1, 1468 ignore_cleanup_errors=False): 1469 if dir is None: 1470 dir = tempfile.gettempdir() 1471 tmp = tempfile.TemporaryDirectory( 1472 dir=dir, prefix=pre, suffix=suf, 1473 ignore_cleanup_errors=ignore_cleanup_errors) 1474 self.nameCheck(tmp.name, dir, pre, suf) 1475 self.do_create2(tmp.name, recurse, dirs, files) 1476 return tmp 1477 1478 def do_create2(self, path, recurse=1, dirs=1, files=1): 1479 # Create subdirectories and some files 1480 if recurse: 1481 for i in range(dirs): 1482 name = os.path.join(path, "dir%d" % i) 1483 os.mkdir(name) 1484 self.do_create2(name, recurse-1, dirs, files) 1485 for i in range(files): 1486 with open(os.path.join(path, "test%d.txt" % i), "wb") as f: 1487 f.write(b"Hello world!") 1488 1489 def test_mkdtemp_failure(self): 1490 # Check no additional exception if mkdtemp fails 1491 # Previously would raise AttributeError instead 1492 # (noted as part of Issue #10188) 1493 with tempfile.TemporaryDirectory() as nonexistent: 1494 pass 1495 with self.assertRaises(FileNotFoundError) as cm: 1496 tempfile.TemporaryDirectory(dir=nonexistent) 1497 self.assertEqual(cm.exception.errno, errno.ENOENT) 1498 1499 def test_explicit_cleanup(self): 1500 # A TemporaryDirectory is deleted when cleaned up 1501 dir = tempfile.mkdtemp() 1502 try: 1503 d = self.do_create(dir=dir) 1504 self.assertTrue(os.path.exists(d.name), 1505 "TemporaryDirectory %s does not exist" % d.name) 1506 d.cleanup() 1507 self.assertFalse(os.path.exists(d.name), 1508 "TemporaryDirectory %s exists after cleanup" % d.name) 1509 finally: 1510 os.rmdir(dir) 1511 1512 def test_explict_cleanup_ignore_errors(self): 1513 """Test that cleanup doesn't return an error when ignoring them.""" 1514 with tempfile.TemporaryDirectory() as working_dir: 1515 temp_dir = self.do_create( 1516 dir=working_dir, ignore_cleanup_errors=True) 1517 temp_path = pathlib.Path(temp_dir.name) 1518 self.assertTrue(temp_path.exists(), 1519 f"TemporaryDirectory {temp_path!s} does not exist") 1520 with open(temp_path / "a_file.txt", "w+t") as open_file: 1521 open_file.write("Hello world!\n") 1522 temp_dir.cleanup() 1523 self.assertEqual(len(list(temp_path.glob("*"))), 1524 int(sys.platform.startswith("win")), 1525 "Unexpected number of files in " 1526 f"TemporaryDirectory {temp_path!s}") 1527 self.assertEqual( 1528 temp_path.exists(), 1529 sys.platform.startswith("win"), 1530 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1531 temp_dir.cleanup() 1532 self.assertFalse( 1533 temp_path.exists(), 1534 f"TemporaryDirectory {temp_path!s} exists after cleanup") 1535 1536 @os_helper.skip_unless_symlink 1537 def test_cleanup_with_symlink_to_a_directory(self): 1538 # cleanup() should not follow symlinks to directories (issue #12464) 1539 d1 = self.do_create() 1540 d2 = self.do_create(recurse=0) 1541 1542 # Symlink d1/foo -> d2 1543 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1544 1545 # This call to cleanup() should not follow the "foo" symlink 1546 d1.cleanup() 1547 1548 self.assertFalse(os.path.exists(d1.name), 1549 "TemporaryDirectory %s exists after cleanup" % d1.name) 1550 self.assertTrue(os.path.exists(d2.name), 1551 "Directory pointed to by a symlink was deleted") 1552 self.assertEqual(os.listdir(d2.name), ['test0.txt'], 1553 "Contents of the directory pointed to by a symlink " 1554 "were deleted") 1555 d2.cleanup() 1556 1557 @support.cpython_only 1558 def test_del_on_collection(self): 1559 # A TemporaryDirectory is deleted when garbage collected 1560 dir = tempfile.mkdtemp() 1561 try: 1562 d = self.do_create(dir=dir) 1563 name = d.name 1564 del d # Rely on refcounting to invoke __del__ 1565 self.assertFalse(os.path.exists(name), 1566 "TemporaryDirectory %s exists after __del__" % name) 1567 finally: 1568 os.rmdir(dir) 1569 1570 @support.cpython_only 1571 def test_del_on_collection_ignore_errors(self): 1572 """Test that ignoring errors works when TemporaryDirectory is gced.""" 1573 with tempfile.TemporaryDirectory() as working_dir: 1574 temp_dir = self.do_create( 1575 dir=working_dir, ignore_cleanup_errors=True) 1576 temp_path = pathlib.Path(temp_dir.name) 1577 self.assertTrue(temp_path.exists(), 1578 f"TemporaryDirectory {temp_path!s} does not exist") 1579 with open(temp_path / "a_file.txt", "w+t") as open_file: 1580 open_file.write("Hello world!\n") 1581 del temp_dir 1582 self.assertEqual(len(list(temp_path.glob("*"))), 1583 int(sys.platform.startswith("win")), 1584 "Unexpected number of files in " 1585 f"TemporaryDirectory {temp_path!s}") 1586 self.assertEqual( 1587 temp_path.exists(), 1588 sys.platform.startswith("win"), 1589 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1590 1591 def test_del_on_shutdown(self): 1592 # A TemporaryDirectory may be cleaned up during shutdown 1593 with self.do_create() as dir: 1594 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1595 code = """if True: 1596 import builtins 1597 import os 1598 import shutil 1599 import sys 1600 import tempfile 1601 import warnings 1602 1603 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1604 sys.stdout.buffer.write(tmp.name.encode()) 1605 1606 tmp2 = os.path.join(tmp.name, 'test_dir') 1607 os.mkdir(tmp2) 1608 with open(os.path.join(tmp2, "test0.txt"), "w") as f: 1609 f.write("Hello world!") 1610 1611 {mod}.tmp = tmp 1612 1613 warnings.filterwarnings("always", category=ResourceWarning) 1614 """.format(dir=dir, mod=mod) 1615 rc, out, err = script_helper.assert_python_ok("-c", code) 1616 tmp_name = out.decode().strip() 1617 self.assertFalse(os.path.exists(tmp_name), 1618 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1619 err = err.decode('utf-8', 'backslashreplace') 1620 self.assertNotIn("Exception ", err) 1621 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1622 1623 def test_del_on_shutdown_ignore_errors(self): 1624 """Test ignoring errors works when a tempdir is gc'ed on shutdown.""" 1625 with tempfile.TemporaryDirectory() as working_dir: 1626 code = """if True: 1627 import pathlib 1628 import sys 1629 import tempfile 1630 import warnings 1631 1632 temp_dir = tempfile.TemporaryDirectory( 1633 dir={working_dir!r}, ignore_cleanup_errors=True) 1634 sys.stdout.buffer.write(temp_dir.name.encode()) 1635 1636 temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir" 1637 temp_dir_2.mkdir() 1638 with open(temp_dir_2 / "test0.txt", "w") as test_file: 1639 test_file.write("Hello world!") 1640 open_file = open(temp_dir_2 / "open_file.txt", "w") 1641 open_file.write("Hello world!") 1642 1643 warnings.filterwarnings("always", category=ResourceWarning) 1644 """.format(working_dir=working_dir) 1645 __, out, err = script_helper.assert_python_ok("-c", code) 1646 temp_path = pathlib.Path(out.decode().strip()) 1647 self.assertEqual(len(list(temp_path.glob("*"))), 1648 int(sys.platform.startswith("win")), 1649 "Unexpected number of files in " 1650 f"TemporaryDirectory {temp_path!s}") 1651 self.assertEqual( 1652 temp_path.exists(), 1653 sys.platform.startswith("win"), 1654 f"TemporaryDirectory {temp_path!s} existence state unexpected") 1655 err = err.decode('utf-8', 'backslashreplace') 1656 self.assertNotIn("Exception", err) 1657 self.assertNotIn("Error", err) 1658 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1659 1660 def test_exit_on_shutdown(self): 1661 # Issue #22427 1662 with self.do_create() as dir: 1663 code = """if True: 1664 import sys 1665 import tempfile 1666 import warnings 1667 1668 def generator(): 1669 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1670 yield tmp 1671 g = generator() 1672 sys.stdout.buffer.write(next(g).encode()) 1673 1674 warnings.filterwarnings("always", category=ResourceWarning) 1675 """.format(dir=dir) 1676 rc, out, err = script_helper.assert_python_ok("-c", code) 1677 tmp_name = out.decode().strip() 1678 self.assertFalse(os.path.exists(tmp_name), 1679 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1680 err = err.decode('utf-8', 'backslashreplace') 1681 self.assertNotIn("Exception ", err) 1682 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1683 1684 def test_warnings_on_cleanup(self): 1685 # ResourceWarning will be triggered by __del__ 1686 with self.do_create() as dir: 1687 d = self.do_create(dir=dir, recurse=3) 1688 name = d.name 1689 1690 # Check for the resource warning 1691 with warnings_helper.check_warnings(('Implicitly', 1692 ResourceWarning), 1693 quiet=False): 1694 warnings.filterwarnings("always", category=ResourceWarning) 1695 del d 1696 support.gc_collect() 1697 self.assertFalse(os.path.exists(name), 1698 "TemporaryDirectory %s exists after __del__" % name) 1699 1700 def test_multiple_close(self): 1701 # Can be cleaned-up many times without error 1702 d = self.do_create() 1703 d.cleanup() 1704 d.cleanup() 1705 d.cleanup() 1706 1707 def test_context_manager(self): 1708 # Can be used as a context manager 1709 d = self.do_create() 1710 with d as name: 1711 self.assertTrue(os.path.exists(name)) 1712 self.assertEqual(name, d.name) 1713 self.assertFalse(os.path.exists(name)) 1714 1715 def test_modes(self): 1716 for mode in range(8): 1717 mode <<= 6 1718 with self.subTest(mode=format(mode, '03o')): 1719 d = self.do_create(recurse=3, dirs=2, files=2) 1720 with d: 1721 # Change files and directories mode recursively. 1722 for root, dirs, files in os.walk(d.name, topdown=False): 1723 for name in files: 1724 os.chmod(os.path.join(root, name), mode) 1725 os.chmod(root, mode) 1726 d.cleanup() 1727 self.assertFalse(os.path.exists(d.name)) 1728 1729 @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags') 1730 def test_flags(self): 1731 flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK 1732 d = self.do_create(recurse=3, dirs=2, files=2) 1733 with d: 1734 # Change files and directories flags recursively. 1735 for root, dirs, files in os.walk(d.name, topdown=False): 1736 for name in files: 1737 os.chflags(os.path.join(root, name), flags) 1738 os.chflags(root, flags) 1739 d.cleanup() 1740 self.assertFalse(os.path.exists(d.name)) 1741 1742 1743if __name__ == "__main__": 1744 unittest.main() 1745