1''' 2Tests for fileinput module. 3Nick Mathewson 4''' 5import io 6import os 7import sys 8import re 9import fileinput 10import collections 11import builtins 12import tempfile 13import unittest 14 15try: 16 import bz2 17except ImportError: 18 bz2 = None 19try: 20 import gzip 21except ImportError: 22 gzip = None 23 24from io import BytesIO, StringIO 25from fileinput import FileInput, hook_encoded 26from pathlib import Path 27 28from test.support import verbose 29from test.support.os_helper import TESTFN 30from test.support.os_helper import unlink as safe_unlink 31from test.support import os_helper 32from test import support 33from unittest import mock 34 35 36# The fileinput module has 2 interfaces: the FileInput class which does 37# all the work, and a few functions (input, etc.) that use a global _state 38# variable. 39 40class BaseTests: 41 # Write a content (str or bytes) to temp file, and return the 42 # temp file's name. 43 def writeTmp(self, content, *, mode='w'): # opening in text mode is the default 44 fd, name = tempfile.mkstemp() 45 self.addCleanup(os_helper.unlink, name) 46 encoding = None if "b" in mode else "utf-8" 47 with open(fd, mode, encoding=encoding) as f: 48 f.write(content) 49 return name 50 51class LineReader: 52 53 def __init__(self): 54 self._linesread = [] 55 56 @property 57 def linesread(self): 58 try: 59 return self._linesread[:] 60 finally: 61 self._linesread = [] 62 63 def openhook(self, filename, mode): 64 self.it = iter(filename.splitlines(True)) 65 return self 66 67 def readline(self, size=None): 68 line = next(self.it, '') 69 self._linesread.append(line) 70 return line 71 72 def readlines(self, hint=-1): 73 lines = [] 74 size = 0 75 while True: 76 line = self.readline() 77 if not line: 78 return lines 79 lines.append(line) 80 size += len(line) 81 if size >= hint: 82 return lines 83 84 def close(self): 85 pass 86 87class BufferSizesTests(BaseTests, unittest.TestCase): 88 def test_buffer_sizes(self): 89 90 t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15))) 91 t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10))) 92 t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5))) 93 t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1))) 94 95 pat = re.compile(r'LINE (\d+) OF FILE (\d+)') 96 97 if verbose: 98 print('1. Simple iteration') 99 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 100 lines = list(fi) 101 fi.close() 102 self.assertEqual(len(lines), 31) 103 self.assertEqual(lines[4], 'Line 5 of file 1\n') 104 self.assertEqual(lines[30], 'Line 1 of file 4\n') 105 self.assertEqual(fi.lineno(), 31) 106 self.assertEqual(fi.filename(), t4) 107 108 if verbose: 109 print('2. Status variables') 110 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 111 s = "x" 112 while s and s != 'Line 6 of file 2\n': 113 s = fi.readline() 114 self.assertEqual(fi.filename(), t2) 115 self.assertEqual(fi.lineno(), 21) 116 self.assertEqual(fi.filelineno(), 6) 117 self.assertFalse(fi.isfirstline()) 118 self.assertFalse(fi.isstdin()) 119 120 if verbose: 121 print('3. Nextfile') 122 fi.nextfile() 123 self.assertEqual(fi.readline(), 'Line 1 of file 3\n') 124 self.assertEqual(fi.lineno(), 22) 125 fi.close() 126 127 if verbose: 128 print('4. Stdin') 129 fi = FileInput(files=(t1, t2, t3, t4, '-'), encoding="utf-8") 130 savestdin = sys.stdin 131 try: 132 sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n") 133 lines = list(fi) 134 self.assertEqual(len(lines), 33) 135 self.assertEqual(lines[32], 'Line 2 of stdin\n') 136 self.assertEqual(fi.filename(), '<stdin>') 137 fi.nextfile() 138 finally: 139 sys.stdin = savestdin 140 141 if verbose: 142 print('5. Boundary conditions') 143 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 144 self.assertEqual(fi.lineno(), 0) 145 self.assertEqual(fi.filename(), None) 146 fi.nextfile() 147 self.assertEqual(fi.lineno(), 0) 148 self.assertEqual(fi.filename(), None) 149 150 if verbose: 151 print('6. Inplace') 152 savestdout = sys.stdout 153 try: 154 fi = FileInput(files=(t1, t2, t3, t4), inplace=1, encoding="utf-8") 155 for line in fi: 156 line = line[:-1].upper() 157 print(line) 158 fi.close() 159 finally: 160 sys.stdout = savestdout 161 162 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 163 for line in fi: 164 self.assertEqual(line[-1], '\n') 165 m = pat.match(line[:-1]) 166 self.assertNotEqual(m, None) 167 self.assertEqual(int(m.group(1)), fi.filelineno()) 168 fi.close() 169 170class UnconditionallyRaise: 171 def __init__(self, exception_type): 172 self.exception_type = exception_type 173 self.invoked = False 174 def __call__(self, *args, **kwargs): 175 self.invoked = True 176 raise self.exception_type() 177 178class FileInputTests(BaseTests, unittest.TestCase): 179 180 def test_zero_byte_files(self): 181 t1 = self.writeTmp("") 182 t2 = self.writeTmp("") 183 t3 = self.writeTmp("The only line there is.\n") 184 t4 = self.writeTmp("") 185 fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8") 186 187 line = fi.readline() 188 self.assertEqual(line, 'The only line there is.\n') 189 self.assertEqual(fi.lineno(), 1) 190 self.assertEqual(fi.filelineno(), 1) 191 self.assertEqual(fi.filename(), t3) 192 193 line = fi.readline() 194 self.assertFalse(line) 195 self.assertEqual(fi.lineno(), 1) 196 self.assertEqual(fi.filelineno(), 0) 197 self.assertEqual(fi.filename(), t4) 198 fi.close() 199 200 def test_files_that_dont_end_with_newline(self): 201 t1 = self.writeTmp("A\nB\nC") 202 t2 = self.writeTmp("D\nE\nF") 203 fi = FileInput(files=(t1, t2), encoding="utf-8") 204 lines = list(fi) 205 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) 206 self.assertEqual(fi.filelineno(), 3) 207 self.assertEqual(fi.lineno(), 6) 208 209## def test_unicode_filenames(self): 210## # XXX A unicode string is always returned by writeTmp. 211## # So is this needed? 212## t1 = self.writeTmp("A\nB") 213## encoding = sys.getfilesystemencoding() 214## if encoding is None: 215## encoding = 'ascii' 216## fi = FileInput(files=str(t1, encoding), encoding="utf-8") 217## lines = list(fi) 218## self.assertEqual(lines, ["A\n", "B"]) 219 220 def test_fileno(self): 221 t1 = self.writeTmp("A\nB") 222 t2 = self.writeTmp("C\nD") 223 fi = FileInput(files=(t1, t2), encoding="utf-8") 224 self.assertEqual(fi.fileno(), -1) 225 line = next(fi) 226 self.assertNotEqual(fi.fileno(), -1) 227 fi.nextfile() 228 self.assertEqual(fi.fileno(), -1) 229 line = list(fi) 230 self.assertEqual(fi.fileno(), -1) 231 232 def test_invalid_opening_mode(self): 233 for mode in ('w', 'rU', 'U'): 234 with self.subTest(mode=mode): 235 with self.assertRaises(ValueError): 236 FileInput(mode=mode) 237 238 def test_stdin_binary_mode(self): 239 with mock.patch('sys.stdin') as m_stdin: 240 m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam') 241 fi = FileInput(files=['-'], mode='rb') 242 lines = list(fi) 243 self.assertEqual(lines, [b'spam, bacon, sausage, and spam']) 244 245 def test_detached_stdin_binary_mode(self): 246 orig_stdin = sys.stdin 247 try: 248 sys.stdin = BytesIO(b'spam, bacon, sausage, and spam') 249 self.assertFalse(hasattr(sys.stdin, 'buffer')) 250 fi = FileInput(files=['-'], mode='rb') 251 lines = list(fi) 252 self.assertEqual(lines, [b'spam, bacon, sausage, and spam']) 253 finally: 254 sys.stdin = orig_stdin 255 256 def test_file_opening_hook(self): 257 try: 258 # cannot use openhook and inplace mode 259 fi = FileInput(inplace=1, openhook=lambda f, m: None) 260 self.fail("FileInput should raise if both inplace " 261 "and openhook arguments are given") 262 except ValueError: 263 pass 264 try: 265 fi = FileInput(openhook=1) 266 self.fail("FileInput should check openhook for being callable") 267 except ValueError: 268 pass 269 270 class CustomOpenHook: 271 def __init__(self): 272 self.invoked = False 273 def __call__(self, *args, **kargs): 274 self.invoked = True 275 return open(*args, encoding="utf-8") 276 277 t = self.writeTmp("\n") 278 custom_open_hook = CustomOpenHook() 279 with FileInput([t], openhook=custom_open_hook) as fi: 280 fi.readline() 281 self.assertTrue(custom_open_hook.invoked, "openhook not invoked") 282 283 def test_readline(self): 284 with open(TESTFN, 'wb') as f: 285 f.write(b'A\nB\r\nC\r') 286 # Fill TextIOWrapper buffer. 287 f.write(b'123456789\n' * 1000) 288 # Issue #20501: readline() shouldn't read whole file. 289 f.write(b'\x80') 290 self.addCleanup(safe_unlink, TESTFN) 291 292 with FileInput(files=TESTFN, 293 openhook=hook_encoded('ascii')) as fi: 294 try: 295 self.assertEqual(fi.readline(), 'A\n') 296 self.assertEqual(fi.readline(), 'B\n') 297 self.assertEqual(fi.readline(), 'C\n') 298 except UnicodeDecodeError: 299 self.fail('Read to end of file') 300 with self.assertRaises(UnicodeDecodeError): 301 # Read to the end of file. 302 list(fi) 303 self.assertEqual(fi.readline(), '') 304 self.assertEqual(fi.readline(), '') 305 306 def test_readline_binary_mode(self): 307 with open(TESTFN, 'wb') as f: 308 f.write(b'A\nB\r\nC\rD') 309 self.addCleanup(safe_unlink, TESTFN) 310 311 with FileInput(files=TESTFN, mode='rb') as fi: 312 self.assertEqual(fi.readline(), b'A\n') 313 self.assertEqual(fi.readline(), b'B\r\n') 314 self.assertEqual(fi.readline(), b'C\rD') 315 # Read to the end of file. 316 self.assertEqual(fi.readline(), b'') 317 self.assertEqual(fi.readline(), b'') 318 319 def test_inplace_binary_write_mode(self): 320 temp_file = self.writeTmp(b'Initial text.', mode='wb') 321 with FileInput(temp_file, mode='rb', inplace=True) as fobj: 322 line = fobj.readline() 323 self.assertEqual(line, b'Initial text.') 324 # print() cannot be used with files opened in binary mode. 325 sys.stdout.write(b'New line.') 326 with open(temp_file, 'rb') as f: 327 self.assertEqual(f.read(), b'New line.') 328 329 def test_inplace_encoding_errors(self): 330 temp_file = self.writeTmp(b'Initial text \x88', mode='wb') 331 with FileInput(temp_file, inplace=True, 332 encoding="ascii", errors="replace") as fobj: 333 line = fobj.readline() 334 self.assertEqual(line, 'Initial text \ufffd') 335 print("New line \x88") 336 with open(temp_file, 'rb') as f: 337 self.assertEqual(f.read().rstrip(b'\r\n'), b'New line ?') 338 339 def test_file_hook_backward_compatibility(self): 340 def old_hook(filename, mode): 341 return io.StringIO("I used to receive only filename and mode") 342 t = self.writeTmp("\n") 343 with FileInput([t], openhook=old_hook) as fi: 344 result = fi.readline() 345 self.assertEqual(result, "I used to receive only filename and mode") 346 347 def test_context_manager(self): 348 t1 = self.writeTmp("A\nB\nC") 349 t2 = self.writeTmp("D\nE\nF") 350 with FileInput(files=(t1, t2), encoding="utf-8") as fi: 351 lines = list(fi) 352 self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) 353 self.assertEqual(fi.filelineno(), 3) 354 self.assertEqual(fi.lineno(), 6) 355 self.assertEqual(fi._files, ()) 356 357 def test_close_on_exception(self): 358 t1 = self.writeTmp("") 359 try: 360 with FileInput(files=t1, encoding="utf-8") as fi: 361 raise OSError 362 except OSError: 363 self.assertEqual(fi._files, ()) 364 365 def test_empty_files_list_specified_to_constructor(self): 366 with FileInput(files=[], encoding="utf-8") as fi: 367 self.assertEqual(fi._files, ('-',)) 368 369 def test_nextfile_oserror_deleting_backup(self): 370 """Tests invoking FileInput.nextfile() when the attempt to delete 371 the backup file would raise OSError. This error is expected to be 372 silently ignored""" 373 374 os_unlink_orig = os.unlink 375 os_unlink_replacement = UnconditionallyRaise(OSError) 376 try: 377 t = self.writeTmp("\n") 378 self.addCleanup(safe_unlink, t + '.bak') 379 with FileInput(files=[t], inplace=True, encoding="utf-8") as fi: 380 next(fi) # make sure the file is opened 381 os.unlink = os_unlink_replacement 382 fi.nextfile() 383 finally: 384 os.unlink = os_unlink_orig 385 386 # sanity check to make sure that our test scenario was actually hit 387 self.assertTrue(os_unlink_replacement.invoked, 388 "os.unlink() was not invoked") 389 390 def test_readline_os_fstat_raises_OSError(self): 391 """Tests invoking FileInput.readline() when os.fstat() raises OSError. 392 This exception should be silently discarded.""" 393 394 os_fstat_orig = os.fstat 395 os_fstat_replacement = UnconditionallyRaise(OSError) 396 try: 397 t = self.writeTmp("\n") 398 with FileInput(files=[t], inplace=True, encoding="utf-8") as fi: 399 os.fstat = os_fstat_replacement 400 fi.readline() 401 finally: 402 os.fstat = os_fstat_orig 403 404 # sanity check to make sure that our test scenario was actually hit 405 self.assertTrue(os_fstat_replacement.invoked, 406 "os.fstat() was not invoked") 407 408 def test_readline_os_chmod_raises_OSError(self): 409 """Tests invoking FileInput.readline() when os.chmod() raises OSError. 410 This exception should be silently discarded.""" 411 412 os_chmod_orig = os.chmod 413 os_chmod_replacement = UnconditionallyRaise(OSError) 414 try: 415 t = self.writeTmp("\n") 416 with FileInput(files=[t], inplace=True, encoding="utf-8") as fi: 417 os.chmod = os_chmod_replacement 418 fi.readline() 419 finally: 420 os.chmod = os_chmod_orig 421 422 # sanity check to make sure that our test scenario was actually hit 423 self.assertTrue(os_chmod_replacement.invoked, 424 "os.fstat() was not invoked") 425 426 def test_fileno_when_ValueError_raised(self): 427 class FilenoRaisesValueError(UnconditionallyRaise): 428 def __init__(self): 429 UnconditionallyRaise.__init__(self, ValueError) 430 def fileno(self): 431 self.__call__() 432 433 unconditionally_raise_ValueError = FilenoRaisesValueError() 434 t = self.writeTmp("\n") 435 with FileInput(files=[t], encoding="utf-8") as fi: 436 file_backup = fi._file 437 try: 438 fi._file = unconditionally_raise_ValueError 439 result = fi.fileno() 440 finally: 441 fi._file = file_backup # make sure the file gets cleaned up 442 443 # sanity check to make sure that our test scenario was actually hit 444 self.assertTrue(unconditionally_raise_ValueError.invoked, 445 "_file.fileno() was not invoked") 446 447 self.assertEqual(result, -1, "fileno() should return -1") 448 449 def test_readline_buffering(self): 450 src = LineReader() 451 with FileInput(files=['line1\nline2', 'line3\n'], 452 openhook=src.openhook) as fi: 453 self.assertEqual(src.linesread, []) 454 self.assertEqual(fi.readline(), 'line1\n') 455 self.assertEqual(src.linesread, ['line1\n']) 456 self.assertEqual(fi.readline(), 'line2') 457 self.assertEqual(src.linesread, ['line2']) 458 self.assertEqual(fi.readline(), 'line3\n') 459 self.assertEqual(src.linesread, ['', 'line3\n']) 460 self.assertEqual(fi.readline(), '') 461 self.assertEqual(src.linesread, ['']) 462 self.assertEqual(fi.readline(), '') 463 self.assertEqual(src.linesread, []) 464 465 def test_iteration_buffering(self): 466 src = LineReader() 467 with FileInput(files=['line1\nline2', 'line3\n'], 468 openhook=src.openhook) as fi: 469 self.assertEqual(src.linesread, []) 470 self.assertEqual(next(fi), 'line1\n') 471 self.assertEqual(src.linesread, ['line1\n']) 472 self.assertEqual(next(fi), 'line2') 473 self.assertEqual(src.linesread, ['line2']) 474 self.assertEqual(next(fi), 'line3\n') 475 self.assertEqual(src.linesread, ['', 'line3\n']) 476 self.assertRaises(StopIteration, next, fi) 477 self.assertEqual(src.linesread, ['']) 478 self.assertRaises(StopIteration, next, fi) 479 self.assertEqual(src.linesread, []) 480 481 def test_pathlib_file(self): 482 t1 = Path(self.writeTmp("Pathlib file.")) 483 with FileInput(t1, encoding="utf-8") as fi: 484 line = fi.readline() 485 self.assertEqual(line, 'Pathlib file.') 486 self.assertEqual(fi.lineno(), 1) 487 self.assertEqual(fi.filelineno(), 1) 488 self.assertEqual(fi.filename(), os.fspath(t1)) 489 490 def test_pathlib_file_inplace(self): 491 t1 = Path(self.writeTmp('Pathlib file.')) 492 with FileInput(t1, inplace=True, encoding="utf-8") as fi: 493 line = fi.readline() 494 self.assertEqual(line, 'Pathlib file.') 495 print('Modified %s' % line) 496 with open(t1, encoding="utf-8") as f: 497 self.assertEqual(f.read(), 'Modified Pathlib file.\n') 498 499 500class MockFileInput: 501 """A class that mocks out fileinput.FileInput for use during unit tests""" 502 503 def __init__(self, files=None, inplace=False, backup="", *, 504 mode="r", openhook=None, encoding=None, errors=None): 505 self.files = files 506 self.inplace = inplace 507 self.backup = backup 508 self.mode = mode 509 self.openhook = openhook 510 self.encoding = encoding 511 self.errors = errors 512 self._file = None 513 self.invocation_counts = collections.defaultdict(lambda: 0) 514 self.return_values = {} 515 516 def close(self): 517 self.invocation_counts["close"] += 1 518 519 def nextfile(self): 520 self.invocation_counts["nextfile"] += 1 521 return self.return_values["nextfile"] 522 523 def filename(self): 524 self.invocation_counts["filename"] += 1 525 return self.return_values["filename"] 526 527 def lineno(self): 528 self.invocation_counts["lineno"] += 1 529 return self.return_values["lineno"] 530 531 def filelineno(self): 532 self.invocation_counts["filelineno"] += 1 533 return self.return_values["filelineno"] 534 535 def fileno(self): 536 self.invocation_counts["fileno"] += 1 537 return self.return_values["fileno"] 538 539 def isfirstline(self): 540 self.invocation_counts["isfirstline"] += 1 541 return self.return_values["isfirstline"] 542 543 def isstdin(self): 544 self.invocation_counts["isstdin"] += 1 545 return self.return_values["isstdin"] 546 547class BaseFileInputGlobalMethodsTest(unittest.TestCase): 548 """Base class for unit tests for the global function of 549 the fileinput module.""" 550 551 def setUp(self): 552 self._orig_state = fileinput._state 553 self._orig_FileInput = fileinput.FileInput 554 fileinput.FileInput = MockFileInput 555 556 def tearDown(self): 557 fileinput.FileInput = self._orig_FileInput 558 fileinput._state = self._orig_state 559 560 def assertExactlyOneInvocation(self, mock_file_input, method_name): 561 # assert that the method with the given name was invoked once 562 actual_count = mock_file_input.invocation_counts[method_name] 563 self.assertEqual(actual_count, 1, method_name) 564 # assert that no other unexpected methods were invoked 565 actual_total_count = len(mock_file_input.invocation_counts) 566 self.assertEqual(actual_total_count, 1) 567 568class Test_fileinput_input(BaseFileInputGlobalMethodsTest): 569 """Unit tests for fileinput.input()""" 570 571 def test_state_is_not_None_and_state_file_is_not_None(self): 572 """Tests invoking fileinput.input() when fileinput._state is not None 573 and its _file attribute is also not None. Expect RuntimeError to 574 be raised with a meaningful error message and for fileinput._state 575 to *not* be modified.""" 576 instance = MockFileInput() 577 instance._file = object() 578 fileinput._state = instance 579 with self.assertRaises(RuntimeError) as cm: 580 fileinput.input() 581 self.assertEqual(("input() already active",), cm.exception.args) 582 self.assertIs(instance, fileinput._state, "fileinput._state") 583 584 def test_state_is_not_None_and_state_file_is_None(self): 585 """Tests invoking fileinput.input() when fileinput._state is not None 586 but its _file attribute *is* None. Expect it to create and return 587 a new fileinput.FileInput object with all method parameters passed 588 explicitly to the __init__() method; also ensure that 589 fileinput._state is set to the returned instance.""" 590 instance = MockFileInput() 591 instance._file = None 592 fileinput._state = instance 593 self.do_test_call_input() 594 595 def test_state_is_None(self): 596 """Tests invoking fileinput.input() when fileinput._state is None 597 Expect it to create and return a new fileinput.FileInput object 598 with all method parameters passed explicitly to the __init__() 599 method; also ensure that fileinput._state is set to the returned 600 instance.""" 601 fileinput._state = None 602 self.do_test_call_input() 603 604 def do_test_call_input(self): 605 """Tests that fileinput.input() creates a new fileinput.FileInput 606 object, passing the given parameters unmodified to 607 fileinput.FileInput.__init__(). Note that this test depends on the 608 monkey patching of fileinput.FileInput done by setUp().""" 609 files = object() 610 inplace = object() 611 backup = object() 612 mode = object() 613 openhook = object() 614 encoding = object() 615 616 # call fileinput.input() with different values for each argument 617 result = fileinput.input(files=files, inplace=inplace, backup=backup, 618 mode=mode, openhook=openhook, encoding=encoding) 619 620 # ensure fileinput._state was set to the returned object 621 self.assertIs(result, fileinput._state, "fileinput._state") 622 623 # ensure the parameters to fileinput.input() were passed directly 624 # to FileInput.__init__() 625 self.assertIs(files, result.files, "files") 626 self.assertIs(inplace, result.inplace, "inplace") 627 self.assertIs(backup, result.backup, "backup") 628 self.assertIs(mode, result.mode, "mode") 629 self.assertIs(openhook, result.openhook, "openhook") 630 631class Test_fileinput_close(BaseFileInputGlobalMethodsTest): 632 """Unit tests for fileinput.close()""" 633 634 def test_state_is_None(self): 635 """Tests that fileinput.close() does nothing if fileinput._state 636 is None""" 637 fileinput._state = None 638 fileinput.close() 639 self.assertIsNone(fileinput._state) 640 641 def test_state_is_not_None(self): 642 """Tests that fileinput.close() invokes close() on fileinput._state 643 and sets _state=None""" 644 instance = MockFileInput() 645 fileinput._state = instance 646 fileinput.close() 647 self.assertExactlyOneInvocation(instance, "close") 648 self.assertIsNone(fileinput._state) 649 650class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest): 651 """Unit tests for fileinput.nextfile()""" 652 653 def test_state_is_None(self): 654 """Tests fileinput.nextfile() when fileinput._state is None. 655 Ensure that it raises RuntimeError with a meaningful error message 656 and does not modify fileinput._state""" 657 fileinput._state = None 658 with self.assertRaises(RuntimeError) as cm: 659 fileinput.nextfile() 660 self.assertEqual(("no active input()",), cm.exception.args) 661 self.assertIsNone(fileinput._state) 662 663 def test_state_is_not_None(self): 664 """Tests fileinput.nextfile() when fileinput._state is not None. 665 Ensure that it invokes fileinput._state.nextfile() exactly once, 666 returns whatever it returns, and does not modify fileinput._state 667 to point to a different object.""" 668 nextfile_retval = object() 669 instance = MockFileInput() 670 instance.return_values["nextfile"] = nextfile_retval 671 fileinput._state = instance 672 retval = fileinput.nextfile() 673 self.assertExactlyOneInvocation(instance, "nextfile") 674 self.assertIs(retval, nextfile_retval) 675 self.assertIs(fileinput._state, instance) 676 677class Test_fileinput_filename(BaseFileInputGlobalMethodsTest): 678 """Unit tests for fileinput.filename()""" 679 680 def test_state_is_None(self): 681 """Tests fileinput.filename() when fileinput._state is None. 682 Ensure that it raises RuntimeError with a meaningful error message 683 and does not modify fileinput._state""" 684 fileinput._state = None 685 with self.assertRaises(RuntimeError) as cm: 686 fileinput.filename() 687 self.assertEqual(("no active input()",), cm.exception.args) 688 self.assertIsNone(fileinput._state) 689 690 def test_state_is_not_None(self): 691 """Tests fileinput.filename() when fileinput._state is not None. 692 Ensure that it invokes fileinput._state.filename() exactly once, 693 returns whatever it returns, and does not modify fileinput._state 694 to point to a different object.""" 695 filename_retval = object() 696 instance = MockFileInput() 697 instance.return_values["filename"] = filename_retval 698 fileinput._state = instance 699 retval = fileinput.filename() 700 self.assertExactlyOneInvocation(instance, "filename") 701 self.assertIs(retval, filename_retval) 702 self.assertIs(fileinput._state, instance) 703 704class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest): 705 """Unit tests for fileinput.lineno()""" 706 707 def test_state_is_None(self): 708 """Tests fileinput.lineno() when fileinput._state is None. 709 Ensure that it raises RuntimeError with a meaningful error message 710 and does not modify fileinput._state""" 711 fileinput._state = None 712 with self.assertRaises(RuntimeError) as cm: 713 fileinput.lineno() 714 self.assertEqual(("no active input()",), cm.exception.args) 715 self.assertIsNone(fileinput._state) 716 717 def test_state_is_not_None(self): 718 """Tests fileinput.lineno() when fileinput._state is not None. 719 Ensure that it invokes fileinput._state.lineno() exactly once, 720 returns whatever it returns, and does not modify fileinput._state 721 to point to a different object.""" 722 lineno_retval = object() 723 instance = MockFileInput() 724 instance.return_values["lineno"] = lineno_retval 725 fileinput._state = instance 726 retval = fileinput.lineno() 727 self.assertExactlyOneInvocation(instance, "lineno") 728 self.assertIs(retval, lineno_retval) 729 self.assertIs(fileinput._state, instance) 730 731class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest): 732 """Unit tests for fileinput.filelineno()""" 733 734 def test_state_is_None(self): 735 """Tests fileinput.filelineno() when fileinput._state is None. 736 Ensure that it raises RuntimeError with a meaningful error message 737 and does not modify fileinput._state""" 738 fileinput._state = None 739 with self.assertRaises(RuntimeError) as cm: 740 fileinput.filelineno() 741 self.assertEqual(("no active input()",), cm.exception.args) 742 self.assertIsNone(fileinput._state) 743 744 def test_state_is_not_None(self): 745 """Tests fileinput.filelineno() when fileinput._state is not None. 746 Ensure that it invokes fileinput._state.filelineno() exactly once, 747 returns whatever it returns, and does not modify fileinput._state 748 to point to a different object.""" 749 filelineno_retval = object() 750 instance = MockFileInput() 751 instance.return_values["filelineno"] = filelineno_retval 752 fileinput._state = instance 753 retval = fileinput.filelineno() 754 self.assertExactlyOneInvocation(instance, "filelineno") 755 self.assertIs(retval, filelineno_retval) 756 self.assertIs(fileinput._state, instance) 757 758class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest): 759 """Unit tests for fileinput.fileno()""" 760 761 def test_state_is_None(self): 762 """Tests fileinput.fileno() when fileinput._state is None. 763 Ensure that it raises RuntimeError with a meaningful error message 764 and does not modify fileinput._state""" 765 fileinput._state = None 766 with self.assertRaises(RuntimeError) as cm: 767 fileinput.fileno() 768 self.assertEqual(("no active input()",), cm.exception.args) 769 self.assertIsNone(fileinput._state) 770 771 def test_state_is_not_None(self): 772 """Tests fileinput.fileno() when fileinput._state is not None. 773 Ensure that it invokes fileinput._state.fileno() exactly once, 774 returns whatever it returns, and does not modify fileinput._state 775 to point to a different object.""" 776 fileno_retval = object() 777 instance = MockFileInput() 778 instance.return_values["fileno"] = fileno_retval 779 instance.fileno_retval = fileno_retval 780 fileinput._state = instance 781 retval = fileinput.fileno() 782 self.assertExactlyOneInvocation(instance, "fileno") 783 self.assertIs(retval, fileno_retval) 784 self.assertIs(fileinput._state, instance) 785 786class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest): 787 """Unit tests for fileinput.isfirstline()""" 788 789 def test_state_is_None(self): 790 """Tests fileinput.isfirstline() when fileinput._state is None. 791 Ensure that it raises RuntimeError with a meaningful error message 792 and does not modify fileinput._state""" 793 fileinput._state = None 794 with self.assertRaises(RuntimeError) as cm: 795 fileinput.isfirstline() 796 self.assertEqual(("no active input()",), cm.exception.args) 797 self.assertIsNone(fileinput._state) 798 799 def test_state_is_not_None(self): 800 """Tests fileinput.isfirstline() when fileinput._state is not None. 801 Ensure that it invokes fileinput._state.isfirstline() exactly once, 802 returns whatever it returns, and does not modify fileinput._state 803 to point to a different object.""" 804 isfirstline_retval = object() 805 instance = MockFileInput() 806 instance.return_values["isfirstline"] = isfirstline_retval 807 fileinput._state = instance 808 retval = fileinput.isfirstline() 809 self.assertExactlyOneInvocation(instance, "isfirstline") 810 self.assertIs(retval, isfirstline_retval) 811 self.assertIs(fileinput._state, instance) 812 813class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest): 814 """Unit tests for fileinput.isstdin()""" 815 816 def test_state_is_None(self): 817 """Tests fileinput.isstdin() when fileinput._state is None. 818 Ensure that it raises RuntimeError with a meaningful error message 819 and does not modify fileinput._state""" 820 fileinput._state = None 821 with self.assertRaises(RuntimeError) as cm: 822 fileinput.isstdin() 823 self.assertEqual(("no active input()",), cm.exception.args) 824 self.assertIsNone(fileinput._state) 825 826 def test_state_is_not_None(self): 827 """Tests fileinput.isstdin() when fileinput._state is not None. 828 Ensure that it invokes fileinput._state.isstdin() exactly once, 829 returns whatever it returns, and does not modify fileinput._state 830 to point to a different object.""" 831 isstdin_retval = object() 832 instance = MockFileInput() 833 instance.return_values["isstdin"] = isstdin_retval 834 fileinput._state = instance 835 retval = fileinput.isstdin() 836 self.assertExactlyOneInvocation(instance, "isstdin") 837 self.assertIs(retval, isstdin_retval) 838 self.assertIs(fileinput._state, instance) 839 840class InvocationRecorder: 841 842 def __init__(self): 843 self.invocation_count = 0 844 845 def __call__(self, *args, **kwargs): 846 self.invocation_count += 1 847 self.last_invocation = (args, kwargs) 848 return io.BytesIO(b'some bytes') 849 850 851class Test_hook_compressed(unittest.TestCase): 852 """Unit tests for fileinput.hook_compressed()""" 853 854 def setUp(self): 855 self.fake_open = InvocationRecorder() 856 857 def test_empty_string(self): 858 self.do_test_use_builtin_open_text("", "r") 859 860 def test_no_ext(self): 861 self.do_test_use_builtin_open_text("abcd", "r") 862 863 @unittest.skipUnless(gzip, "Requires gzip and zlib") 864 def test_gz_ext_fake(self): 865 original_open = gzip.open 866 gzip.open = self.fake_open 867 try: 868 result = fileinput.hook_compressed("test.gz", "r") 869 finally: 870 gzip.open = original_open 871 872 self.assertEqual(self.fake_open.invocation_count, 1) 873 self.assertEqual(self.fake_open.last_invocation, (("test.gz", "r"), {})) 874 875 @unittest.skipUnless(gzip, "Requires gzip and zlib") 876 def test_gz_with_encoding_fake(self): 877 original_open = gzip.open 878 gzip.open = lambda filename, mode: io.BytesIO(b'Ex-binary string') 879 try: 880 result = fileinput.hook_compressed("test.gz", "r", encoding="utf-8") 881 finally: 882 gzip.open = original_open 883 self.assertEqual(list(result), ['Ex-binary string']) 884 885 @unittest.skipUnless(bz2, "Requires bz2") 886 def test_bz2_ext_fake(self): 887 original_open = bz2.BZ2File 888 bz2.BZ2File = self.fake_open 889 try: 890 result = fileinput.hook_compressed("test.bz2", "r") 891 finally: 892 bz2.BZ2File = original_open 893 894 self.assertEqual(self.fake_open.invocation_count, 1) 895 self.assertEqual(self.fake_open.last_invocation, (("test.bz2", "r"), {})) 896 897 def test_blah_ext(self): 898 self.do_test_use_builtin_open_binary("abcd.blah", "rb") 899 900 def test_gz_ext_builtin(self): 901 self.do_test_use_builtin_open_binary("abcd.Gz", "rb") 902 903 def test_bz2_ext_builtin(self): 904 self.do_test_use_builtin_open_binary("abcd.Bz2", "rb") 905 906 def test_binary_mode_encoding(self): 907 self.do_test_use_builtin_open_binary("abcd", "rb") 908 909 def test_text_mode_encoding(self): 910 self.do_test_use_builtin_open_text("abcd", "r") 911 912 def do_test_use_builtin_open_binary(self, filename, mode): 913 original_open = self.replace_builtin_open(self.fake_open) 914 try: 915 result = fileinput.hook_compressed(filename, mode) 916 finally: 917 self.replace_builtin_open(original_open) 918 919 self.assertEqual(self.fake_open.invocation_count, 1) 920 self.assertEqual(self.fake_open.last_invocation, 921 ((filename, mode), {'encoding': None, 'errors': None})) 922 923 def do_test_use_builtin_open_text(self, filename, mode): 924 original_open = self.replace_builtin_open(self.fake_open) 925 try: 926 result = fileinput.hook_compressed(filename, mode) 927 finally: 928 self.replace_builtin_open(original_open) 929 930 self.assertEqual(self.fake_open.invocation_count, 1) 931 self.assertEqual(self.fake_open.last_invocation, 932 ((filename, mode), {'encoding': 'locale', 'errors': None})) 933 934 @staticmethod 935 def replace_builtin_open(new_open_func): 936 original_open = builtins.open 937 builtins.open = new_open_func 938 return original_open 939 940class Test_hook_encoded(unittest.TestCase): 941 """Unit tests for fileinput.hook_encoded()""" 942 943 def test(self): 944 encoding = object() 945 errors = object() 946 result = fileinput.hook_encoded(encoding, errors=errors) 947 948 fake_open = InvocationRecorder() 949 original_open = builtins.open 950 builtins.open = fake_open 951 try: 952 filename = object() 953 mode = object() 954 open_result = result(filename, mode) 955 finally: 956 builtins.open = original_open 957 958 self.assertEqual(fake_open.invocation_count, 1) 959 960 args, kwargs = fake_open.last_invocation 961 self.assertIs(args[0], filename) 962 self.assertIs(args[1], mode) 963 self.assertIs(kwargs.pop('encoding'), encoding) 964 self.assertIs(kwargs.pop('errors'), errors) 965 self.assertFalse(kwargs) 966 967 def test_errors(self): 968 with open(TESTFN, 'wb') as f: 969 f.write(b'\x80abc') 970 self.addCleanup(safe_unlink, TESTFN) 971 972 def check(errors, expected_lines): 973 with FileInput(files=TESTFN, mode='r', 974 openhook=hook_encoded('utf-8', errors=errors)) as fi: 975 lines = list(fi) 976 self.assertEqual(lines, expected_lines) 977 978 check('ignore', ['abc']) 979 with self.assertRaises(UnicodeDecodeError): 980 check('strict', ['abc']) 981 check('replace', ['\ufffdabc']) 982 check('backslashreplace', ['\\x80abc']) 983 984 def test_modes(self): 985 with open(TESTFN, 'wb') as f: 986 # UTF-7 is a convenient, seldom used encoding 987 f.write(b'A\nB\r\nC\rD+IKw-') 988 self.addCleanup(safe_unlink, TESTFN) 989 990 def check(mode, expected_lines): 991 with FileInput(files=TESTFN, mode=mode, 992 openhook=hook_encoded('utf-7')) as fi: 993 lines = list(fi) 994 self.assertEqual(lines, expected_lines) 995 996 check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac']) 997 with self.assertRaises(ValueError): 998 check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac']) 999 1000 1001class MiscTest(unittest.TestCase): 1002 1003 def test_all(self): 1004 support.check__all__(self, fileinput) 1005 1006 1007if __name__ == "__main__": 1008 unittest.main() 1009