1import importlib 2import shutil 3import stat 4import sys 5import os 6import unittest 7import socket 8import tempfile 9import textwrap 10import errno 11from test import support 12from test.support import script_helper 13 14TESTFN = support.TESTFN 15 16 17class ClassicClass: 18 pass 19 20class NewStyleClass(object): 21 pass 22 23 24class TestSupport(unittest.TestCase): 25 26 def test_import_module(self): 27 support.import_module("ftplib") 28 self.assertRaises(unittest.SkipTest, support.import_module, "foo") 29 30 def test_import_fresh_module(self): 31 support.import_fresh_module("ftplib") 32 33 def test_get_attribute(self): 34 self.assertEqual(support.get_attribute(self, "test_get_attribute"), 35 self.test_get_attribute) 36 self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo") 37 with self.assertRaisesRegexp(unittest.SkipTest, 'unittest'): 38 support.get_attribute(unittest, 'foo') 39 with self.assertRaisesRegexp(unittest.SkipTest, 'ClassicClass'): 40 support.get_attribute(ClassicClass, 'foo') 41 with self.assertRaisesRegexp(unittest.SkipTest, 'ClassicClass'): 42 support.get_attribute(ClassicClass(), 'foo') 43 with self.assertRaisesRegexp(unittest.SkipTest, 'NewStyleClass'): 44 support.get_attribute(NewStyleClass, 'foo') 45 with self.assertRaisesRegexp(unittest.SkipTest, 'NewStyleClass'): 46 support.get_attribute(NewStyleClass(), 'foo') 47 48 @unittest.skip("failing buildbots") 49 def test_get_original_stdout(self): 50 self.assertEqual(support.get_original_stdout(), sys.stdout) 51 52 def test_unload(self): 53 import sched 54 self.assertIn("sched", sys.modules) 55 support.unload("sched") 56 self.assertNotIn("sched", sys.modules) 57 58 def test_unlink(self): 59 with open(TESTFN, "w") as f: 60 pass 61 support.unlink(TESTFN) 62 self.assertFalse(os.path.exists(TESTFN)) 63 support.unlink(TESTFN) 64 65 def test_rmtree(self): 66 dirpath = support.TESTFN + 'd' 67 subdirpath = os.path.join(dirpath, 'subdir') 68 os.mkdir(dirpath) 69 os.mkdir(subdirpath) 70 support.rmtree(dirpath) 71 self.assertFalse(os.path.exists(dirpath)) 72 with support.swap_attr(support, 'verbose', 0): 73 support.rmtree(dirpath) 74 75 os.mkdir(dirpath) 76 os.mkdir(subdirpath) 77 os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR) 78 with support.swap_attr(support, 'verbose', 0): 79 support.rmtree(dirpath) 80 self.assertFalse(os.path.exists(dirpath)) 81 82 os.mkdir(dirpath) 83 os.mkdir(subdirpath) 84 os.chmod(dirpath, 0) 85 with support.swap_attr(support, 'verbose', 0): 86 support.rmtree(dirpath) 87 self.assertFalse(os.path.exists(dirpath)) 88 89 def test_forget(self): 90 mod_filename = TESTFN + '.py' 91 with open(mod_filename, 'wt') as f: 92 f.write('foo = 1\n') 93 sys.path.insert(0, os.curdir) 94 try: 95 mod = __import__(TESTFN) 96 self.assertIn(TESTFN, sys.modules) 97 98 support.forget(TESTFN) 99 self.assertNotIn(TESTFN, sys.modules) 100 finally: 101 del sys.path[0] 102 support.unlink(mod_filename) 103 support.rmtree('__pycache__') 104 105 def test_HOST(self): 106 s = socket.socket() 107 s.bind((support.HOST, 0)) 108 s.close() 109 110 def test_find_unused_port(self): 111 port = support.find_unused_port() 112 s = socket.socket() 113 s.bind((support.HOST, port)) 114 s.close() 115 116 def test_bind_port(self): 117 s = socket.socket() 118 support.bind_port(s) 119 s.listen(5) 120 s.close() 121 122 # Tests for temp_dir() 123 124 def test_temp_dir(self): 125 """Test that temp_dir() creates and destroys its directory.""" 126 parent_dir = tempfile.mkdtemp() 127 parent_dir = os.path.realpath(parent_dir) 128 129 try: 130 path = os.path.join(parent_dir, 'temp') 131 self.assertFalse(os.path.isdir(path)) 132 with support.temp_dir(path) as temp_path: 133 self.assertEqual(temp_path, path) 134 self.assertTrue(os.path.isdir(path)) 135 self.assertFalse(os.path.isdir(path)) 136 finally: 137 support.rmtree(parent_dir) 138 139 def test_temp_dir__path_none(self): 140 """Test passing no path.""" 141 with support.temp_dir() as temp_path: 142 self.assertTrue(os.path.isdir(temp_path)) 143 self.assertFalse(os.path.isdir(temp_path)) 144 145 def test_temp_dir__existing_dir__quiet_default(self): 146 """Test passing a directory that already exists.""" 147 def call_temp_dir(path): 148 with support.temp_dir(path) as temp_path: 149 raise Exception("should not get here") 150 151 path = tempfile.mkdtemp() 152 path = os.path.realpath(path) 153 try: 154 self.assertTrue(os.path.isdir(path)) 155 with self.assertRaises(OSError) as cm: 156 call_temp_dir(path) 157 self.assertEqual(cm.exception.errno, errno.EEXIST) 158 # Make sure temp_dir did not delete the original directory. 159 self.assertTrue(os.path.isdir(path)) 160 finally: 161 shutil.rmtree(path) 162 163 def test_temp_dir__existing_dir__quiet_true(self): 164 """Test passing a directory that already exists with quiet=True.""" 165 path = tempfile.mkdtemp() 166 path = os.path.realpath(path) 167 168 try: 169 with support.check_warnings() as recorder: 170 with support.temp_dir(path, quiet=True) as temp_path: 171 self.assertEqual(path, temp_path) 172 warnings = [str(w.message) for w in recorder.warnings] 173 # Make sure temp_dir did not delete the original directory. 174 self.assertTrue(os.path.isdir(path)) 175 finally: 176 shutil.rmtree(path) 177 178 expected = ['tests may fail, unable to create temp dir: ' + path] 179 self.assertEqual(warnings, expected) 180 181 @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork") 182 def test_temp_dir__forked_child(self): 183 """Test that a forked child process does not remove the directory.""" 184 # See bpo-30028 for details. 185 # Run the test as an external script, because it uses fork. 186 script_helper.assert_python_ok("-c", textwrap.dedent(""" 187 import os 188 from test import support 189 with support.temp_cwd() as temp_path: 190 pid = os.fork() 191 if pid != 0: 192 # parent process (child has pid == 0) 193 194 # wait for the child to terminate 195 (pid, status) = os.waitpid(pid, 0) 196 if status != 0: 197 raise AssertionError("Child process failed with exit " 198 "status indication " 199 "0x{:x}.".format(status)) 200 201 # Make sure that temp_path is still present. When the child 202 # process leaves the 'temp_cwd'-context, the __exit__()- 203 # method of the context must not remove the temporary 204 # directory. 205 if not os.path.isdir(temp_path): 206 raise AssertionError("Child removed temp_path.") 207 """)) 208 209 # Tests for change_cwd() 210 211 def test_change_cwd(self): 212 original_cwd = os.getcwd() 213 214 with support.temp_dir() as temp_path: 215 with support.change_cwd(temp_path) as new_cwd: 216 self.assertEqual(new_cwd, temp_path) 217 self.assertEqual(os.getcwd(), new_cwd) 218 219 self.assertEqual(os.getcwd(), original_cwd) 220 221 def test_change_cwd__non_existent_dir(self): 222 """Test passing a non-existent directory.""" 223 original_cwd = os.getcwd() 224 225 def call_change_cwd(path): 226 with support.change_cwd(path) as new_cwd: 227 raise Exception("should not get here") 228 229 with support.temp_dir() as parent_dir: 230 non_existent_dir = os.path.join(parent_dir, 'does_not_exist') 231 with self.assertRaises(OSError) as cm: 232 call_change_cwd(non_existent_dir) 233 self.assertEqual(cm.exception.errno, errno.ENOENT) 234 235 self.assertEqual(os.getcwd(), original_cwd) 236 237 def test_change_cwd__non_existent_dir__quiet_true(self): 238 """Test passing a non-existent directory with quiet=True.""" 239 original_cwd = os.getcwd() 240 241 with support.temp_dir() as parent_dir: 242 bad_dir = os.path.join(parent_dir, 'does_not_exist') 243 with support.check_warnings() as recorder: 244 with support.change_cwd(bad_dir, quiet=True) as new_cwd: 245 self.assertEqual(new_cwd, original_cwd) 246 self.assertEqual(os.getcwd(), new_cwd) 247 warnings = [str(w.message) for w in recorder.warnings] 248 249 expected = ['tests may fail, unable to change CWD to: ' + bad_dir] 250 self.assertEqual(warnings, expected) 251 252 # Tests for change_cwd() 253 254 def test_change_cwd__chdir_warning(self): 255 """Check the warning message when os.chdir() fails.""" 256 path = TESTFN + '_does_not_exist' 257 with support.check_warnings() as recorder: 258 with support.change_cwd(path=path, quiet=True): 259 pass 260 messages = [str(w.message) for w in recorder.warnings] 261 self.assertEqual(messages, ['tests may fail, unable to change CWD to: ' + path]) 262 263 # Tests for temp_cwd() 264 265 def test_temp_cwd(self): 266 here = os.getcwd() 267 with support.temp_cwd(name=TESTFN): 268 self.assertEqual(os.path.basename(os.getcwd()), TESTFN) 269 self.assertFalse(os.path.exists(TESTFN)) 270 self.assertEqual(os.getcwd(), here) 271 272 273 def test_temp_cwd__name_none(self): 274 """Test passing None to temp_cwd().""" 275 original_cwd = os.getcwd() 276 with support.temp_cwd(name=None) as new_cwd: 277 self.assertNotEqual(new_cwd, original_cwd) 278 self.assertTrue(os.path.isdir(new_cwd)) 279 self.assertEqual(os.getcwd(), new_cwd) 280 self.assertEqual(os.getcwd(), original_cwd) 281 282 def test_sortdict(self): 283 self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}") 284 285 def test_make_bad_fd(self): 286 fd = support.make_bad_fd() 287 with self.assertRaises(OSError) as cm: 288 os.write(fd, b"foo") 289 self.assertEqual(cm.exception.errno, errno.EBADF) 290 291 def test_check_syntax_error(self): 292 support.check_syntax_error(self, "def class", lineno=1, offset=9) 293 with self.assertRaises(AssertionError): 294 support.check_syntax_error(self, "x=1") 295 296 def test_CleanImport(self): 297 import importlib 298 with support.CleanImport("asyncore"): 299 importlib.import_module("asyncore") 300 301 def test_DirsOnSysPath(self): 302 with support.DirsOnSysPath('foo', 'bar'): 303 self.assertIn("foo", sys.path) 304 self.assertIn("bar", sys.path) 305 self.assertNotIn("foo", sys.path) 306 self.assertNotIn("bar", sys.path) 307 308 def test_captured_stdout(self): 309 with support.captured_stdout() as stdout: 310 print "hello" 311 self.assertEqual(stdout.getvalue(), "hello\n") 312 313 def test_captured_stderr(self): 314 with support.captured_stderr() as stderr: 315 print >>sys.stderr, "hello" 316 self.assertEqual(stderr.getvalue(), "hello\n") 317 318 def test_captured_stdin(self): 319 with support.captured_stdin() as stdin: 320 stdin.write('hello\n') 321 stdin.seek(0) 322 # call test code that consumes from sys.stdin 323 captured = raw_input() 324 self.assertEqual(captured, "hello") 325 326 def test_gc_collect(self): 327 support.gc_collect() 328 329 def test_python_is_optimized(self): 330 self.assertIsInstance(support.python_is_optimized(), bool) 331 332 def test_swap_attr(self): 333 class Obj: 334 pass 335 obj = Obj() 336 obj.x = 1 337 with support.swap_attr(obj, "x", 5) as x: 338 self.assertEqual(obj.x, 5) 339 self.assertEqual(x, 1) 340 self.assertEqual(obj.x, 1) 341 with support.swap_attr(obj, "y", 5) as y: 342 self.assertEqual(obj.y, 5) 343 self.assertIsNone(y) 344 self.assertFalse(hasattr(obj, 'y')) 345 with support.swap_attr(obj, "y", 5): 346 del obj.y 347 self.assertFalse(hasattr(obj, 'y')) 348 349 def test_swap_item(self): 350 D = {"x":1} 351 with support.swap_item(D, "x", 5) as x: 352 self.assertEqual(D["x"], 5) 353 self.assertEqual(x, 1) 354 self.assertEqual(D["x"], 1) 355 with support.swap_item(D, "y", 5) as y: 356 self.assertEqual(D["y"], 5) 357 self.assertIsNone(y) 358 self.assertNotIn("y", D) 359 with support.swap_item(D, "y", 5): 360 del D["y"] 361 self.assertNotIn("y", D) 362 363 def test_match_test(self): 364 class Test: 365 def __init__(self, test_id): 366 self.test_id = test_id 367 368 def id(self): 369 return self.test_id 370 371 test_access = Test('test.test_os.FileTests.test_access') 372 test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir') 373 374 with support.swap_attr(support, '_match_test_func', None): 375 # match all 376 support.set_match_tests([]) 377 self.assertTrue(support.match_test(test_access)) 378 self.assertTrue(support.match_test(test_chdir)) 379 380 # match all using None 381 support.set_match_tests(None) 382 self.assertTrue(support.match_test(test_access)) 383 self.assertTrue(support.match_test(test_chdir)) 384 385 # match the full test identifier 386 support.set_match_tests([test_access.id()]) 387 self.assertTrue(support.match_test(test_access)) 388 self.assertFalse(support.match_test(test_chdir)) 389 390 # match the module name 391 support.set_match_tests(['test_os']) 392 self.assertTrue(support.match_test(test_access)) 393 self.assertTrue(support.match_test(test_chdir)) 394 395 # Test '*' pattern 396 support.set_match_tests(['test_*']) 397 self.assertTrue(support.match_test(test_access)) 398 self.assertTrue(support.match_test(test_chdir)) 399 400 # Test case sensitivity 401 support.set_match_tests(['filetests']) 402 self.assertFalse(support.match_test(test_access)) 403 support.set_match_tests(['FileTests']) 404 self.assertTrue(support.match_test(test_access)) 405 406 # Test pattern containing '.' and a '*' metacharacter 407 support.set_match_tests(['*test_os.*.test_*']) 408 self.assertTrue(support.match_test(test_access)) 409 self.assertTrue(support.match_test(test_chdir)) 410 411 # Multiple patterns 412 support.set_match_tests([test_access.id(), test_chdir.id()]) 413 self.assertTrue(support.match_test(test_access)) 414 self.assertTrue(support.match_test(test_chdir)) 415 416 support.set_match_tests(['test_access', 'DONTMATCH']) 417 self.assertTrue(support.match_test(test_access)) 418 self.assertFalse(support.match_test(test_chdir)) 419 420 def test_fd_count(self): 421 # We cannot test the absolute value of fd_count(): on old Linux 422 # kernel or glibc versions, os.urandom() keeps a FD open on 423 # /dev/urandom device and Python has 4 FD opens instead of 3. 424 start = support.fd_count() 425 fd = os.open(__file__, os.O_RDONLY) 426 try: 427 more = support.fd_count() 428 finally: 429 os.close(fd) 430 self.assertEqual(more - start, 1) 431 432 # XXX -follows a list of untested API 433 # make_legacy_pyc 434 # is_resource_enabled 435 # requires 436 # fcmp 437 # umaks 438 # findfile 439 # check_warnings 440 # EnvironmentVarGuard 441 # TransientResource 442 # transient_internet 443 # run_with_locale 444 # set_memlimit 445 # bigmemtest 446 # precisionbigmemtest 447 # bigaddrspacetest 448 # requires_resource 449 # run_doctest 450 # threading_cleanup 451 # reap_threads 452 # reap_children 453 # strip_python_stderr 454 # args_from_interpreter_flags 455 # can_symlink 456 # skip_unless_symlink 457 # SuppressCrashReport 458 459 460def test_main(): 461 tests = [TestSupport] 462 support.run_unittest(*tests) 463 464if __name__ == '__main__': 465 test_main() 466