1'''
2Tests for fileinput module.
3Nick Mathewson
4'''
5import io
6import os
7import sys
8import re
9import fileinput
10import collections
11import builtins
12import tempfile
13import unittest
14
15try:
16    import bz2
17except ImportError:
18    bz2 = None
19try:
20    import gzip
21except ImportError:
22    gzip = None
23
24from io import BytesIO, StringIO
25from fileinput import FileInput, hook_encoded
26from pathlib import Path
27
28from test.support import verbose
29from test.support.os_helper import TESTFN
30from test.support.os_helper import unlink as safe_unlink
31from test.support import os_helper
32from test import support
33from unittest import mock
34
35
36# The fileinput module has 2 interfaces: the FileInput class which does
37# all the work, and a few functions (input, etc.) that use a global _state
38# variable.
39
class BaseTests:
    """Mixin that gives test cases a helper for creating temporary files."""

    def writeTmp(self, content, *, mode='w'):
        """Store *content* in a fresh temporary file and return its name.

        *content* must be str for text modes (written as UTF-8) and bytes
        when *mode* contains "b".  Text mode is the default.  The file is
        scheduled for removal through self.addCleanup().
        """
        handle, path = tempfile.mkstemp()
        self.addCleanup(os_helper.unlink, path)
        if "b" in mode:
            # Binary streams do not accept an encoding argument.
            text_encoding = None
        else:
            text_encoding = "utf-8"
        # open() wraps the descriptor returned by mkstemp() and closes it
        # when the with-block ends.
        with open(handle, mode, encoding=text_encoding) as stream:
            stream.write(content)
        return path
50
class LineReader:
    """Minimal file-like object that logs every line it hands out.

    An instance doubles as an open hook: openhook() treats the "filename"
    it receives as the file's content and returns the instance itself as
    the opened file.  Tests inspect ``linesread`` to see how far the
    consumer has read ahead.
    """

    def __init__(self):
        self._linesread = []

    @property
    def linesread(self):
        """Lines returned by readline() since this property was last read.

        Reading the property resets the log, so two consecutive accesses
        without an intervening read yield ``[...]`` and then ``[]``.
        """
        recorded, self._linesread = self._linesread, []
        return recorded

    def openhook(self, filename, mode):
        """Start serving the lines of *filename* (used as content).

        *mode* is accepted for signature compatibility and ignored.
        Returns self so the instance can be used as the opened file.
        """
        self.it = iter(filename.splitlines(True))
        return self

    def readline(self, size=None):
        """Return the next line, or '' at end of input, and log it.

        *size* is accepted for signature compatibility and ignored.
        The end-of-input marker '' is logged as well.
        """
        line = next(self.it, '')
        self._linesread.append(line)
        return line

    def readlines(self, hint=-1):
        """Return a list of the remaining lines.

        Follows the usual file-object convention: when *hint* is None or
        not positive all remaining lines are returned; otherwise reading
        stops once the total length of the lines read reaches *hint*.
        """
        # Previously the default hint of -1 satisfied ``size >= hint``
        # immediately, so only a single line was ever returned.
        unlimited = hint is None or hint <= 0
        lines = []
        size = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            size += len(line)
            if not unlimited and size >= hint:
                return lines

    def close(self):
        """Do nothing; there is no underlying resource to release."""
        pass
86
class BufferSizesTests(BaseTests, unittest.TestCase):
    """End-to-end checks of FileInput over several real temporary files."""

    def test_buffer_sizes(self):
        # Walks through plain iteration, status queries, nextfile(), stdin
        # handling, boundary conditions and in-place rewriting.  Every
        # FileInput is used as a context manager so it is closed even when
        # an assertion fails part-way through a stage.

        def make_file(number, count):
            # Create a file holding *count* lines "Line <i> of file <number>".
            return self.writeTmp(''.join(
                "Line %s of file %s\n" % (i + 1, number)
                for i in range(count)))

        t1 = make_file(1, 15)
        t2 = make_file(2, 10)
        t3 = make_file(3, 5)
        t4 = make_file(4, 1)
        files = (t1, t2, t3, t4)

        # Matches the upper-cased lines produced by the in-place stage.
        pat = re.compile(r'LINE (\d+) OF FILE (\d+)')

        if verbose:
            print('1. Simple iteration')
        with FileInput(files=files, encoding="utf-8") as fi:
            lines = list(fi)
        # The status values below are deliberately checked after the
        # input has been closed.
        self.assertEqual(len(lines), 31)
        self.assertEqual(lines[4], 'Line 5 of file 1\n')
        self.assertEqual(lines[30], 'Line 1 of file 4\n')
        self.assertEqual(fi.lineno(), 31)
        self.assertEqual(fi.filename(), t4)

        if verbose:
            print('2. Status variables')
        with FileInput(files=files, encoding="utf-8") as fi:
            s = "x"
            while s and s != 'Line 6 of file 2\n':
                s = fi.readline()
            self.assertEqual(fi.filename(), t2)
            self.assertEqual(fi.lineno(), 21)
            self.assertEqual(fi.filelineno(), 6)
            self.assertFalse(fi.isfirstline())
            self.assertFalse(fi.isstdin())

            if verbose:
                print('3. Nextfile')
            # Skip the rest of file 2; the cumulative line number keeps
            # counting from where reading stopped.
            fi.nextfile()
            self.assertEqual(fi.readline(), 'Line 1 of file 3\n')
            self.assertEqual(fi.lineno(), 22)

        if verbose:
            print('4. Stdin')
        savestdin = sys.stdin
        with FileInput(files=files + ('-',), encoding="utf-8") as fi:
            try:
                sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
                lines = list(fi)
                self.assertEqual(len(lines), 33)
                self.assertEqual(lines[32], 'Line 2 of stdin\n')
                self.assertEqual(fi.filename(), '<stdin>')
                fi.nextfile()
            finally:
                sys.stdin = savestdin

        if verbose:
            print('5. Boundary conditions')
        with FileInput(files=files, encoding="utf-8") as fi:
            # Nothing has been read yet, so there is no current file and
            # nextfile() has nothing to skip.
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())
            fi.nextfile()
            self.assertEqual(fi.lineno(), 0)
            self.assertIsNone(fi.filename())

        if verbose:
            print('6. Inplace')
        savestdout = sys.stdout
        try:
            with FileInput(files=files, inplace=1, encoding="utf-8") as fi:
                for line in fi:
                    # In in-place mode print() output goes into the file
                    # currently being read.
                    print(line[:-1].upper())
        finally:
            sys.stdout = savestdout

        # Re-read the rewritten files and verify the upper-cased content.
        with FileInput(files=files, encoding="utf-8") as fi:
            for line in fi:
                self.assertEqual(line[-1], '\n')
                m = pat.match(line[:-1])
                self.assertIsNotNone(m)
                self.assertEqual(int(m.group(1)), fi.filelineno())
169
class UnconditionallyRaise:
    """Callable stand-in that notes it was called and then always raises.

    *exception_type* is instantiated without arguments on every call.
    The ``invoked`` flag lets a test confirm the replacement was reached.
    """

    def __init__(self, exception_type):
        self.invoked = False
        self.exception_type = exception_type

    def __call__(self, *args, **kwargs):
        # Set the flag first so it is visible even though we never return.
        self.invoked = True
        error = self.exception_type()
        raise error
177
class FileInputTests(BaseTests, unittest.TestCase):
    """Tests that exercise the FileInput class directly."""

    def test_zero_byte_files(self):
        # Only t3 has content; the empty files around it contribute no lines.
        t1 = self.writeTmp("")
        t2 = self.writeTmp("")
        t3 = self.writeTmp("The only line there is.\n")
        t4 = self.writeTmp("")
        with FileInput(files=(t1, t2, t3, t4), encoding="utf-8") as fi:
            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)

    def test_files_that_dont_end_with_newline(self):
        # A missing final newline must not merge the last line of one file
        # with the first line of the next.
        t1 = self.writeTmp("A\nB\nC")
        t2 = self.writeTmp("D\nE\nF")
        with FileInput(files=(t1, t2), encoding="utf-8") as fi:
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
            self.assertEqual(fi.filelineno(), 3)
            self.assertEqual(fi.lineno(), 6)

##     def test_unicode_filenames(self):
##         # XXX A unicode string is always returned by writeTmp.
##         #     So is this needed?
##         t1 = self.writeTmp("A\nB")
##         encoding = sys.getfilesystemencoding()
##         if encoding is None:
##             encoding = 'ascii'
##         fi = FileInput(files=str(t1, encoding), encoding="utf-8")
##         lines = list(fi)
##         self.assertEqual(lines, ["A\n", "B"])

    def test_fileno(self):
        # fileno() is -1 whenever no file is currently open.
        t1 = self.writeTmp("A\nB")
        t2 = self.writeTmp("C\nD")
        with FileInput(files=(t1, t2), encoding="utf-8") as fi:
            self.assertEqual(fi.fileno(), -1)
            next(fi)
            self.assertNotEqual(fi.fileno(), -1)
            fi.nextfile()
            self.assertEqual(fi.fileno(), -1)
            list(fi)
            self.assertEqual(fi.fileno(), -1)

    def test_invalid_opening_mode(self):
        for mode in ('w', 'rU', 'U'):
            with self.subTest(mode=mode):
                with self.assertRaises(ValueError):
                    FileInput(mode=mode)

    def test_stdin_binary_mode(self):
        # In binary mode the bytes are expected to come from sys.stdin.buffer.
        with mock.patch('sys.stdin') as m_stdin:
            m_stdin.buffer = BytesIO(b'spam, bacon, sausage, and spam')
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])

    def test_detached_stdin_binary_mode(self):
        # sys.stdin replaced by an object without a 'buffer' attribute.
        orig_stdin = sys.stdin
        try:
            sys.stdin = BytesIO(b'spam, bacon, sausage, and spam')
            self.assertFalse(hasattr(sys.stdin, 'buffer'))
            fi = FileInput(files=['-'], mode='rb')
            lines = list(fi)
            self.assertEqual(lines, [b'spam, bacon, sausage, and spam'])
        finally:
            sys.stdin = orig_stdin

    def test_file_opening_hook(self):
        # cannot use openhook and inplace mode
        with self.assertRaises(
                ValueError,
                msg="FileInput should raise if both inplace "
                    "and openhook arguments are given"):
            FileInput(inplace=1, openhook=lambda f, m: None)
        with self.assertRaises(
                ValueError,
                msg="FileInput should check openhook for being callable"):
            FileInput(openhook=1)

        class CustomOpenHook:
            # Open hook that records whether FileInput actually called it.
            def __init__(self):
                self.invoked = False
            def __call__(self, *args, **kargs):
                self.invoked = True
                return open(*args, encoding="utf-8")

        t = self.writeTmp("\n")
        custom_open_hook = CustomOpenHook()
        with FileInput([t], openhook=custom_open_hook) as fi:
            fi.readline()
        self.assertTrue(custom_open_hook.invoked, "openhook not invoked")

    def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write(b'123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write(b'\x80')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN,
                       openhook=hook_encoded('ascii')) as fi:
            try:
                self.assertEqual(fi.readline(), 'A\n')
                self.assertEqual(fi.readline(), 'B\n')
                self.assertEqual(fi.readline(), 'C\n')
            except UnicodeDecodeError:
                # The undecodable byte sits at the very end of the file, so
                # hitting it here means readline() read too far ahead.
                self.fail('Read to end of file')
            with self.assertRaises(UnicodeDecodeError):
                # Read to the end of file.
                list(fi)
            self.assertEqual(fi.readline(), '')
            self.assertEqual(fi.readline(), '')

    def test_readline_binary_mode(self):
        # Binary mode performs no newline translation.
        with open(TESTFN, 'wb') as f:
            f.write(b'A\nB\r\nC\rD')
        self.addCleanup(safe_unlink, TESTFN)

        with FileInput(files=TESTFN, mode='rb') as fi:
            self.assertEqual(fi.readline(), b'A\n')
            self.assertEqual(fi.readline(), b'B\r\n')
            self.assertEqual(fi.readline(), b'C\rD')
            # Read to the end of file.
            self.assertEqual(fi.readline(), b'')
            self.assertEqual(fi.readline(), b'')

    def test_inplace_binary_write_mode(self):
        temp_file = self.writeTmp(b'Initial text.', mode='wb')
        with FileInput(temp_file, mode='rb', inplace=True) as fobj:
            line = fobj.readline()
            self.assertEqual(line, b'Initial text.')
            # print() cannot be used with files opened in binary mode.
            sys.stdout.write(b'New line.')
        with open(temp_file, 'rb') as f:
            self.assertEqual(f.read(), b'New line.')

    def test_inplace_encoding_errors(self):
        # encoding/errors must apply both to reading and to the rewritten
        # output of an in-place edit.
        temp_file = self.writeTmp(b'Initial text \x88', mode='wb')
        with FileInput(temp_file, inplace=True,
                       encoding="ascii", errors="replace") as fobj:
            line = fobj.readline()
            self.assertEqual(line, 'Initial text \ufffd')
            print("New line \x88")
        with open(temp_file, 'rb') as f:
            self.assertEqual(f.read().rstrip(b'\r\n'), b'New line ?')

    def test_file_hook_backward_compatibility(self):
        # A hook that only accepts (filename, mode) must keep working when
        # no encoding is given.
        def old_hook(filename, mode):
            return io.StringIO("I used to receive only filename and mode")
        t = self.writeTmp("\n")
        with FileInput([t], openhook=old_hook) as fi:
            result = fi.readline()
        self.assertEqual(result, "I used to receive only filename and mode")

    def test_context_manager(self):
        t1 = self.writeTmp("A\nB\nC")
        t2 = self.writeTmp("D\nE\nF")
        with FileInput(files=(t1, t2), encoding="utf-8") as fi:
            lines = list(fi)
        self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
        self.assertEqual(fi.filelineno(), 3)
        self.assertEqual(fi.lineno(), 6)
        # Leaving the with-block closes the input, which empties _files.
        self.assertEqual(fi._files, ())

    def test_close_on_exception(self):
        t1 = self.writeTmp("")
        try:
            with FileInput(files=t1, encoding="utf-8") as fi:
                raise OSError
        except OSError:
            self.assertEqual(fi._files, ())

    def test_empty_files_list_specified_to_constructor(self):
        # An empty file list falls back to standard input ('-').
        with FileInput(files=[], encoding="utf-8") as fi:
            self.assertEqual(fi._files, ('-',))

    def test_nextfile_oserror_deleting_backup(self):
        """Tests invoking FileInput.nextfile() when the attempt to delete
           the backup file would raise OSError.  This error is expected to be
           silently ignored"""

        os_unlink_orig = os.unlink
        os_unlink_replacement = UnconditionallyRaise(OSError)
        try:
            t = self.writeTmp("\n")
            # The backup cannot be removed while os.unlink is replaced, so
            # make sure it is deleted once the test is over.
            self.addCleanup(safe_unlink, t + '.bak')
            with FileInput(files=[t], inplace=True, encoding="utf-8") as fi:
                next(fi) # make sure the file is opened
                os.unlink = os_unlink_replacement
                fi.nextfile()
        finally:
            os.unlink = os_unlink_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_unlink_replacement.invoked,
                        "os.unlink() was not invoked")

    def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = self.writeTmp("\n")
            with FileInput(files=[t], inplace=True, encoding="utf-8") as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")

    def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = self.writeTmp("\n")
            with FileInput(files=[t], inplace=True, encoding="utf-8") as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.chmod() was not invoked")

    def test_fileno_when_ValueError_raised(self):
        # fileno() must report -1 when the underlying file's fileno()
        # raises ValueError.
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = self.writeTmp("\n")
        with FileInput(files=[t], encoding="utf-8") as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")

    def test_readline_buffering(self):
        # readline() must not read ahead: each call pulls exactly the lines
        # it needs from the underlying file (LineReader logs every pull,
        # including the '' that marks the end of a file).
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(fi.readline(), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(fi.readline(), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(fi.readline(), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [''])
            self.assertEqual(fi.readline(), '')
            self.assertEqual(src.linesread, [])

    def test_iteration_buffering(self):
        # Same as test_readline_buffering, but driving FileInput via next().
        src = LineReader()
        with FileInput(files=['line1\nline2', 'line3\n'],
                       openhook=src.openhook) as fi:
            self.assertEqual(src.linesread, [])
            self.assertEqual(next(fi), 'line1\n')
            self.assertEqual(src.linesread, ['line1\n'])
            self.assertEqual(next(fi), 'line2')
            self.assertEqual(src.linesread, ['line2'])
            self.assertEqual(next(fi), 'line3\n')
            self.assertEqual(src.linesread, ['', 'line3\n'])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [''])
            self.assertRaises(StopIteration, next, fi)
            self.assertEqual(src.linesread, [])

    def test_pathlib_file(self):
        t1 = Path(self.writeTmp("Pathlib file."))
        with FileInput(t1, encoding="utf-8") as fi:
            line = fi.readline()
            self.assertEqual(line, 'Pathlib file.')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            # filename() reports the path as a plain string.
            self.assertEqual(fi.filename(), os.fspath(t1))

    def test_pathlib_file_inplace(self):
        t1 = Path(self.writeTmp('Pathlib file.'))
        with FileInput(t1, inplace=True, encoding="utf-8") as fi:
            line = fi.readline()
            self.assertEqual(line, 'Pathlib file.')
            print('Modified %s' % line)
        with open(t1, encoding="utf-8") as f:
            self.assertEqual(f.read(), 'Modified Pathlib file.\n')
498
499
class MockFileInput:
    """A class that mocks out fileinput.FileInput for use during unit tests

    The constructor arguments are stored verbatim as attributes.  Each
    mocked method bumps its entry in ``invocation_counts``; every method
    except close() then returns the value registered under its own name in
    ``return_values`` (raising KeyError if the test registered none).
    """

    def __init__(self, files=None, inplace=False, backup="", *,
                 mode="r", openhook=None, encoding=None, errors=None):
        self.files = files
        self.inplace = inplace
        self.backup = backup
        self.mode = mode
        self.openhook = openhook
        self.encoding = encoding
        self.errors = errors
        self._file = None
        # Missing method names start counting from zero.
        self.invocation_counts = collections.defaultdict(int)
        self.return_values = {}

    def _record(self, method_name):
        # Note one more call of *method_name*.
        self.invocation_counts[method_name] += 1

    def _invoke(self, method_name):
        # Note the call, then hand back the value configured by the test.
        self._record(method_name)
        return self.return_values[method_name]

    def close(self):
        self._record("close")

    def nextfile(self):
        return self._invoke("nextfile")

    def filename(self):
        return self._invoke("filename")

    def lineno(self):
        return self._invoke("lineno")

    def filelineno(self):
        return self._invoke("filelineno")

    def fileno(self):
        return self._invoke("fileno")

    def isfirstline(self):
        return self._invoke("isfirstline")

    def isstdin(self):
        return self._invoke("isstdin")
546
class BaseFileInputGlobalMethodsTest(unittest.TestCase):
    """Common fixture for the tests of fileinput's module-level functions.

       While a test runs, fileinput.FileInput is replaced by MockFileInput;
       both it and fileinput._state are restored afterwards."""

    def setUp(self):
        # Remember the real module globals before swapping in the mock.
        self._orig_FileInput = fileinput.FileInput
        self._orig_state = fileinput._state
        fileinput.FileInput = MockFileInput

    def tearDown(self):
        fileinput._state = self._orig_state
        fileinput.FileInput = self._orig_FileInput

    def assertExactlyOneInvocation(self, mock_file_input, method_name):
        """Check that *method_name* was the only method called on
           *mock_file_input*, and that it was called exactly once."""
        counts = mock_file_input.invocation_counts
        # the named method must have been called a single time ...
        self.assertEqual(counts[method_name], 1, method_name)
        # ... and no other method may have been recorded at all
        self.assertEqual(len(counts), 1)
567
class Test_fileinput_input(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.input()"""

    def test_state_is_not_None_and_state_file_is_not_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           and its _file attribute is also not None.  Expect RuntimeError to
           be raised with a meaningful error message and for fileinput._state
           to *not* be modified."""
        instance = MockFileInput()
        instance._file = object()
        fileinput._state = instance
        with self.assertRaises(RuntimeError) as cm:
            fileinput.input()
        self.assertEqual(("input() already active",), cm.exception.args)
        self.assertIs(instance, fileinput._state, "fileinput._state")

    def test_state_is_not_None_and_state_file_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is not None
           but its _file attribute *is* None.  Expect it to create and return
           a new fileinput.FileInput object with all method parameters passed
           explicitly to the __init__() method; also ensure that
           fileinput._state is set to the returned instance."""
        instance = MockFileInput()
        instance._file = None
        fileinput._state = instance
        self.do_test_call_input()

    def test_state_is_None(self):
        """Tests invoking fileinput.input() when fileinput._state is None
           Expect it to create and return a new fileinput.FileInput object
           with all method parameters passed explicitly to the __init__()
           method; also ensure that fileinput._state is set to the returned
           instance."""
        fileinput._state = None
        self.do_test_call_input()

    def do_test_call_input(self):
        """Tests that fileinput.input() creates a new fileinput.FileInput
           object, passing the given parameters unmodified to
           fileinput.FileInput.__init__().  Note that this test depends on the
           monkey patching of fileinput.FileInput done by setUp()."""
        # Unique sentinels make it possible to verify by identity that
        # each argument reached the matching constructor parameter.
        files = object()
        inplace = object()
        backup = object()
        mode = object()
        openhook = object()
        encoding = object()
        errors = object()

        # call fileinput.input() with different values for each argument
        result = fileinput.input(files=files, inplace=inplace, backup=backup,
            mode=mode, openhook=openhook, encoding=encoding, errors=errors)

        # ensure fileinput._state was set to the returned object
        self.assertIs(result, fileinput._state, "fileinput._state")

        # ensure the parameters to fileinput.input() were passed directly
        # to FileInput.__init__()
        self.assertIs(files, result.files, "files")
        self.assertIs(inplace, result.inplace, "inplace")
        self.assertIs(backup, result.backup, "backup")
        self.assertIs(mode, result.mode, "mode")
        self.assertIs(openhook, result.openhook, "openhook")
        self.assertIs(encoding, result.encoding, "encoding")
        self.assertIs(errors, result.errors, "errors")
630
class Test_fileinput_close(BaseFileInputGlobalMethodsTest):
    """Unit tests for the module-level fileinput.close() function."""

    def test_state_is_None(self):
        """Without an active global state, fileinput.close() is a no-op
           and fileinput._state stays None."""
        fileinput._state = None
        fileinput.close()
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active global state, fileinput.close() must call that
           object's close() once and then reset fileinput._state to None."""
        state = MockFileInput()
        fileinput._state = state
        fileinput.close()
        self.assertExactlyOneInvocation(state, "close")
        self.assertIsNone(fileinput._state)
649
class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest):
    """Unit tests for the module-level fileinput.nextfile() function."""

    def test_state_is_None(self):
        """Without an active global state, fileinput.nextfile() must raise
           RuntimeError with a meaningful message and must leave
           fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.nextfile()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active global state, fileinput.nextfile() must call the
           state's nextfile() exactly once, pass its return value through
           and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["nextfile"] = expected
        fileinput._state = state
        actual = fileinput.nextfile()
        self.assertExactlyOneInvocation(state, "nextfile")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
676
class Test_fileinput_filename(BaseFileInputGlobalMethodsTest):
    """Unit tests for the module-level fileinput.filename() function."""

    def test_state_is_None(self):
        """Without an active global state, fileinput.filename() must raise
           RuntimeError with a meaningful message and must leave
           fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filename()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active global state, fileinput.filename() must call the
           state's filename() exactly once, pass its return value through
           and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["filename"] = expected
        fileinput._state = state
        actual = fileinput.filename()
        self.assertExactlyOneInvocation(state, "filename")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
703
class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for the module-level fileinput.lineno() function."""

    def test_state_is_None(self):
        """Without an active global state, fileinput.lineno() must raise
           RuntimeError with a meaningful message and must leave
           fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.lineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active global state, fileinput.lineno() must call the
           state's lineno() exactly once, pass its return value through
           and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["lineno"] = expected
        fileinput._state = state
        actual = fileinput.lineno()
        self.assertExactlyOneInvocation(state, "lineno")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
730
class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest):
    """Unit tests for the module-level fileinput.filelineno() function."""

    def test_state_is_None(self):
        """Without an active global state, fileinput.filelineno() must
           raise RuntimeError with a meaningful message and must leave
           fileinput._state as None."""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.filelineno()
        self.assertEqual(("no active input()",), caught.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """With an active global state, fileinput.filelineno() must call
           the state's filelineno() exactly once, pass its return value
           through and keep fileinput._state bound to the same object."""
        expected = object()
        state = MockFileInput()
        state.return_values["filelineno"] = expected
        fileinput._state = state
        actual = fileinput.filelineno()
        self.assertExactlyOneInvocation(state, "filelineno")
        self.assertIs(actual, expected)
        self.assertIs(fileinput._state, state)
757
class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest):
    """Unit tests for fileinput.fileno()"""

    def test_state_is_None(self):
        """Tests fileinput.fileno() when fileinput._state is None.
           Ensure that it raises RuntimeError with a meaningful error message
           and does not modify fileinput._state"""
        fileinput._state = None
        with self.assertRaises(RuntimeError) as cm:
            fileinput.fileno()
        self.assertEqual(("no active input()",), cm.exception.args)
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """Tests fileinput.fileno() when fileinput._state is not None.
           Ensure that it invokes fileinput._state.fileno() exactly once,
           returns whatever it returns, and does not modify fileinput._state
           to point to a different object."""
        fileno_retval = object()
        instance = MockFileInput()
        # The mock looks its return value up in return_values; no other
        # attribute needs to be set on the instance.
        instance.return_values["fileno"] = fileno_retval
        fileinput._state = instance
        retval = fileinput.fileno()
        self.assertExactlyOneInvocation(instance, "fileno")
        self.assertIs(retval, fileno_retval)
        self.assertIs(fileinput._state, instance)
785
class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level function fileinput.isfirstline()."""

    def test_state_is_None(self):
        """fileinput.isfirstline() with fileinput._state set to None.

        The call has to raise RuntimeError whose args are exactly
        ("no active input()",), and fileinput._state must still be None
        once the call has failed.
        """
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isfirstline()
        self.assertEqual(caught.exception.args, ("no active input()",))
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.isfirstline() with a MockFileInput installed as state.

        A unique sentinel is registered as the mock's "isfirstline"
        return value.  The test then verifies, through the
        assertExactlyOneInvocation helper (not defined in this class),
        that the call reached the mock, that the very same sentinel
        object came back, and that fileinput._state is still the
        installed mock.
        """
        sentinel = object()
        stub_state = MockFileInput()
        stub_state.return_values["isfirstline"] = sentinel
        fileinput._state = stub_state

        returned = fileinput.isfirstline()

        self.assertExactlyOneInvocation(stub_state, "isfirstline")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, stub_state)
812
class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest):
    """Checks for the module-level function fileinput.isstdin()."""

    def test_state_is_None(self):
        """fileinput.isstdin() with fileinput._state set to None.

        The call has to raise RuntimeError whose args are exactly
        ("no active input()",), and fileinput._state must still be None
        once the call has failed.
        """
        fileinput._state = None
        with self.assertRaises(RuntimeError) as caught:
            fileinput.isstdin()
        self.assertEqual(caught.exception.args, ("no active input()",))
        self.assertIsNone(fileinput._state)

    def test_state_is_not_None(self):
        """fileinput.isstdin() with a MockFileInput installed as state.

        A unique sentinel is registered as the mock's "isstdin" return
        value.  The test then verifies, through the
        assertExactlyOneInvocation helper (not defined in this class),
        that the call reached the mock, that the very same sentinel
        object came back, and that fileinput._state is still the
        installed mock.
        """
        sentinel = object()
        stub_state = MockFileInput()
        stub_state.return_values["isstdin"] = sentinel
        fileinput._state = stub_state

        returned = fileinput.isstdin()

        self.assertExactlyOneInvocation(stub_state, "isstdin")
        self.assertIs(returned, sentinel)
        self.assertIs(fileinput._state, stub_state)
839
class InvocationRecorder:
    """Callable test double that records how it has been invoked.

    Attributes:
        invocation_count: number of times the instance has been called.
        last_invocation: ``(args, kwargs)`` of the most recent call, or
            ``None`` while the instance has not been called yet.
    """

    def __init__(self):
        self.invocation_count = 0
        # Defined up front so that inspecting the recorder before any call
        # yields None (and a readable assertion failure in tests) instead
        # of raising AttributeError.
        self.last_invocation = None

    def __call__(self, *args, **kwargs):
        """Record the call and return a new in-memory binary stream.

        Every call returns a fresh io.BytesIO holding b'some bytes'.
        """
        self.invocation_count += 1
        self.last_invocation = (args, kwargs)
        return io.BytesIO(b'some bytes')
849
850
class Test_hook_compressed(unittest.TestCase):
    """Unit tests for fileinput.hook_compressed()"""

    def setUp(self):
        # One recorder per test so that invocation counts start from zero.
        self.fake_open = InvocationRecorder()

    # --- text mode, no compressed extension: builtin open() is expected ---

    def test_empty_string(self):
        self.do_test_use_builtin_open_text("", "r")

    def test_no_ext(self):
        self.do_test_use_builtin_open_text("abcd", "r")

    # --- ".gz" / ".bz2" names: the matching opener is expected ---

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_ext_fake(self):
        # gzip.open is swapped for the recorder only for the duration of
        # the hook call; it must receive (filename, mode) and no keywords.
        with mock.patch.object(gzip, "open", self.fake_open):
            fileinput.hook_compressed("test.gz", "r")

        recorder = self.fake_open
        self.assertEqual(recorder.invocation_count, 1)
        self.assertEqual(recorder.last_invocation, (("test.gz", "r"), {}))

    @unittest.skipUnless(gzip, "Requires gzip and zlib")
    def test_gz_with_encoding_fake(self):
        # The stand-in opener yields bytes; iterating the hook's result
        # must produce the decoded text.
        def bytes_opener(filename, mode):
            return io.BytesIO(b'Ex-binary string')

        with mock.patch.object(gzip, "open", bytes_opener):
            stream = fileinput.hook_compressed("test.gz", "r",
                                               encoding="utf-8")
        self.assertEqual(list(stream), ['Ex-binary string'])

    @unittest.skipUnless(bz2, "Requires bz2")
    def test_bz2_ext_fake(self):
        # Same as the gzip case, but the patched opener is bz2.BZ2File.
        with mock.patch.object(bz2, "BZ2File", self.fake_open):
            fileinput.hook_compressed("test.bz2", "r")

        recorder = self.fake_open
        self.assertEqual(recorder.invocation_count, 1)
        self.assertEqual(recorder.last_invocation, (("test.bz2", "r"), {}))

    # --- binary mode with names these tests expect to reach builtin open() ---

    def test_blah_ext(self):
        self.do_test_use_builtin_open_binary("abcd.blah", "rb")

    def test_gz_ext_builtin(self):
        self.do_test_use_builtin_open_binary("abcd.Gz", "rb")

    def test_bz2_ext_builtin(self):
        self.do_test_use_builtin_open_binary("abcd.Bz2", "rb")

    def test_binary_mode_encoding(self):
        self.do_test_use_builtin_open_binary("abcd", "rb")

    def test_text_mode_encoding(self):
        self.do_test_use_builtin_open_text("abcd", "r")

    # --- helpers ---

    def do_test_use_builtin_open_binary(self, filename, mode):
        # Binary mode: builtin open() is expected to get encoding=None.
        self._check_builtin_open_call(filename, mode, None)

    def do_test_use_builtin_open_text(self, filename, mode):
        # Text mode: builtin open() is expected to get encoding='locale'.
        self._check_builtin_open_call(filename, mode, 'locale')

    def _check_builtin_open_call(self, filename, mode, expected_encoding):
        # Run the hook with builtins.open replaced by the recorder, then
        # check that it was called exactly once with the given filename
        # and mode plus the expected encoding/errors keywords.
        saved_open = self.replace_builtin_open(self.fake_open)
        try:
            fileinput.hook_compressed(filename, mode)
        finally:
            self.replace_builtin_open(saved_open)

        expected_call = (
            (filename, mode),
            {'encoding': expected_encoding, 'errors': None},
        )
        self.assertEqual(self.fake_open.invocation_count, 1)
        self.assertEqual(self.fake_open.last_invocation, expected_call)

    @staticmethod
    def replace_builtin_open(new_open_func):
        # Install new_open_func as builtins.open and hand back the
        # function it displaced so that the caller can restore it.
        previous_open, builtins.open = builtins.open, new_open_func
        return previous_open
939
class Test_hook_encoded(unittest.TestCase):
    """Unit tests for fileinput.hook_encoded()"""

    def test(self):
        # Opaque sentinels: the identity assertions below prove that the
        # hook hands each of them to open() untouched.
        encoding = object()
        errors = object()
        filename = object()
        mode = object()
        hook = fileinput.hook_encoded(encoding, errors=errors)

        fake_open = InvocationRecorder()
        with mock.patch.object(builtins, "open", fake_open):
            hook(filename, mode)

        self.assertEqual(fake_open.invocation_count, 1)

        positional, keywords = fake_open.last_invocation
        self.assertIs(positional[0], filename)
        self.assertIs(positional[1], mode)
        self.assertIs(keywords.pop('encoding'), encoding)
        self.assertIs(keywords.pop('errors'), errors)
        # After removing encoding/errors no other keyword may remain.
        self.assertFalse(keywords)

    def test_errors(self):
        # b'\x80' is not valid UTF-8, so each error handler below has to
        # deal with it in its own way.
        Path(TESTFN).write_bytes(b'\x80abc')
        self.addCleanup(safe_unlink, TESTFN)

        def check(errors, expected_lines):
            hook = hook_encoded('utf-8', errors=errors)
            with FileInput(files=TESTFN, mode='r', openhook=hook) as fi:
                actual_lines = list(fi)
            self.assertEqual(actual_lines, expected_lines)

        check('ignore', ['abc'])
        with self.assertRaises(UnicodeDecodeError):
            check('strict', ['abc'])
        check('replace', ['\ufffdabc'])
        check('backslashreplace', ['\\x80abc'])

    def test_modes(self):
        # UTF-7 is a convenient, seldom used encoding
        Path(TESTFN).write_bytes(b'A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            hook = hook_encoded('utf-7')
            with FileInput(files=TESTFN, mode=mode, openhook=hook) as fi:
                actual_lines = list(fi)
            self.assertEqual(actual_lines, expected_lines)

        check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
        # Binary mode combined with an encoding hook is expected to fail.
        with self.assertRaises(ValueError):
            check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
999
1000
class MiscTest(unittest.TestCase):
    """Module-level checks that belong to no other test class."""

    def test_all(self):
        # Hand the fileinput module to the shared test.support helper
        # that checks a module's __all__.
        module_under_test = fileinput
        support.check__all__(self, module_under_test)
1005
1006
# Run the tests in this module when the file is executed as a script;
# importing the module does not trigger a test run.
if __name__ == "__main__":
    unittest.main()
1009