1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22import abc
23import array
24import errno
25import locale
26import os
27import pickle
28import random
29import signal
30import sys
31import textwrap
32import threading
33import time
34import unittest
35import warnings
36import weakref
37from collections import deque, UserList
38from itertools import cycle, count
39from test import support
40from test.support.script_helper import (
41    assert_python_ok, assert_python_failure, run_python_until_end)
42from test.support import import_helper
43from test.support import os_helper
44from test.support import threading_helper
45from test.support import warnings_helper
46from test.support import skip_if_sanitizer
47from test.support.os_helper import FakePath
48
49import codecs
50import io  # C implementation of io
51import _pyio as pyio # Python implementation of io
52
53try:
54    import ctypes
55except ImportError:
56    def byteslike(*pos, **kw):
57        return array.array("b", bytes(*pos, **kw))
58else:
59    def byteslike(*pos, **kw):
60        """Create a bytes-like object having no string or sequence methods"""
61        data = bytes(*pos, **kw)
62        obj = EmptyStruct()
63        ctypes.resize(obj, len(data))
64        memoryview(obj).cast("B")[:] = data
65        return obj
66    class EmptyStruct(ctypes.Structure):
67        pass
68
69# Does io.IOBase finalizer log the exception if the close() method fails?
70# The exception is ignored silently by default in release build.
71IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
72
73
74def _default_chunk_size():
75    """Get the default TextIOWrapper chunk size"""
76    with open(__file__, "r", encoding="latin-1") as f:
77        return f._CHUNK_SIZE
78
79requires_alarm = unittest.skipUnless(
80    hasattr(signal, "alarm"), "test requires signal.alarm()"
81)
82
83
84class MockRawIOWithoutRead:
85    """A RawIO implementation without read(), so as to exercise the default
86    RawIO.read() which calls readinto()."""
87
88    def __init__(self, read_stack=()):
89        self._read_stack = list(read_stack)
90        self._write_stack = []
91        self._reads = 0
92        self._extraneous_reads = 0
93
94    def write(self, b):
95        self._write_stack.append(bytes(b))
96        return len(b)
97
98    def writable(self):
99        return True
100
101    def fileno(self):
102        return 42
103
104    def readable(self):
105        return True
106
107    def seekable(self):
108        return True
109
110    def seek(self, pos, whence):
111        return 0   # wrong but we gotta return something
112
113    def tell(self):
114        return 0   # same comment as above
115
116    def readinto(self, buf):
117        self._reads += 1
118        max_len = len(buf)
119        try:
120            data = self._read_stack[0]
121        except IndexError:
122            self._extraneous_reads += 1
123            return 0
124        if data is None:
125            del self._read_stack[0]
126            return None
127        n = len(data)
128        if len(data) <= max_len:
129            del self._read_stack[0]
130            buf[:n] = data
131            return n
132        else:
133            buf[:] = data[:max_len]
134            self._read_stack[0] = data[max_len:]
135            return max_len
136
137    def truncate(self, pos=None):
138        return pos
139
140class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
141    pass
142
143class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
144    pass
145
146
147class MockRawIO(MockRawIOWithoutRead):
148
149    def read(self, n=None):
150        self._reads += 1
151        try:
152            return self._read_stack.pop(0)
153        except:
154            self._extraneous_reads += 1
155            return b""
156
157class CMockRawIO(MockRawIO, io.RawIOBase):
158    pass
159
160class PyMockRawIO(MockRawIO, pyio.RawIOBase):
161    pass
162
163
164class MisbehavedRawIO(MockRawIO):
165    def write(self, b):
166        return super().write(b) * 2
167
168    def read(self, n=None):
169        return super().read(n) * 2
170
171    def seek(self, pos, whence):
172        return -123
173
174    def tell(self):
175        return -456
176
177    def readinto(self, buf):
178        super().readinto(buf)
179        return len(buf) * 5
180
181class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
182    pass
183
184class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
185    pass
186
187
188class SlowFlushRawIO(MockRawIO):
189    def __init__(self):
190        super().__init__()
191        self.in_flush = threading.Event()
192
193    def flush(self):
194        self.in_flush.set()
195        time.sleep(0.25)
196
197class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
198    pass
199
200class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
201    pass
202
203
204class CloseFailureIO(MockRawIO):
205    closed = 0
206
207    def close(self):
208        if not self.closed:
209            self.closed = 1
210            raise OSError
211
212class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
213    pass
214
215class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
216    pass
217
218
219class MockFileIO:
220
221    def __init__(self, data):
222        self.read_history = []
223        super().__init__(data)
224
225    def read(self, n=None):
226        res = super().read(n)
227        self.read_history.append(None if res is None else len(res))
228        return res
229
230    def readinto(self, b):
231        res = super().readinto(b)
232        self.read_history.append(res)
233        return res
234
235class CMockFileIO(MockFileIO, io.BytesIO):
236    pass
237
238class PyMockFileIO(MockFileIO, pyio.BytesIO):
239    pass
240
241
242class MockUnseekableIO:
243    def seekable(self):
244        return False
245
246    def seek(self, *args):
247        raise self.UnsupportedOperation("not seekable")
248
249    def tell(self, *args):
250        raise self.UnsupportedOperation("not seekable")
251
252    def truncate(self, *args):
253        raise self.UnsupportedOperation("not seekable")
254
255class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
256    UnsupportedOperation = io.UnsupportedOperation
257
258class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
259    UnsupportedOperation = pyio.UnsupportedOperation
260
261
262class MockNonBlockWriterIO:
263
264    def __init__(self):
265        self._write_stack = []
266        self._blocker_char = None
267
268    def pop_written(self):
269        s = b"".join(self._write_stack)
270        self._write_stack[:] = []
271        return s
272
273    def block_on(self, char):
274        """Block when a given char is encountered."""
275        self._blocker_char = char
276
277    def readable(self):
278        return True
279
280    def seekable(self):
281        return True
282
283    def seek(self, pos, whence=0):
284        # naive implementation, enough for tests
285        return 0
286
287    def writable(self):
288        return True
289
290    def write(self, b):
291        b = bytes(b)
292        n = -1
293        if self._blocker_char:
294            try:
295                n = b.index(self._blocker_char)
296            except ValueError:
297                pass
298            else:
299                if n > 0:
300                    # write data up to the first blocker
301                    self._write_stack.append(b[:n])
302                    return n
303                else:
304                    # cancel blocker and indicate would block
305                    self._blocker_char = None
306                    return None
307        self._write_stack.append(b)
308        return len(b)
309
310class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
311    BlockingIOError = io.BlockingIOError
312
313class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
314    BlockingIOError = pyio.BlockingIOError
315
316
317class IOTest(unittest.TestCase):
318
319    def setUp(self):
320        os_helper.unlink(os_helper.TESTFN)
321
322    def tearDown(self):
323        os_helper.unlink(os_helper.TESTFN)
324
325    def write_ops(self, f):
326        self.assertEqual(f.write(b"blah."), 5)
327        f.truncate(0)
328        self.assertEqual(f.tell(), 5)
329        f.seek(0)
330
331        self.assertEqual(f.write(b"blah."), 5)
332        self.assertEqual(f.seek(0), 0)
333        self.assertEqual(f.write(b"Hello."), 6)
334        self.assertEqual(f.tell(), 6)
335        self.assertEqual(f.seek(-1, 1), 5)
336        self.assertEqual(f.tell(), 5)
337        buffer = bytearray(b" world\n\n\n")
338        self.assertEqual(f.write(buffer), 9)
339        buffer[:] = b"*" * 9  # Overwrite our copy of the data
340        self.assertEqual(f.seek(0), 0)
341        self.assertEqual(f.write(b"h"), 1)
342        self.assertEqual(f.seek(-1, 2), 13)
343        self.assertEqual(f.tell(), 13)
344
345        self.assertEqual(f.truncate(12), 12)
346        self.assertEqual(f.tell(), 13)
347        self.assertRaises(TypeError, f.seek, 0.0)
348
349    def read_ops(self, f, buffered=False):
350        data = f.read(5)
351        self.assertEqual(data, b"hello")
352        data = byteslike(data)
353        self.assertEqual(f.readinto(data), 5)
354        self.assertEqual(bytes(data), b" worl")
355        data = bytearray(5)
356        self.assertEqual(f.readinto(data), 2)
357        self.assertEqual(len(data), 5)
358        self.assertEqual(data[:2], b"d\n")
359        self.assertEqual(f.seek(0), 0)
360        self.assertEqual(f.read(20), b"hello world\n")
361        self.assertEqual(f.read(1), b"")
362        self.assertEqual(f.readinto(byteslike(b"x")), 0)
363        self.assertEqual(f.seek(-6, 2), 6)
364        self.assertEqual(f.read(5), b"world")
365        self.assertEqual(f.read(0), b"")
366        self.assertEqual(f.readinto(byteslike()), 0)
367        self.assertEqual(f.seek(-6, 1), 5)
368        self.assertEqual(f.read(5), b" worl")
369        self.assertEqual(f.tell(), 10)
370        self.assertRaises(TypeError, f.seek, 0.0)
371        if buffered:
372            f.seek(0)
373            self.assertEqual(f.read(), b"hello world\n")
374            f.seek(6)
375            self.assertEqual(f.read(), b"world\n")
376            self.assertEqual(f.read(), b"")
377            f.seek(0)
378            data = byteslike(5)
379            self.assertEqual(f.readinto1(data), 5)
380            self.assertEqual(bytes(data), b"hello")
381
382    LARGE = 2**31
383
384    def large_file_ops(self, f):
385        assert f.readable()
386        assert f.writable()
387        try:
388            self.assertEqual(f.seek(self.LARGE), self.LARGE)
389        except (OverflowError, ValueError):
390            self.skipTest("no largefile support")
391        self.assertEqual(f.tell(), self.LARGE)
392        self.assertEqual(f.write(b"xxx"), 3)
393        self.assertEqual(f.tell(), self.LARGE + 3)
394        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
395        self.assertEqual(f.truncate(), self.LARGE + 2)
396        self.assertEqual(f.tell(), self.LARGE + 2)
397        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
398        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
399        self.assertEqual(f.tell(), self.LARGE + 2)
400        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
401        self.assertEqual(f.seek(-1, 2), self.LARGE)
402        self.assertEqual(f.read(2), b"x")
403
404    def test_invalid_operations(self):
405        # Try writing on a file opened in read mode and vice-versa.
406        exc = self.UnsupportedOperation
407        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
408            self.assertRaises(exc, fp.read)
409            self.assertRaises(exc, fp.readline)
410        with self.open(os_helper.TESTFN, "wb") as fp:
411            self.assertRaises(exc, fp.read)
412            self.assertRaises(exc, fp.readline)
413        with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
414            self.assertRaises(exc, fp.read)
415            self.assertRaises(exc, fp.readline)
416        with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
417            self.assertRaises(exc, fp.write, b"blah")
418            self.assertRaises(exc, fp.writelines, [b"blah\n"])
419        with self.open(os_helper.TESTFN, "rb") as fp:
420            self.assertRaises(exc, fp.write, b"blah")
421            self.assertRaises(exc, fp.writelines, [b"blah\n"])
422        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
423            self.assertRaises(exc, fp.write, "blah")
424            self.assertRaises(exc, fp.writelines, ["blah\n"])
425            # Non-zero seeking from current or end pos
426            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
427            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
428
429    @unittest.skipIf(
430        support.is_emscripten, "fstat() of a pipe fd is not supported"
431    )
432    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
433    def test_optional_abilities(self):
434        # Test for OSError when optional APIs are not supported
435        # The purpose of this test is to try fileno(), reading, writing and
436        # seeking operations with various objects that indicate they do not
437        # support these operations.
438
439        def pipe_reader():
440            [r, w] = os.pipe()
441            os.close(w)  # So that read() is harmless
442            return self.FileIO(r, "r")
443
444        def pipe_writer():
445            [r, w] = os.pipe()
446            self.addCleanup(os.close, r)
447            # Guarantee that we can write into the pipe without blocking
448            thread = threading.Thread(target=os.read, args=(r, 100))
449            thread.start()
450            self.addCleanup(thread.join)
451            return self.FileIO(w, "w")
452
453        def buffered_reader():
454            return self.BufferedReader(self.MockUnseekableIO())
455
456        def buffered_writer():
457            return self.BufferedWriter(self.MockUnseekableIO())
458
459        def buffered_random():
460            return self.BufferedRandom(self.BytesIO())
461
462        def buffered_rw_pair():
463            return self.BufferedRWPair(self.MockUnseekableIO(),
464                self.MockUnseekableIO())
465
466        def text_reader():
467            class UnseekableReader(self.MockUnseekableIO):
468                writable = self.BufferedIOBase.writable
469                write = self.BufferedIOBase.write
470            return self.TextIOWrapper(UnseekableReader(), "ascii")
471
472        def text_writer():
473            class UnseekableWriter(self.MockUnseekableIO):
474                readable = self.BufferedIOBase.readable
475                read = self.BufferedIOBase.read
476            return self.TextIOWrapper(UnseekableWriter(), "ascii")
477
478        tests = (
479            (pipe_reader, "fr"), (pipe_writer, "fw"),
480            (buffered_reader, "r"), (buffered_writer, "w"),
481            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
482            (text_reader, "r"), (text_writer, "w"),
483            (self.BytesIO, "rws"), (self.StringIO, "rws"),
484        )
485        for [test, abilities] in tests:
486            with self.subTest(test), test() as obj:
487                readable = "r" in abilities
488                self.assertEqual(obj.readable(), readable)
489                writable = "w" in abilities
490                self.assertEqual(obj.writable(), writable)
491
492                if isinstance(obj, self.TextIOBase):
493                    data = "3"
494                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
495                    data = b"3"
496                else:
497                    self.fail("Unknown base class")
498
499                if "f" in abilities:
500                    obj.fileno()
501                else:
502                    self.assertRaises(OSError, obj.fileno)
503
504                if readable:
505                    obj.read(1)
506                    obj.read()
507                else:
508                    self.assertRaises(OSError, obj.read, 1)
509                    self.assertRaises(OSError, obj.read)
510
511                if writable:
512                    obj.write(data)
513                else:
514                    self.assertRaises(OSError, obj.write, data)
515
516                if sys.platform.startswith("win") and test in (
517                        pipe_reader, pipe_writer):
518                    # Pipes seem to appear as seekable on Windows
519                    continue
520                seekable = "s" in abilities
521                self.assertEqual(obj.seekable(), seekable)
522
523                if seekable:
524                    obj.tell()
525                    obj.seek(0)
526                else:
527                    self.assertRaises(OSError, obj.tell)
528                    self.assertRaises(OSError, obj.seek, 0)
529
530                if writable and seekable:
531                    obj.truncate()
532                    obj.truncate(0)
533                else:
534                    self.assertRaises(OSError, obj.truncate)
535                    self.assertRaises(OSError, obj.truncate, 0)
536
537    def test_open_handles_NUL_chars(self):
538        fn_with_NUL = 'foo\0bar'
539        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8")
540
541        bytes_fn = bytes(fn_with_NUL, 'ascii')
542        with warnings.catch_warnings():
543            warnings.simplefilter("ignore", DeprecationWarning)
544            self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8")
545
546    def test_raw_file_io(self):
547        with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
548            self.assertEqual(f.readable(), False)
549            self.assertEqual(f.writable(), True)
550            self.assertEqual(f.seekable(), True)
551            self.write_ops(f)
552        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
553            self.assertEqual(f.readable(), True)
554            self.assertEqual(f.writable(), False)
555            self.assertEqual(f.seekable(), True)
556            self.read_ops(f)
557
558    def test_buffered_file_io(self):
559        with self.open(os_helper.TESTFN, "wb") as f:
560            self.assertEqual(f.readable(), False)
561            self.assertEqual(f.writable(), True)
562            self.assertEqual(f.seekable(), True)
563            self.write_ops(f)
564        with self.open(os_helper.TESTFN, "rb") as f:
565            self.assertEqual(f.readable(), True)
566            self.assertEqual(f.writable(), False)
567            self.assertEqual(f.seekable(), True)
568            self.read_ops(f, True)
569
570    def test_readline(self):
571        with self.open(os_helper.TESTFN, "wb") as f:
572            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
573        with self.open(os_helper.TESTFN, "rb") as f:
574            self.assertEqual(f.readline(), b"abc\n")
575            self.assertEqual(f.readline(10), b"def\n")
576            self.assertEqual(f.readline(2), b"xy")
577            self.assertEqual(f.readline(4), b"zzy\n")
578            self.assertEqual(f.readline(), b"foo\x00bar\n")
579            self.assertEqual(f.readline(None), b"another line")
580            self.assertRaises(TypeError, f.readline, 5.3)
581        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
582            self.assertRaises(TypeError, f.readline, 5.3)
583
584    def test_readline_nonsizeable(self):
585        # Issue #30061
586        # Crash when readline() returns an object without __len__
587        class R(self.IOBase):
588            def readline(self):
589                return None
590        self.assertRaises((TypeError, StopIteration), next, R())
591
592    def test_next_nonsizeable(self):
593        # Issue #30061
594        # Crash when __next__() returns an object without __len__
595        class R(self.IOBase):
596            def __next__(self):
597                return None
598        self.assertRaises(TypeError, R().readlines, 1)
599
600    def test_raw_bytes_io(self):
601        f = self.BytesIO()
602        self.write_ops(f)
603        data = f.getvalue()
604        self.assertEqual(data, b"hello world\n")
605        f = self.BytesIO(data)
606        self.read_ops(f, True)
607
608    def test_large_file_ops(self):
609        # On Windows and Mac OSX this test consumes large resources; It takes
610        # a long time to build the >2 GiB file and takes >2 GiB of disk space
611        # therefore the resource must be enabled to run this test.
612        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
613            support.requires(
614                'largefile',
615                'test requires %s bytes and a long time to run' % self.LARGE)
616        with self.open(os_helper.TESTFN, "w+b", 0) as f:
617            self.large_file_ops(f)
618        with self.open(os_helper.TESTFN, "w+b") as f:
619            self.large_file_ops(f)
620
621    def test_with_open(self):
622        for bufsize in (0, 100):
623            f = None
624            with self.open(os_helper.TESTFN, "wb", bufsize) as f:
625                f.write(b"xxx")
626            self.assertEqual(f.closed, True)
627            f = None
628            try:
629                with self.open(os_helper.TESTFN, "wb", bufsize) as f:
630                    1/0
631            except ZeroDivisionError:
632                self.assertEqual(f.closed, True)
633            else:
634                self.fail("1/0 didn't raise an exception")
635
636    # issue 5008
637    def test_append_mode_tell(self):
638        with self.open(os_helper.TESTFN, "wb") as f:
639            f.write(b"xxx")
640        with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
641            self.assertEqual(f.tell(), 3)
642        with self.open(os_helper.TESTFN, "ab") as f:
643            self.assertEqual(f.tell(), 3)
644        with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f:
645            self.assertGreater(f.tell(), 0)
646
647    def test_destructor(self):
648        record = []
649        class MyFileIO(self.FileIO):
650            def __del__(self):
651                record.append(1)
652                try:
653                    f = super().__del__
654                except AttributeError:
655                    pass
656                else:
657                    f()
658            def close(self):
659                record.append(2)
660                super().close()
661            def flush(self):
662                record.append(3)
663                super().flush()
664        with warnings_helper.check_warnings(('', ResourceWarning)):
665            f = MyFileIO(os_helper.TESTFN, "wb")
666            f.write(b"xxx")
667            del f
668            support.gc_collect()
669            self.assertEqual(record, [1, 2, 3])
670            with self.open(os_helper.TESTFN, "rb") as f:
671                self.assertEqual(f.read(), b"xxx")
672
673    def _check_base_destructor(self, base):
674        record = []
675        class MyIO(base):
676            def __init__(self):
677                # This exercises the availability of attributes on object
678                # destruction.
679                # (in the C version, close() is called by the tp_dealloc
680                # function, not by __del__)
681                self.on_del = 1
682                self.on_close = 2
683                self.on_flush = 3
684            def __del__(self):
685                record.append(self.on_del)
686                try:
687                    f = super().__del__
688                except AttributeError:
689                    pass
690                else:
691                    f()
692            def close(self):
693                record.append(self.on_close)
694                super().close()
695            def flush(self):
696                record.append(self.on_flush)
697                super().flush()
698        f = MyIO()
699        del f
700        support.gc_collect()
701        self.assertEqual(record, [1, 2, 3])
702
703    def test_IOBase_destructor(self):
704        self._check_base_destructor(self.IOBase)
705
706    def test_RawIOBase_destructor(self):
707        self._check_base_destructor(self.RawIOBase)
708
709    def test_BufferedIOBase_destructor(self):
710        self._check_base_destructor(self.BufferedIOBase)
711
712    def test_TextIOBase_destructor(self):
713        self._check_base_destructor(self.TextIOBase)
714
715    def test_close_flushes(self):
716        with self.open(os_helper.TESTFN, "wb") as f:
717            f.write(b"xxx")
718        with self.open(os_helper.TESTFN, "rb") as f:
719            self.assertEqual(f.read(), b"xxx")
720
721    def test_array_writes(self):
722        a = array.array('i', range(10))
723        n = len(a.tobytes())
724        def check(f):
725            with f:
726                self.assertEqual(f.write(a), n)
727                f.writelines((a,))
728        check(self.BytesIO())
729        check(self.FileIO(os_helper.TESTFN, "w"))
730        check(self.BufferedWriter(self.MockRawIO()))
731        check(self.BufferedRandom(self.MockRawIO()))
732        check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
733
734    def test_closefd(self):
735        self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
736                          encoding="utf-8", closefd=False)
737
738    def test_read_closed(self):
739        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
740            f.write("egg\n")
741        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
742            file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
743            self.assertEqual(file.read(), "egg\n")
744            file.seek(0)
745            file.close()
746            self.assertRaises(ValueError, file.read)
747        with self.open(os_helper.TESTFN, "rb") as f:
748            file = self.open(f.fileno(), "rb", closefd=False)
749            self.assertEqual(file.read()[:3], b"egg")
750            file.close()
751            self.assertRaises(ValueError, file.readinto, bytearray(1))
752
753    def test_no_closefd_with_filename(self):
754        # can't use closefd in combination with a file name
755        self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r",
756                          encoding="utf-8", closefd=False)
757
758    def test_closefd_attr(self):
759        with self.open(os_helper.TESTFN, "wb") as f:
760            f.write(b"egg\n")
761        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
762            self.assertEqual(f.buffer.raw.closefd, True)
763            file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
764            self.assertEqual(file.buffer.raw.closefd, False)
765
766    def test_garbage_collection(self):
767        # FileIO objects are collected, and collecting them flushes
768        # all data to disk.
769        with warnings_helper.check_warnings(('', ResourceWarning)):
770            f = self.FileIO(os_helper.TESTFN, "wb")
771            f.write(b"abcxxx")
772            f.f = f
773            wr = weakref.ref(f)
774            del f
775            support.gc_collect()
776        self.assertIsNone(wr(), wr)
777        with self.open(os_helper.TESTFN, "rb") as f:
778            self.assertEqual(f.read(), b"abcxxx")
779
780    def test_unbounded_file(self):
781        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
782        zero = "/dev/zero"
783        if not os.path.exists(zero):
784            self.skipTest("{0} does not exist".format(zero))
785        if sys.maxsize > 0x7FFFFFFF:
786            self.skipTest("test can only run in a 32-bit address space")
787        if support.real_max_memuse < support._2G:
788            self.skipTest("test requires at least 2 GiB of memory")
789        with self.open(zero, "rb", buffering=0) as f:
790            self.assertRaises(OverflowError, f.read)
791        with self.open(zero, "rb") as f:
792            self.assertRaises(OverflowError, f.read)
793        with self.open(zero, "r") as f:
794            self.assertRaises(OverflowError, f.read)
795
796    def check_flush_error_on_close(self, *args, **kwargs):
797        # Test that the file is closed despite failed flush
798        # and that flush() is called before file closed.
799        f = self.open(*args, **kwargs)
800        closed = []
801        def bad_flush():
802            closed[:] = [f.closed]
803            raise OSError()
804        f.flush = bad_flush
805        self.assertRaises(OSError, f.close) # exception not swallowed
806        self.assertTrue(f.closed)
807        self.assertTrue(closed)      # flush() called
808        self.assertFalse(closed[0])  # flush() called before file closed
809        f.flush = lambda: None  # break reference loop
810
811    def test_flush_error_on_close(self):
812        # raw file
813        # Issue #5700: io.FileIO calls flush() after file closed
814        self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
815        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
816        self.check_flush_error_on_close(fd, 'wb', buffering=0)
817        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
818        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
819        os.close(fd)
820        # buffered io
821        self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
822        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
823        self.check_flush_error_on_close(fd, 'wb')
824        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
825        self.check_flush_error_on_close(fd, 'wb', closefd=False)
826        os.close(fd)
827        # text io
828        self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8")
829        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
830        self.check_flush_error_on_close(fd, 'w', encoding="utf-8")
831        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
832        self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False)
833        os.close(fd)
834
835    def test_multi_close(self):
836        f = self.open(os_helper.TESTFN, "wb", buffering=0)
837        f.close()
838        f.close()
839        f.close()
840        self.assertRaises(ValueError, f.flush)
841
842    def test_RawIOBase_read(self):
843        # Exercise the default limited RawIOBase.read(n) implementation (which
844        # calls readinto() internally).
845        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
846        self.assertEqual(rawio.read(2), b"ab")
847        self.assertEqual(rawio.read(2), b"c")
848        self.assertEqual(rawio.read(2), b"d")
849        self.assertEqual(rawio.read(2), None)
850        self.assertEqual(rawio.read(2), b"ef")
851        self.assertEqual(rawio.read(2), b"g")
852        self.assertEqual(rawio.read(2), None)
853        self.assertEqual(rawio.read(2), b"")
854
855    def test_types_have_dict(self):
856        test = (
857            self.IOBase(),
858            self.RawIOBase(),
859            self.TextIOBase(),
860            self.StringIO(),
861            self.BytesIO()
862        )
863        for obj in test:
864            self.assertTrue(hasattr(obj, "__dict__"))
865
866    def test_opener(self):
867        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
868            f.write("egg\n")
869        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
870        def opener(path, flags):
871            return fd
872        with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f:
873            self.assertEqual(f.read(), "egg\n")
874
875    def test_bad_opener_negative_1(self):
876        # Issue #27066.
877        def badopener(fname, flags):
878            return -1
879        with self.assertRaises(ValueError) as cm:
880            open('non-existent', 'r', opener=badopener)
881        self.assertEqual(str(cm.exception), 'opener returned -1')
882
883    def test_bad_opener_other_negative(self):
884        # Issue #27066.
885        def badopener(fname, flags):
886            return -2
887        with self.assertRaises(ValueError) as cm:
888            open('non-existent', 'r', opener=badopener)
889        self.assertEqual(str(cm.exception), 'opener returned -2')
890
891    def test_opener_invalid_fd(self):
892        # Check that OSError is raised with error code EBADF if the
893        # opener returns an invalid file descriptor (see gh-82212).
894        fd = os_helper.make_bad_fd()
895        with self.assertRaises(OSError) as cm:
896            self.open('foo', opener=lambda name, flags: fd)
897        self.assertEqual(cm.exception.errno, errno.EBADF)
898
899    def test_fileio_closefd(self):
900        # Issue #4841
901        with self.open(__file__, 'rb') as f1, \
902             self.open(__file__, 'rb') as f2:
903            fileio = self.FileIO(f1.fileno(), closefd=False)
904            # .__init__() must not close f1
905            fileio.__init__(f2.fileno(), closefd=False)
906            f1.readline()
907            # .close() must not close f2
908            fileio.close()
909            f2.readline()
910
911    def test_nonbuffered_textio(self):
912        with warnings_helper.check_no_resource_warning(self):
913            with self.assertRaises(ValueError):
914                self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
915
916    def test_invalid_newline(self):
917        with warnings_helper.check_no_resource_warning(self):
918            with self.assertRaises(ValueError):
919                self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid')
920
921    def test_buffered_readinto_mixin(self):
922        # Test the implementation provided by BufferedIOBase
923        class Stream(self.BufferedIOBase):
924            def read(self, size):
925                return b"12345"
926            read1 = read
927        stream = Stream()
928        for method in ("readinto", "readinto1"):
929            with self.subTest(method):
930                buffer = byteslike(5)
931                self.assertEqual(getattr(stream, method)(buffer), 5)
932                self.assertEqual(bytes(buffer), b"12345")
933
934    def test_fspath_support(self):
935        def check_path_succeeds(path):
936            with self.open(path, "w", encoding="utf-8") as f:
937                f.write("egg\n")
938
939            with self.open(path, "r", encoding="utf-8") as f:
940                self.assertEqual(f.read(), "egg\n")
941
942        check_path_succeeds(FakePath(os_helper.TESTFN))
943        check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
944
945        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
946            bad_path = FakePath(f.fileno())
947            with self.assertRaises(TypeError):
948                self.open(bad_path, 'w', encoding="utf-8")
949
950        bad_path = FakePath(None)
951        with self.assertRaises(TypeError):
952            self.open(bad_path, 'w', encoding="utf-8")
953
954        bad_path = FakePath(FloatingPointError)
955        with self.assertRaises(FloatingPointError):
956            self.open(bad_path, 'w', encoding="utf-8")
957
958        # ensure that refcounting is correct with some error conditions
959        with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
960            self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
961
962    def test_RawIOBase_readall(self):
963        # Exercise the default unlimited RawIOBase.read() and readall()
964        # implementations.
965        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
966        self.assertEqual(rawio.read(), b"abcdefg")
967        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
968        self.assertEqual(rawio.readall(), b"abcdefg")
969
970    def test_BufferedIOBase_readinto(self):
971        # Exercise the default BufferedIOBase.readinto() and readinto1()
972        # implementations (which call read() or read1() internally).
973        class Reader(self.BufferedIOBase):
974            def __init__(self, avail):
975                self.avail = avail
976            def read(self, size):
977                result = self.avail[:size]
978                self.avail = self.avail[size:]
979                return result
980            def read1(self, size):
981                """Returns no more than 5 bytes at once"""
982                return self.read(min(size, 5))
983        tests = (
984            # (test method, total data available, read buffer size, expected
985            #     read size)
986            ("readinto", 10, 5, 5),
987            ("readinto", 10, 6, 6),  # More than read1() can return
988            ("readinto", 5, 6, 5),  # Buffer larger than total available
989            ("readinto", 6, 7, 6),
990            ("readinto", 10, 0, 0),  # Empty buffer
991            ("readinto1", 10, 5, 5),  # Result limited to single read1() call
992            ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
993            ("readinto1", 5, 6, 5),  # Buffer larger than total available
994            ("readinto1", 6, 7, 5),
995            ("readinto1", 10, 0, 0),  # Empty buffer
996        )
997        UNUSED_BYTE = 0x81
998        for test in tests:
999            with self.subTest(test):
1000                method, avail, request, result = test
1001                reader = Reader(bytes(range(avail)))
1002                buffer = bytearray((UNUSED_BYTE,) * request)
1003                method = getattr(reader, method)
1004                self.assertEqual(method(buffer), result)
1005                self.assertEqual(len(buffer), request)
1006                self.assertSequenceEqual(buffer[:result], range(result))
1007                unused = (UNUSED_BYTE,) * (request - result)
1008                self.assertSequenceEqual(buffer[result:], unused)
1009                self.assertEqual(len(reader.avail), avail - result)
1010
1011    def test_close_assert(self):
1012        class R(self.IOBase):
1013            def __setattr__(self, name, value):
1014                pass
1015            def flush(self):
1016                raise OSError()
1017        f = R()
1018        # This would cause an assertion failure.
1019        self.assertRaises(OSError, f.close)
1020
1021        # Silence destructor error
1022        R.flush = lambda self: None
1023
1024
1025class CIOTest(IOTest):
1026
1027    def test_IOBase_finalize(self):
1028        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1029        # class which inherits IOBase and an object of this class are caught
1030        # in a reference cycle and close() is already in the method cache.
1031        class MyIO(self.IOBase):
1032            def close(self):
1033                pass
1034
1035        # create an instance to populate the method cache
1036        MyIO()
1037        obj = MyIO()
1038        obj.obj = obj
1039        wr = weakref.ref(obj)
1040        del MyIO
1041        del obj
1042        support.gc_collect()
1043        self.assertIsNone(wr(), wr)
1044
1045class PyIOTest(IOTest):
1046    pass
1047
1048
1049@support.cpython_only
1050class APIMismatchTest(unittest.TestCase):
1051
1052    def test_RawIOBase_io_in_pyio_match(self):
1053        """Test that pyio RawIOBase class has all c RawIOBase methods"""
1054        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1055                                               ignore=('__weakref__',))
1056        self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1057
1058    def test_RawIOBase_pyio_in_io_match(self):
1059        """Test that c RawIOBase class has all pyio RawIOBase methods"""
1060        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1061        self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1062
1063
1064class CommonBufferedTests:
1065    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1066
1067    def test_detach(self):
1068        raw = self.MockRawIO()
1069        buf = self.tp(raw)
1070        self.assertIs(buf.detach(), raw)
1071        self.assertRaises(ValueError, buf.detach)
1072
1073        repr(buf)  # Should still work
1074
1075    def test_fileno(self):
1076        rawio = self.MockRawIO()
1077        bufio = self.tp(rawio)
1078
1079        self.assertEqual(42, bufio.fileno())
1080
1081    def test_invalid_args(self):
1082        rawio = self.MockRawIO()
1083        bufio = self.tp(rawio)
1084        # Invalid whence
1085        self.assertRaises(ValueError, bufio.seek, 0, -1)
1086        self.assertRaises(ValueError, bufio.seek, 0, 9)
1087
1088    def test_override_destructor(self):
1089        tp = self.tp
1090        record = []
1091        class MyBufferedIO(tp):
1092            def __del__(self):
1093                record.append(1)
1094                try:
1095                    f = super().__del__
1096                except AttributeError:
1097                    pass
1098                else:
1099                    f()
1100            def close(self):
1101                record.append(2)
1102                super().close()
1103            def flush(self):
1104                record.append(3)
1105                super().flush()
1106        rawio = self.MockRawIO()
1107        bufio = MyBufferedIO(rawio)
1108        del bufio
1109        support.gc_collect()
1110        self.assertEqual(record, [1, 2, 3])
1111
1112    def test_context_manager(self):
1113        # Test usability as a context manager
1114        rawio = self.MockRawIO()
1115        bufio = self.tp(rawio)
1116        def _with():
1117            with bufio:
1118                pass
1119        _with()
1120        # bufio should now be closed, and using it a second time should raise
1121        # a ValueError.
1122        self.assertRaises(ValueError, _with)
1123
1124    def test_error_through_destructor(self):
1125        # Test that the exception state is not modified by a destructor,
1126        # even if close() fails.
1127        rawio = self.CloseFailureIO()
1128        with support.catch_unraisable_exception() as cm:
1129            with self.assertRaises(AttributeError):
1130                self.tp(rawio).xyzzy
1131
1132            if not IOBASE_EMITS_UNRAISABLE:
1133                self.assertIsNone(cm.unraisable)
1134            elif cm.unraisable is not None:
1135                self.assertEqual(cm.unraisable.exc_type, OSError)
1136
1137    def test_repr(self):
1138        raw = self.MockRawIO()
1139        b = self.tp(raw)
1140        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1141        self.assertRegex(repr(b), "<%s>" % clsname)
1142        raw.name = "dummy"
1143        self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
1144        raw.name = b"dummy"
1145        self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
1146
1147    def test_recursive_repr(self):
1148        # Issue #25455
1149        raw = self.MockRawIO()
1150        b = self.tp(raw)
1151        with support.swap_attr(raw, 'name', b):
1152            try:
1153                repr(b)  # Should not crash
1154            except RuntimeError:
1155                pass
1156
1157    def test_flush_error_on_close(self):
1158        # Test that buffered file is closed despite failed flush
1159        # and that flush() is called before file closed.
1160        raw = self.MockRawIO()
1161        closed = []
1162        def bad_flush():
1163            closed[:] = [b.closed, raw.closed]
1164            raise OSError()
1165        raw.flush = bad_flush
1166        b = self.tp(raw)
1167        self.assertRaises(OSError, b.close) # exception not swallowed
1168        self.assertTrue(b.closed)
1169        self.assertTrue(raw.closed)
1170        self.assertTrue(closed)      # flush() called
1171        self.assertFalse(closed[0])  # flush() called before file closed
1172        self.assertFalse(closed[1])
1173        raw.flush = lambda: None  # break reference loop
1174
1175    def test_close_error_on_close(self):
1176        raw = self.MockRawIO()
1177        def bad_flush():
1178            raise OSError('flush')
1179        def bad_close():
1180            raise OSError('close')
1181        raw.close = bad_close
1182        b = self.tp(raw)
1183        b.flush = bad_flush
1184        with self.assertRaises(OSError) as err: # exception not swallowed
1185            b.close()
1186        self.assertEqual(err.exception.args, ('close',))
1187        self.assertIsInstance(err.exception.__context__, OSError)
1188        self.assertEqual(err.exception.__context__.args, ('flush',))
1189        self.assertFalse(b.closed)
1190
1191        # Silence destructor error
1192        raw.close = lambda: None
1193        b.flush = lambda: None
1194
1195    def test_nonnormalized_close_error_on_close(self):
1196        # Issue #21677
1197        raw = self.MockRawIO()
1198        def bad_flush():
1199            raise non_existing_flush
1200        def bad_close():
1201            raise non_existing_close
1202        raw.close = bad_close
1203        b = self.tp(raw)
1204        b.flush = bad_flush
1205        with self.assertRaises(NameError) as err: # exception not swallowed
1206            b.close()
1207        self.assertIn('non_existing_close', str(err.exception))
1208        self.assertIsInstance(err.exception.__context__, NameError)
1209        self.assertIn('non_existing_flush', str(err.exception.__context__))
1210        self.assertFalse(b.closed)
1211
1212        # Silence destructor error
1213        b.flush = lambda: None
1214        raw.close = lambda: None
1215
1216    def test_multi_close(self):
1217        raw = self.MockRawIO()
1218        b = self.tp(raw)
1219        b.close()
1220        b.close()
1221        b.close()
1222        self.assertRaises(ValueError, b.flush)
1223
1224    def test_unseekable(self):
1225        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1226        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1227        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1228
1229    def test_readonly_attributes(self):
1230        raw = self.MockRawIO()
1231        buf = self.tp(raw)
1232        x = self.MockRawIO()
1233        with self.assertRaises(AttributeError):
1234            buf.raw = x
1235
1236
1237class SizeofTest:
1238
1239    @support.cpython_only
1240    def test_sizeof(self):
1241        bufsize1 = 4096
1242        bufsize2 = 8192
1243        rawio = self.MockRawIO()
1244        bufio = self.tp(rawio, buffer_size=bufsize1)
1245        size = sys.getsizeof(bufio) - bufsize1
1246        rawio = self.MockRawIO()
1247        bufio = self.tp(rawio, buffer_size=bufsize2)
1248        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1249
1250    @support.cpython_only
1251    def test_buffer_freeing(self) :
1252        bufsize = 4096
1253        rawio = self.MockRawIO()
1254        bufio = self.tp(rawio, buffer_size=bufsize)
1255        size = sys.getsizeof(bufio) - bufsize
1256        bufio.close()
1257        self.assertEqual(sys.getsizeof(bufio), size)
1258
1259class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1260    read_mode = "rb"
1261
1262    def test_constructor(self):
1263        rawio = self.MockRawIO([b"abc"])
1264        bufio = self.tp(rawio)
1265        bufio.__init__(rawio)
1266        bufio.__init__(rawio, buffer_size=1024)
1267        bufio.__init__(rawio, buffer_size=16)
1268        self.assertEqual(b"abc", bufio.read())
1269        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1270        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1271        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1272        rawio = self.MockRawIO([b"abc"])
1273        bufio.__init__(rawio)
1274        self.assertEqual(b"abc", bufio.read())
1275
1276    def test_uninitialized(self):
1277        bufio = self.tp.__new__(self.tp)
1278        del bufio
1279        bufio = self.tp.__new__(self.tp)
1280        self.assertRaisesRegex((ValueError, AttributeError),
1281                               'uninitialized|has no attribute',
1282                               bufio.read, 0)
1283        bufio.__init__(self.MockRawIO())
1284        self.assertEqual(bufio.read(0), b'')
1285
1286    def test_read(self):
1287        for arg in (None, 7):
1288            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1289            bufio = self.tp(rawio)
1290            self.assertEqual(b"abcdefg", bufio.read(arg))
1291        # Invalid args
1292        self.assertRaises(ValueError, bufio.read, -2)
1293
1294    def test_read1(self):
1295        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1296        bufio = self.tp(rawio)
1297        self.assertEqual(b"a", bufio.read(1))
1298        self.assertEqual(b"b", bufio.read1(1))
1299        self.assertEqual(rawio._reads, 1)
1300        self.assertEqual(b"", bufio.read1(0))
1301        self.assertEqual(b"c", bufio.read1(100))
1302        self.assertEqual(rawio._reads, 1)
1303        self.assertEqual(b"d", bufio.read1(100))
1304        self.assertEqual(rawio._reads, 2)
1305        self.assertEqual(b"efg", bufio.read1(100))
1306        self.assertEqual(rawio._reads, 3)
1307        self.assertEqual(b"", bufio.read1(100))
1308        self.assertEqual(rawio._reads, 4)
1309
1310    def test_read1_arbitrary(self):
1311        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1312        bufio = self.tp(rawio)
1313        self.assertEqual(b"a", bufio.read(1))
1314        self.assertEqual(b"bc", bufio.read1())
1315        self.assertEqual(b"d", bufio.read1())
1316        self.assertEqual(b"efg", bufio.read1(-1))
1317        self.assertEqual(rawio._reads, 3)
1318        self.assertEqual(b"", bufio.read1())
1319        self.assertEqual(rawio._reads, 4)
1320
1321    def test_readinto(self):
1322        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1323        bufio = self.tp(rawio)
1324        b = bytearray(2)
1325        self.assertEqual(bufio.readinto(b), 2)
1326        self.assertEqual(b, b"ab")
1327        self.assertEqual(bufio.readinto(b), 2)
1328        self.assertEqual(b, b"cd")
1329        self.assertEqual(bufio.readinto(b), 2)
1330        self.assertEqual(b, b"ef")
1331        self.assertEqual(bufio.readinto(b), 1)
1332        self.assertEqual(b, b"gf")
1333        self.assertEqual(bufio.readinto(b), 0)
1334        self.assertEqual(b, b"gf")
1335        rawio = self.MockRawIO((b"abc", None))
1336        bufio = self.tp(rawio)
1337        self.assertEqual(bufio.readinto(b), 2)
1338        self.assertEqual(b, b"ab")
1339        self.assertEqual(bufio.readinto(b), 1)
1340        self.assertEqual(b, b"cb")
1341
1342    def test_readinto1(self):
1343        buffer_size = 10
1344        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1345        bufio = self.tp(rawio, buffer_size=buffer_size)
1346        b = bytearray(2)
1347        self.assertEqual(bufio.peek(3), b'abc')
1348        self.assertEqual(rawio._reads, 1)
1349        self.assertEqual(bufio.readinto1(b), 2)
1350        self.assertEqual(b, b"ab")
1351        self.assertEqual(rawio._reads, 1)
1352        self.assertEqual(bufio.readinto1(b), 1)
1353        self.assertEqual(b[:1], b"c")
1354        self.assertEqual(rawio._reads, 1)
1355        self.assertEqual(bufio.readinto1(b), 2)
1356        self.assertEqual(b, b"de")
1357        self.assertEqual(rawio._reads, 2)
1358        b = bytearray(2*buffer_size)
1359        self.assertEqual(bufio.peek(3), b'fgh')
1360        self.assertEqual(rawio._reads, 3)
1361        self.assertEqual(bufio.readinto1(b), 6)
1362        self.assertEqual(b[:6], b"fghjkl")
1363        self.assertEqual(rawio._reads, 4)
1364
1365    def test_readinto_array(self):
1366        buffer_size = 60
1367        data = b"a" * 26
1368        rawio = self.MockRawIO((data,))
1369        bufio = self.tp(rawio, buffer_size=buffer_size)
1370
1371        # Create an array with element size > 1 byte
1372        b = array.array('i', b'x' * 32)
1373        assert len(b) != 16
1374
1375        # Read into it. We should get as many *bytes* as we can fit into b
1376        # (which is more than the number of elements)
1377        n = bufio.readinto(b)
1378        self.assertGreater(n, len(b))
1379
1380        # Check that old contents of b are preserved
1381        bm = memoryview(b).cast('B')
1382        self.assertLess(n, len(bm))
1383        self.assertEqual(bm[:n], data[:n])
1384        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1385
1386    def test_readinto1_array(self):
1387        buffer_size = 60
1388        data = b"a" * 26
1389        rawio = self.MockRawIO((data,))
1390        bufio = self.tp(rawio, buffer_size=buffer_size)
1391
1392        # Create an array with element size > 1 byte
1393        b = array.array('i', b'x' * 32)
1394        assert len(b) != 16
1395
1396        # Read into it. We should get as many *bytes* as we can fit into b
1397        # (which is more than the number of elements)
1398        n = bufio.readinto1(b)
1399        self.assertGreater(n, len(b))
1400
1401        # Check that old contents of b are preserved
1402        bm = memoryview(b).cast('B')
1403        self.assertLess(n, len(bm))
1404        self.assertEqual(bm[:n], data[:n])
1405        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1406
1407    def test_readlines(self):
1408        def bufio():
1409            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1410            return self.tp(rawio)
1411        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1412        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1413        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1414
1415    def test_buffering(self):
1416        data = b"abcdefghi"
1417        dlen = len(data)
1418
1419        tests = [
1420            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1421            [ 100, [ 3, 3, 3],     [ dlen ]    ],
1422            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1423        ]
1424
1425        for bufsize, buf_read_sizes, raw_read_sizes in tests:
1426            rawio = self.MockFileIO(data)
1427            bufio = self.tp(rawio, buffer_size=bufsize)
1428            pos = 0
1429            for nbytes in buf_read_sizes:
1430                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1431                pos += nbytes
1432            # this is mildly implementation-dependent
1433            self.assertEqual(rawio.read_history, raw_read_sizes)
1434
1435    def test_read_non_blocking(self):
1436        # Inject some None's in there to simulate EWOULDBLOCK
1437        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1438        bufio = self.tp(rawio)
1439        self.assertEqual(b"abcd", bufio.read(6))
1440        self.assertEqual(b"e", bufio.read(1))
1441        self.assertEqual(b"fg", bufio.read())
1442        self.assertEqual(b"", bufio.peek(1))
1443        self.assertIsNone(bufio.read())
1444        self.assertEqual(b"", bufio.read())
1445
1446        rawio = self.MockRawIO((b"a", None, None))
1447        self.assertEqual(b"a", rawio.readall())
1448        self.assertIsNone(rawio.readall())
1449
1450    def test_read_past_eof(self):
1451        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1452        bufio = self.tp(rawio)
1453
1454        self.assertEqual(b"abcdefg", bufio.read(9000))
1455
1456    def test_read_all(self):
1457        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1458        bufio = self.tp(rawio)
1459
1460        self.assertEqual(b"abcdefg", bufio.read())
1461
1462    @support.requires_resource('cpu')
1463    @threading_helper.requires_working_threading()
1464    def test_threads(self):
1465        try:
1466            # Write out many bytes with exactly the same number of 0's,
1467            # 1's... 255's. This will help us check that concurrent reading
1468            # doesn't duplicate or forget contents.
1469            N = 1000
1470            l = list(range(256)) * N
1471            random.shuffle(l)
1472            s = bytes(bytearray(l))
1473            with self.open(os_helper.TESTFN, "wb") as f:
1474                f.write(s)
1475            with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
1476                bufio = self.tp(raw, 8)
1477                errors = []
1478                results = []
1479                def f():
1480                    try:
1481                        # Intra-buffer read then buffer-flushing read
1482                        for n in cycle([1, 19]):
1483                            s = bufio.read(n)
1484                            if not s:
1485                                break
1486                            # list.append() is atomic
1487                            results.append(s)
1488                    except Exception as e:
1489                        errors.append(e)
1490                        raise
1491                threads = [threading.Thread(target=f) for x in range(20)]
1492                with threading_helper.start_threads(threads):
1493                    time.sleep(0.02) # yield
1494                self.assertFalse(errors,
1495                    "the following exceptions were caught: %r" % errors)
1496                s = b''.join(results)
1497                for i in range(256):
1498                    c = bytes(bytearray([i]))
1499                    self.assertEqual(s.count(c), N)
1500        finally:
1501            os_helper.unlink(os_helper.TESTFN)
1502
1503    def test_unseekable(self):
1504        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1505        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1506        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1507        bufio.read(1)
1508        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1509        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1510
1511    def test_misbehaved_io(self):
1512        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1513        bufio = self.tp(rawio)
1514        self.assertRaises(OSError, bufio.seek, 0)
1515        self.assertRaises(OSError, bufio.tell)
1516
1517        # Silence destructor error
1518        bufio.close = lambda: None
1519
1520    def test_no_extraneous_read(self):
1521        # Issue #9550; when the raw IO object has satisfied the read request,
1522        # we should not issue any additional reads, otherwise it may block
1523        # (e.g. socket).
1524        bufsize = 16
1525        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1526            rawio = self.MockRawIO([b"x" * n])
1527            bufio = self.tp(rawio, bufsize)
1528            self.assertEqual(bufio.read(n), b"x" * n)
1529            # Simple case: one raw read is enough to satisfy the request.
1530            self.assertEqual(rawio._extraneous_reads, 0,
1531                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1532            # A more complex case where two raw reads are needed to satisfy
1533            # the request.
1534            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1535            bufio = self.tp(rawio, bufsize)
1536            self.assertEqual(bufio.read(n), b"x" * n)
1537            self.assertEqual(rawio._extraneous_reads, 0,
1538                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1539
1540    def test_read_on_closed(self):
1541        # Issue #23796
1542        b = io.BufferedReader(io.BytesIO(b"12"))
1543        b.read(1)
1544        b.close()
1545        self.assertRaises(ValueError, b.peek)
1546        self.assertRaises(ValueError, b.read1, 1)
1547
1548    def test_truncate_on_read_only(self):
1549        rawio = self.MockFileIO(b"abc")
1550        bufio = self.tp(rawio)
1551        self.assertFalse(bufio.writable())
1552        self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1553        self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1554
1555
1556class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
1557    tp = io.BufferedReader
1558
1559    @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing "
1560                       "instead of returning NULL for malloc failure.")
1561    def test_constructor(self):
1562        BufferedReaderTest.test_constructor(self)
1563        # The allocation can succeed on 32-bit builds, e.g. with more
1564        # than 2 GiB RAM and a 64-bit kernel.
1565        if sys.maxsize > 0x7FFFFFFF:
1566            rawio = self.MockRawIO()
1567            bufio = self.tp(rawio)
1568            self.assertRaises((OverflowError, MemoryError, ValueError),
1569                bufio.__init__, rawio, sys.maxsize)
1570
1571    def test_initialization(self):
1572        rawio = self.MockRawIO([b"abc"])
1573        bufio = self.tp(rawio)
1574        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1575        self.assertRaises(ValueError, bufio.read)
1576        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1577        self.assertRaises(ValueError, bufio.read)
1578        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1579        self.assertRaises(ValueError, bufio.read)
1580
1581    def test_misbehaved_io_read(self):
1582        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1583        bufio = self.tp(rawio)
1584        # _pyio.BufferedReader seems to implement reading different, so that
1585        # checking this is not so easy.
1586        self.assertRaises(OSError, bufio.read, 10)
1587
1588    def test_garbage_collection(self):
1589        # C BufferedReader objects are collected.
1590        # The Python version has __del__, so it ends into gc.garbage instead
1591        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1592        with warnings_helper.check_warnings(('', ResourceWarning)):
1593            rawio = self.FileIO(os_helper.TESTFN, "w+b")
1594            f = self.tp(rawio)
1595            f.f = f
1596            wr = weakref.ref(f)
1597            del f
1598            support.gc_collect()
1599        self.assertIsNone(wr(), wr)
1600
1601    def test_args_error(self):
1602        # Issue #17275
1603        with self.assertRaisesRegex(TypeError, "BufferedReader"):
1604            self.tp(io.BytesIO(), 1024, 1024, 1024)
1605
1606    def test_bad_readinto_value(self):
1607        rawio = io.BufferedReader(io.BytesIO(b"12"))
1608        rawio.readinto = lambda buf: -1
1609        bufio = self.tp(rawio)
1610        with self.assertRaises(OSError) as cm:
1611            bufio.readline()
1612        self.assertIsNone(cm.exception.__cause__)
1613
1614    def test_bad_readinto_type(self):
1615        rawio = io.BufferedReader(io.BytesIO(b"12"))
1616        rawio.readinto = lambda buf: b''
1617        bufio = self.tp(rawio)
1618        with self.assertRaises(OSError) as cm:
1619            bufio.readline()
1620        self.assertIsInstance(cm.exception.__cause__, TypeError)
1621
1622
1623class PyBufferedReaderTest(BufferedReaderTest):
1624    tp = pyio.BufferedReader
1625
1626
1627class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1628    write_mode = "wb"
1629
1630    def test_constructor(self):
1631        rawio = self.MockRawIO()
1632        bufio = self.tp(rawio)
1633        bufio.__init__(rawio)
1634        bufio.__init__(rawio, buffer_size=1024)
1635        bufio.__init__(rawio, buffer_size=16)
1636        self.assertEqual(3, bufio.write(b"abc"))
1637        bufio.flush()
1638        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1639        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1640        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1641        bufio.__init__(rawio)
1642        self.assertEqual(3, bufio.write(b"ghi"))
1643        bufio.flush()
1644        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1645
1646    def test_uninitialized(self):
1647        bufio = self.tp.__new__(self.tp)
1648        del bufio
1649        bufio = self.tp.__new__(self.tp)
1650        self.assertRaisesRegex((ValueError, AttributeError),
1651                               'uninitialized|has no attribute',
1652                               bufio.write, b'')
1653        bufio.__init__(self.MockRawIO())
1654        self.assertEqual(bufio.write(b''), 0)
1655
1656    def test_detach_flush(self):
1657        raw = self.MockRawIO()
1658        buf = self.tp(raw)
1659        buf.write(b"howdy!")
1660        self.assertFalse(raw._write_stack)
1661        buf.detach()
1662        self.assertEqual(raw._write_stack, [b"howdy!"])
1663
1664    def test_write(self):
1665        # Write to the buffered IO but don't overflow the buffer.
1666        writer = self.MockRawIO()
1667        bufio = self.tp(writer, 8)
1668        bufio.write(b"abc")
1669        self.assertFalse(writer._write_stack)
1670        buffer = bytearray(b"def")
1671        bufio.write(buffer)
1672        buffer[:] = b"***"  # Overwrite our copy of the data
1673        bufio.flush()
1674        self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1675
1676    def test_write_overflow(self):
1677        writer = self.MockRawIO()
1678        bufio = self.tp(writer, 8)
1679        contents = b"abcdefghijklmnop"
1680        for n in range(0, len(contents), 3):
1681            bufio.write(contents[n:n+3])
1682        flushed = b"".join(writer._write_stack)
1683        # At least (total - 8) bytes were implicitly flushed, perhaps more
1684        # depending on the implementation.
1685        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1686
1687    def check_writes(self, intermediate_func):
1688        # Lots of writes, test the flushed output is as expected.
1689        contents = bytes(range(256)) * 1000
1690        n = 0
1691        writer = self.MockRawIO()
1692        bufio = self.tp(writer, 13)
1693        # Generator of write sizes: repeat each N 15 times then proceed to N+1
1694        def gen_sizes():
1695            for size in count(1):
1696                for i in range(15):
1697                    yield size
1698        sizes = gen_sizes()
1699        while n < len(contents):
1700            size = min(next(sizes), len(contents) - n)
1701            self.assertEqual(bufio.write(contents[n:n+size]), size)
1702            intermediate_func(bufio)
1703            n += size
1704        bufio.flush()
1705        self.assertEqual(contents, b"".join(writer._write_stack))
1706
1707    def test_writes(self):
1708        self.check_writes(lambda bufio: None)
1709
1710    def test_writes_and_flushes(self):
1711        self.check_writes(lambda bufio: bufio.flush())
1712
1713    def test_writes_and_seeks(self):
1714        def _seekabs(bufio):
1715            pos = bufio.tell()
1716            bufio.seek(pos + 1, 0)
1717            bufio.seek(pos - 1, 0)
1718            bufio.seek(pos, 0)
1719        self.check_writes(_seekabs)
1720        def _seekrel(bufio):
1721            pos = bufio.seek(0, 1)
1722            bufio.seek(+1, 1)
1723            bufio.seek(-1, 1)
1724            bufio.seek(pos, 0)
1725        self.check_writes(_seekrel)
1726
1727    def test_writes_and_truncates(self):
1728        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1729
1730    def test_write_non_blocking(self):
1731        raw = self.MockNonBlockWriterIO()
1732        bufio = self.tp(raw, 8)
1733
1734        self.assertEqual(bufio.write(b"abcd"), 4)
1735        self.assertEqual(bufio.write(b"efghi"), 5)
1736        # 1 byte will be written, the rest will be buffered
1737        raw.block_on(b"k")
1738        self.assertEqual(bufio.write(b"jklmn"), 5)
1739
1740        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1741        raw.block_on(b"0")
1742        try:
1743            bufio.write(b"opqrwxyz0123456789")
1744        except self.BlockingIOError as e:
1745            written = e.characters_written
1746        else:
1747            self.fail("BlockingIOError should have been raised")
1748        self.assertEqual(written, 16)
1749        self.assertEqual(raw.pop_written(),
1750            b"abcdefghijklmnopqrwxyz")
1751
1752        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1753        s = raw.pop_written()
1754        # Previously buffered bytes were flushed
1755        self.assertTrue(s.startswith(b"01234567A"), s)
1756
1757    def test_write_and_rewind(self):
1758        raw = io.BytesIO()
1759        bufio = self.tp(raw, 4)
1760        self.assertEqual(bufio.write(b"abcdef"), 6)
1761        self.assertEqual(bufio.tell(), 6)
1762        bufio.seek(0, 0)
1763        self.assertEqual(bufio.write(b"XY"), 2)
1764        bufio.seek(6, 0)
1765        self.assertEqual(raw.getvalue(), b"XYcdef")
1766        self.assertEqual(bufio.write(b"123456"), 6)
1767        bufio.flush()
1768        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1769
1770    def test_flush(self):
1771        writer = self.MockRawIO()
1772        bufio = self.tp(writer, 8)
1773        bufio.write(b"abc")
1774        bufio.flush()
1775        self.assertEqual(b"abc", writer._write_stack[0])
1776
1777    def test_writelines(self):
1778        l = [b'ab', b'cd', b'ef']
1779        writer = self.MockRawIO()
1780        bufio = self.tp(writer, 8)
1781        bufio.writelines(l)
1782        bufio.flush()
1783        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1784
1785    def test_writelines_userlist(self):
1786        l = UserList([b'ab', b'cd', b'ef'])
1787        writer = self.MockRawIO()
1788        bufio = self.tp(writer, 8)
1789        bufio.writelines(l)
1790        bufio.flush()
1791        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1792
1793    def test_writelines_error(self):
1794        writer = self.MockRawIO()
1795        bufio = self.tp(writer, 8)
1796        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1797        self.assertRaises(TypeError, bufio.writelines, None)
1798        self.assertRaises(TypeError, bufio.writelines, 'abc')
1799
1800    def test_destructor(self):
1801        writer = self.MockRawIO()
1802        bufio = self.tp(writer, 8)
1803        bufio.write(b"abc")
1804        del bufio
1805        support.gc_collect()
1806        self.assertEqual(b"abc", writer._write_stack[0])
1807
1808    def test_truncate(self):
1809        # Truncate implicitly flushes the buffer.
1810        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1811        with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
1812            bufio = self.tp(raw, 8)
1813            bufio.write(b"abcdef")
1814            self.assertEqual(bufio.truncate(3), 3)
1815            self.assertEqual(bufio.tell(), 6)
1816        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
1817            self.assertEqual(f.read(), b"abc")
1818
1819    def test_truncate_after_write(self):
1820        # Ensure that truncate preserves the file position after
1821        # writes longer than the buffer size.
1822        # Issue: https://bugs.python.org/issue32228
1823        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1824        with self.open(os_helper.TESTFN, "wb") as f:
1825            # Fill with some buffer
1826            f.write(b'\x00' * 10000)
1827        buffer_sizes = [8192, 4096, 200]
1828        for buffer_size in buffer_sizes:
1829            with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
1830                f.write(b'\x00' * (buffer_size + 1))
1831                # After write write_pos and write_end are set to 0
1832                f.read(1)
1833                # read operation makes sure that pos != raw_pos
1834                f.truncate()
1835                self.assertEqual(f.tell(), buffer_size + 2)
1836
1837    @support.requires_resource('cpu')
1838    @threading_helper.requires_working_threading()
1839    def test_threads(self):
1840        try:
1841            # Write out many bytes from many threads and test they were
1842            # all flushed.
1843            N = 1000
1844            contents = bytes(range(256)) * N
1845            sizes = cycle([1, 19])
1846            n = 0
1847            queue = deque()
1848            while n < len(contents):
1849                size = next(sizes)
1850                queue.append(contents[n:n+size])
1851                n += size
1852            del contents
1853            # We use a real file object because it allows us to
1854            # exercise situations where the GIL is released before
1855            # writing the buffer to the raw streams. This is in addition
1856            # to concurrency issues due to switching threads in the middle
1857            # of Python code.
1858            with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
1859                bufio = self.tp(raw, 8)
1860                errors = []
1861                def f():
1862                    try:
1863                        while True:
1864                            try:
1865                                s = queue.popleft()
1866                            except IndexError:
1867                                return
1868                            bufio.write(s)
1869                    except Exception as e:
1870                        errors.append(e)
1871                        raise
1872                threads = [threading.Thread(target=f) for x in range(20)]
1873                with threading_helper.start_threads(threads):
1874                    time.sleep(0.02) # yield
1875                self.assertFalse(errors,
1876                    "the following exceptions were caught: %r" % errors)
1877                bufio.close()
1878            with self.open(os_helper.TESTFN, "rb") as f:
1879                s = f.read()
1880            for i in range(256):
1881                self.assertEqual(s.count(bytes([i])), N)
1882        finally:
1883            os_helper.unlink(os_helper.TESTFN)
1884
1885    def test_misbehaved_io(self):
1886        rawio = self.MisbehavedRawIO()
1887        bufio = self.tp(rawio, 5)
1888        self.assertRaises(OSError, bufio.seek, 0)
1889        self.assertRaises(OSError, bufio.tell)
1890        self.assertRaises(OSError, bufio.write, b"abcdef")
1891
1892        # Silence destructor error
1893        bufio.close = lambda: None
1894
1895    def test_max_buffer_size_removal(self):
1896        with self.assertRaises(TypeError):
1897            self.tp(self.MockRawIO(), 8, 12)
1898
1899    def test_write_error_on_close(self):
1900        raw = self.MockRawIO()
1901        def bad_write(b):
1902            raise OSError()
1903        raw.write = bad_write
1904        b = self.tp(raw)
1905        b.write(b'spam')
1906        self.assertRaises(OSError, b.close) # exception not swallowed
1907        self.assertTrue(b.closed)
1908
1909    @threading_helper.requires_working_threading()
1910    def test_slow_close_from_thread(self):
1911        # Issue #31976
1912        rawio = self.SlowFlushRawIO()
1913        bufio = self.tp(rawio, 8)
1914        t = threading.Thread(target=bufio.close)
1915        t.start()
1916        rawio.in_flush.wait()
1917        self.assertRaises(ValueError, bufio.write, b'spam')
1918        self.assertTrue(bufio.closed)
1919        t.join()
1920
1921
1922
1923class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
1924    tp = io.BufferedWriter
1925
1926    @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing "
1927                       "instead of returning NULL for malloc failure.")
1928    def test_constructor(self):
1929        BufferedWriterTest.test_constructor(self)
1930        # The allocation can succeed on 32-bit builds, e.g. with more
1931        # than 2 GiB RAM and a 64-bit kernel.
1932        if sys.maxsize > 0x7FFFFFFF:
1933            rawio = self.MockRawIO()
1934            bufio = self.tp(rawio)
1935            self.assertRaises((OverflowError, MemoryError, ValueError),
1936                bufio.__init__, rawio, sys.maxsize)
1937
1938    def test_initialization(self):
1939        rawio = self.MockRawIO()
1940        bufio = self.tp(rawio)
1941        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1942        self.assertRaises(ValueError, bufio.write, b"def")
1943        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1944        self.assertRaises(ValueError, bufio.write, b"def")
1945        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1946        self.assertRaises(ValueError, bufio.write, b"def")
1947
1948    def test_garbage_collection(self):
1949        # C BufferedWriter objects are collected, and collecting them flushes
1950        # all data to disk.
1951        # The Python version has __del__, so it ends into gc.garbage instead
1952        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1953        with warnings_helper.check_warnings(('', ResourceWarning)):
1954            rawio = self.FileIO(os_helper.TESTFN, "w+b")
1955            f = self.tp(rawio)
1956            f.write(b"123xxx")
1957            f.x = f
1958            wr = weakref.ref(f)
1959            del f
1960            support.gc_collect()
1961        self.assertIsNone(wr(), wr)
1962        with self.open(os_helper.TESTFN, "rb") as f:
1963            self.assertEqual(f.read(), b"123xxx")
1964
1965    def test_args_error(self):
1966        # Issue #17275
1967        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1968            self.tp(io.BytesIO(), 1024, 1024, 1024)
1969
1970
1971class PyBufferedWriterTest(BufferedWriterTest):
1972    tp = pyio.BufferedWriter
1973
1974class BufferedRWPairTest(unittest.TestCase):
1975
1976    def test_constructor(self):
1977        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1978        self.assertFalse(pair.closed)
1979
1980    def test_uninitialized(self):
1981        pair = self.tp.__new__(self.tp)
1982        del pair
1983        pair = self.tp.__new__(self.tp)
1984        self.assertRaisesRegex((ValueError, AttributeError),
1985                               'uninitialized|has no attribute',
1986                               pair.read, 0)
1987        self.assertRaisesRegex((ValueError, AttributeError),
1988                               'uninitialized|has no attribute',
1989                               pair.write, b'')
1990        pair.__init__(self.MockRawIO(), self.MockRawIO())
1991        self.assertEqual(pair.read(0), b'')
1992        self.assertEqual(pair.write(b''), 0)
1993
1994    def test_detach(self):
1995        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1996        self.assertRaises(self.UnsupportedOperation, pair.detach)
1997
1998    def test_constructor_max_buffer_size_removal(self):
1999        with self.assertRaises(TypeError):
2000            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
2001
2002    def test_constructor_with_not_readable(self):
2003        class NotReadable(MockRawIO):
2004            def readable(self):
2005                return False
2006
2007        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
2008
2009    def test_constructor_with_not_writeable(self):
2010        class NotWriteable(MockRawIO):
2011            def writable(self):
2012                return False
2013
2014        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
2015
2016    def test_read(self):
2017        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2018
2019        self.assertEqual(pair.read(3), b"abc")
2020        self.assertEqual(pair.read(1), b"d")
2021        self.assertEqual(pair.read(), b"ef")
2022        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2023        self.assertEqual(pair.read(None), b"abc")
2024
2025    def test_readlines(self):
2026        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2027        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2028        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2029        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
2030
2031    def test_read1(self):
2032        # .read1() is delegated to the underlying reader object, so this test
2033        # can be shallow.
2034        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2035
2036        self.assertEqual(pair.read1(3), b"abc")
2037        self.assertEqual(pair.read1(), b"def")
2038
2039    def test_readinto(self):
2040        for method in ("readinto", "readinto1"):
2041            with self.subTest(method):
2042                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2043
2044                data = byteslike(b'\0' * 5)
2045                self.assertEqual(getattr(pair, method)(data), 5)
2046                self.assertEqual(bytes(data), b"abcde")
2047
2048    def test_write(self):
2049        w = self.MockRawIO()
2050        pair = self.tp(self.MockRawIO(), w)
2051
2052        pair.write(b"abc")
2053        pair.flush()
2054        buffer = bytearray(b"def")
2055        pair.write(buffer)
2056        buffer[:] = b"***"  # Overwrite our copy of the data
2057        pair.flush()
2058        self.assertEqual(w._write_stack, [b"abc", b"def"])
2059
2060    def test_peek(self):
2061        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2062
2063        self.assertTrue(pair.peek(3).startswith(b"abc"))
2064        self.assertEqual(pair.read(3), b"abc")
2065
2066    def test_readable(self):
2067        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2068        self.assertTrue(pair.readable())
2069
2070    def test_writeable(self):
2071        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2072        self.assertTrue(pair.writable())
2073
2074    def test_seekable(self):
2075        # BufferedRWPairs are never seekable, even if their readers and writers
2076        # are.
2077        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2078        self.assertFalse(pair.seekable())
2079
2080    # .flush() is delegated to the underlying writer object and has been
2081    # tested in the test_write method.
2082
2083    def test_close_and_closed(self):
2084        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2085        self.assertFalse(pair.closed)
2086        pair.close()
2087        self.assertTrue(pair.closed)
2088
2089    def test_reader_close_error_on_close(self):
2090        def reader_close():
2091            reader_non_existing
2092        reader = self.MockRawIO()
2093        reader.close = reader_close
2094        writer = self.MockRawIO()
2095        pair = self.tp(reader, writer)
2096        with self.assertRaises(NameError) as err:
2097            pair.close()
2098        self.assertIn('reader_non_existing', str(err.exception))
2099        self.assertTrue(pair.closed)
2100        self.assertFalse(reader.closed)
2101        self.assertTrue(writer.closed)
2102
2103        # Silence destructor error
2104        reader.close = lambda: None
2105
2106    def test_writer_close_error_on_close(self):
2107        def writer_close():
2108            writer_non_existing
2109        reader = self.MockRawIO()
2110        writer = self.MockRawIO()
2111        writer.close = writer_close
2112        pair = self.tp(reader, writer)
2113        with self.assertRaises(NameError) as err:
2114            pair.close()
2115        self.assertIn('writer_non_existing', str(err.exception))
2116        self.assertFalse(pair.closed)
2117        self.assertTrue(reader.closed)
2118        self.assertFalse(writer.closed)
2119
2120        # Silence destructor error
2121        writer.close = lambda: None
2122        writer = None
2123
2124        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
2125        with support.catch_unraisable_exception():
2126            # Ignore BufferedRWPair unraisable exception
2127            with support.catch_unraisable_exception():
2128                pair = None
2129                support.gc_collect()
2130            support.gc_collect()
2131
2132    def test_reader_writer_close_error_on_close(self):
2133        def reader_close():
2134            reader_non_existing
2135        def writer_close():
2136            writer_non_existing
2137        reader = self.MockRawIO()
2138        reader.close = reader_close
2139        writer = self.MockRawIO()
2140        writer.close = writer_close
2141        pair = self.tp(reader, writer)
2142        with self.assertRaises(NameError) as err:
2143            pair.close()
2144        self.assertIn('reader_non_existing', str(err.exception))
2145        self.assertIsInstance(err.exception.__context__, NameError)
2146        self.assertIn('writer_non_existing', str(err.exception.__context__))
2147        self.assertFalse(pair.closed)
2148        self.assertFalse(reader.closed)
2149        self.assertFalse(writer.closed)
2150
2151        # Silence destructor error
2152        reader.close = lambda: None
2153        writer.close = lambda: None
2154
2155    def test_isatty(self):
2156        class SelectableIsAtty(MockRawIO):
2157            def __init__(self, isatty):
2158                MockRawIO.__init__(self)
2159                self._isatty = isatty
2160
2161            def isatty(self):
2162                return self._isatty
2163
2164        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2165        self.assertFalse(pair.isatty())
2166
2167        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2168        self.assertTrue(pair.isatty())
2169
2170        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2171        self.assertTrue(pair.isatty())
2172
2173        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2174        self.assertTrue(pair.isatty())
2175
2176    def test_weakref_clearing(self):
2177        brw = self.tp(self.MockRawIO(), self.MockRawIO())
2178        ref = weakref.ref(brw)
2179        brw = None
2180        ref = None # Shouldn't segfault.
2181
2182class CBufferedRWPairTest(BufferedRWPairTest):
2183    tp = io.BufferedRWPair
2184
2185class PyBufferedRWPairTest(BufferedRWPairTest):
2186    tp = pyio.BufferedRWPair
2187
2188
2189class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2190    read_mode = "rb+"
2191    write_mode = "wb+"
2192
2193    def test_constructor(self):
2194        BufferedReaderTest.test_constructor(self)
2195        BufferedWriterTest.test_constructor(self)
2196
2197    def test_uninitialized(self):
2198        BufferedReaderTest.test_uninitialized(self)
2199        BufferedWriterTest.test_uninitialized(self)
2200
2201    def test_read_and_write(self):
2202        raw = self.MockRawIO((b"asdf", b"ghjk"))
2203        rw = self.tp(raw, 8)
2204
2205        self.assertEqual(b"as", rw.read(2))
2206        rw.write(b"ddd")
2207        rw.write(b"eee")
2208        self.assertFalse(raw._write_stack) # Buffer writes
2209        self.assertEqual(b"ghjk", rw.read())
2210        self.assertEqual(b"dddeee", raw._write_stack[0])
2211
2212    def test_seek_and_tell(self):
2213        raw = self.BytesIO(b"asdfghjkl")
2214        rw = self.tp(raw)
2215
2216        self.assertEqual(b"as", rw.read(2))
2217        self.assertEqual(2, rw.tell())
2218        rw.seek(0, 0)
2219        self.assertEqual(b"asdf", rw.read(4))
2220
2221        rw.write(b"123f")
2222        rw.seek(0, 0)
2223        self.assertEqual(b"asdf123fl", rw.read())
2224        self.assertEqual(9, rw.tell())
2225        rw.seek(-4, 2)
2226        self.assertEqual(5, rw.tell())
2227        rw.seek(2, 1)
2228        self.assertEqual(7, rw.tell())
2229        self.assertEqual(b"fl", rw.read(11))
2230        rw.flush()
2231        self.assertEqual(b"asdf123fl", raw.getvalue())
2232
2233        self.assertRaises(TypeError, rw.seek, 0.0)
2234
2235    def check_flush_and_read(self, read_func):
2236        raw = self.BytesIO(b"abcdefghi")
2237        bufio = self.tp(raw)
2238
2239        self.assertEqual(b"ab", read_func(bufio, 2))
2240        bufio.write(b"12")
2241        self.assertEqual(b"ef", read_func(bufio, 2))
2242        self.assertEqual(6, bufio.tell())
2243        bufio.flush()
2244        self.assertEqual(6, bufio.tell())
2245        self.assertEqual(b"ghi", read_func(bufio))
2246        raw.seek(0, 0)
2247        raw.write(b"XYZ")
2248        # flush() resets the read buffer
2249        bufio.flush()
2250        bufio.seek(0, 0)
2251        self.assertEqual(b"XYZ", read_func(bufio, 3))
2252
2253    def test_flush_and_read(self):
2254        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2255
2256    def test_flush_and_readinto(self):
2257        def _readinto(bufio, n=-1):
2258            b = bytearray(n if n >= 0 else 9999)
2259            n = bufio.readinto(b)
2260            return bytes(b[:n])
2261        self.check_flush_and_read(_readinto)
2262
2263    def test_flush_and_peek(self):
2264        def _peek(bufio, n=-1):
2265            # This relies on the fact that the buffer can contain the whole
2266            # raw stream, otherwise peek() can return less.
2267            b = bufio.peek(n)
2268            if n != -1:
2269                b = b[:n]
2270            bufio.seek(len(b), 1)
2271            return b
2272        self.check_flush_and_read(_peek)
2273
2274    def test_flush_and_write(self):
2275        raw = self.BytesIO(b"abcdefghi")
2276        bufio = self.tp(raw)
2277
2278        bufio.write(b"123")
2279        bufio.flush()
2280        bufio.write(b"45")
2281        bufio.flush()
2282        bufio.seek(0, 0)
2283        self.assertEqual(b"12345fghi", raw.getvalue())
2284        self.assertEqual(b"12345fghi", bufio.read())
2285
2286    def test_threads(self):
2287        BufferedReaderTest.test_threads(self)
2288        BufferedWriterTest.test_threads(self)
2289
2290    def test_writes_and_peek(self):
2291        def _peek(bufio):
2292            bufio.peek(1)
2293        self.check_writes(_peek)
2294        def _peek(bufio):
2295            pos = bufio.tell()
2296            bufio.seek(-1, 1)
2297            bufio.peek(1)
2298            bufio.seek(pos, 0)
2299        self.check_writes(_peek)
2300
2301    def test_writes_and_reads(self):
2302        def _read(bufio):
2303            bufio.seek(-1, 1)
2304            bufio.read(1)
2305        self.check_writes(_read)
2306
2307    def test_writes_and_read1s(self):
2308        def _read1(bufio):
2309            bufio.seek(-1, 1)
2310            bufio.read1(1)
2311        self.check_writes(_read1)
2312
2313    def test_writes_and_readintos(self):
2314        def _read(bufio):
2315            bufio.seek(-1, 1)
2316            bufio.readinto(bytearray(1))
2317        self.check_writes(_read)
2318
2319    def test_write_after_readahead(self):
2320        # Issue #6629: writing after the buffer was filled by readahead should
2321        # first rewind the raw stream.
2322        for overwrite_size in [1, 5]:
2323            raw = self.BytesIO(b"A" * 10)
2324            bufio = self.tp(raw, 4)
2325            # Trigger readahead
2326            self.assertEqual(bufio.read(1), b"A")
2327            self.assertEqual(bufio.tell(), 1)
2328            # Overwriting should rewind the raw stream if it needs so
2329            bufio.write(b"B" * overwrite_size)
2330            self.assertEqual(bufio.tell(), overwrite_size + 1)
2331            # If the write size was smaller than the buffer size, flush() and
2332            # check that rewind happens.
2333            bufio.flush()
2334            self.assertEqual(bufio.tell(), overwrite_size + 1)
2335            s = raw.getvalue()
2336            self.assertEqual(s,
2337                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2338
2339    def test_write_rewind_write(self):
2340        # Various combinations of reading / writing / seeking backwards / writing again
2341        def mutate(bufio, pos1, pos2):
2342            assert pos2 >= pos1
2343            # Fill the buffer
2344            bufio.seek(pos1)
2345            bufio.read(pos2 - pos1)
2346            bufio.write(b'\x02')
2347            # This writes earlier than the previous write, but still inside
2348            # the buffer.
2349            bufio.seek(pos1)
2350            bufio.write(b'\x01')
2351
2352        b = b"\x80\x81\x82\x83\x84"
2353        for i in range(0, len(b)):
2354            for j in range(i, len(b)):
2355                raw = self.BytesIO(b)
2356                bufio = self.tp(raw, 100)
2357                mutate(bufio, i, j)
2358                bufio.flush()
2359                expected = bytearray(b)
2360                expected[j] = 2
2361                expected[i] = 1
2362                self.assertEqual(raw.getvalue(), expected,
2363                                 "failed result for i=%d, j=%d" % (i, j))
2364
2365    def test_truncate_after_read_or_write(self):
2366        raw = self.BytesIO(b"A" * 10)
2367        bufio = self.tp(raw, 100)
2368        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2369        self.assertEqual(bufio.truncate(), 2)
2370        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2371        self.assertEqual(bufio.truncate(), 4)
2372
2373    def test_misbehaved_io(self):
2374        BufferedReaderTest.test_misbehaved_io(self)
2375        BufferedWriterTest.test_misbehaved_io(self)
2376
2377    def test_interleaved_read_write(self):
2378        # Test for issue #12213
2379        with self.BytesIO(b'abcdefgh') as raw:
2380            with self.tp(raw, 100) as f:
2381                f.write(b"1")
2382                self.assertEqual(f.read(1), b'b')
2383                f.write(b'2')
2384                self.assertEqual(f.read1(1), b'd')
2385                f.write(b'3')
2386                buf = bytearray(1)
2387                f.readinto(buf)
2388                self.assertEqual(buf, b'f')
2389                f.write(b'4')
2390                self.assertEqual(f.peek(1), b'h')
2391                f.flush()
2392                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2393
2394        with self.BytesIO(b'abc') as raw:
2395            with self.tp(raw, 100) as f:
2396                self.assertEqual(f.read(1), b'a')
2397                f.write(b"2")
2398                self.assertEqual(f.read(1), b'c')
2399                f.flush()
2400                self.assertEqual(raw.getvalue(), b'a2c')
2401
2402    def test_interleaved_readline_write(self):
2403        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2404            with self.tp(raw) as f:
2405                f.write(b'1')
2406                self.assertEqual(f.readline(), b'b\n')
2407                f.write(b'2')
2408                self.assertEqual(f.readline(), b'def\n')
2409                f.write(b'3')
2410                self.assertEqual(f.readline(), b'\n')
2411                f.flush()
2412                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2413
2414    # You can't construct a BufferedRandom over a non-seekable stream.
2415    test_unseekable = None
2416
2417    # writable() returns True, so there's no point to test it over
2418    # a writable stream.
2419    test_truncate_on_read_only = None
2420
2421
2422class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
2423    tp = io.BufferedRandom
2424
2425    @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing "
2426                       "instead of returning NULL for malloc failure.")
2427    def test_constructor(self):
2428        BufferedRandomTest.test_constructor(self)
2429        # The allocation can succeed on 32-bit builds, e.g. with more
2430        # than 2 GiB RAM and a 64-bit kernel.
2431        if sys.maxsize > 0x7FFFFFFF:
2432            rawio = self.MockRawIO()
2433            bufio = self.tp(rawio)
2434            self.assertRaises((OverflowError, MemoryError, ValueError),
2435                bufio.__init__, rawio, sys.maxsize)
2436
2437    def test_garbage_collection(self):
2438        CBufferedReaderTest.test_garbage_collection(self)
2439        CBufferedWriterTest.test_garbage_collection(self)
2440
2441    def test_args_error(self):
2442        # Issue #17275
2443        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2444            self.tp(io.BytesIO(), 1024, 1024, 1024)
2445
2446
2447class PyBufferedRandomTest(BufferedRandomTest):
2448    tp = pyio.BufferedRandom
2449
2450
2451# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2452# properties:
2453#   - A single output character can correspond to many bytes of input.
2454#   - The number of input bytes to complete the character can be
2455#     undetermined until the last input byte is received.
2456#   - The number of input bytes can vary depending on previous input.
2457#   - A single input byte can correspond to many characters of output.
2458#   - The number of output characters can be undetermined until the
2459#     last input byte is received.
2460#   - The number of output characters can vary depending on previous input.
2461
2462class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2463    """
2464    For testing seek/tell behavior with a stateful, buffering decoder.
2465
2466    Input is a sequence of words.  Words may be fixed-length (length set
2467    by input) or variable-length (period-terminated).  In variable-length
2468    mode, extra periods are ignored.  Possible words are:
2469      - 'i' followed by a number sets the input length, I (maximum 99).
2470        When I is set to 0, words are space-terminated.
2471      - 'o' followed by a number sets the output length, O (maximum 99).
2472      - Any other word is converted into a word followed by a period on
2473        the output.  The output word consists of the input word truncated
2474        or padded out with hyphens to make its length equal to O.  If O
2475        is 0, the word is output verbatim without truncating or padding.
2476    I and O are initially set to 1.  When I changes, any buffered input is
2477    re-scanned according to the new I.  EOF also terminates the last word.
2478    """
2479
2480    def __init__(self, errors='strict'):
2481        codecs.IncrementalDecoder.__init__(self, errors)
2482        self.reset()
2483
2484    def __repr__(self):
2485        return '<SID %x>' % id(self)
2486
2487    def reset(self):
2488        self.i = 1
2489        self.o = 1
2490        self.buffer = bytearray()
2491
2492    def getstate(self):
2493        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2494        return bytes(self.buffer), i*100 + o
2495
2496    def setstate(self, state):
2497        buffer, io = state
2498        self.buffer = bytearray(buffer)
2499        i, o = divmod(io, 100)
2500        self.i, self.o = i ^ 1, o ^ 1
2501
2502    def decode(self, input, final=False):
2503        output = ''
2504        for b in input:
2505            if self.i == 0: # variable-length, terminated with period
2506                if b == ord('.'):
2507                    if self.buffer:
2508                        output += self.process_word()
2509                else:
2510                    self.buffer.append(b)
2511            else: # fixed-length, terminate after self.i bytes
2512                self.buffer.append(b)
2513                if len(self.buffer) == self.i:
2514                    output += self.process_word()
2515        if final and self.buffer: # EOF terminates the last word
2516            output += self.process_word()
2517        return output
2518
2519    def process_word(self):
2520        output = ''
2521        if self.buffer[0] == ord('i'):
2522            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2523        elif self.buffer[0] == ord('o'):
2524            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2525        else:
2526            output = self.buffer.decode('ascii')
2527            if len(output) < self.o:
2528                output += '-'*self.o # pad out with hyphens
2529            if self.o:
2530                output = output[:self.o] # truncate to output length
2531            output += '.'
2532        self.buffer = bytearray()
2533        return output
2534
2535    codecEnabled = False
2536
2537
2538# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
2539# when registering codecs and cleanup functions.
2540def lookupTestDecoder(name):
2541    if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
2542        latin1 = codecs.lookup('latin-1')
2543        return codecs.CodecInfo(
2544            name='test_decoder', encode=latin1.encode, decode=None,
2545            incrementalencoder=None,
2546            streamreader=None, streamwriter=None,
2547            incrementaldecoder=StatefulIncrementalDecoder)
2548
2549
2550class StatefulIncrementalDecoderTest(unittest.TestCase):
2551    """
2552    Make sure the StatefulIncrementalDecoder actually works.
2553    """
2554
2555    test_cases = [
2556        # I=1, O=1 (fixed-length input == fixed-length output)
2557        (b'abcd', False, 'a.b.c.d.'),
2558        # I=0, O=0 (variable-length input, variable-length output)
2559        (b'oiabcd', True, 'abcd.'),
2560        # I=0, O=0 (should ignore extra periods)
2561        (b'oi...abcd...', True, 'abcd.'),
2562        # I=0, O=6 (variable-length input, fixed-length output)
2563        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2564        # I=2, O=6 (fixed-length input < fixed-length output)
2565        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
2566        # I=6, O=3 (fixed-length input > fixed-length output)
2567        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2568        # I=0, then 3; O=29, then 15 (with longer output)
2569        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2570         'a----------------------------.' +
2571         'b----------------------------.' +
2572         'cde--------------------------.' +
2573         'abcdefghijabcde.' +
2574         'a.b------------.' +
2575         '.c.------------.' +
2576         'd.e------------.' +
2577         'k--------------.' +
2578         'l--------------.' +
2579         'm--------------.')
2580    ]
2581
2582    def test_decoder(self):
2583        # Try a few one-shot test cases.
2584        for input, eof, output in self.test_cases:
2585            d = StatefulIncrementalDecoder()
2586            self.assertEqual(d.decode(input, eof), output)
2587
2588        # Also test an unfinished decode, followed by forcing EOF.
2589        d = StatefulIncrementalDecoder()
2590        self.assertEqual(d.decode(b'oiabcd'), '')
2591        self.assertEqual(d.decode(b'', 1), 'abcd.')
2592
2593class TextIOWrapperTest(unittest.TestCase):
2594
2595    def setUp(self):
2596        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2597        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2598        os_helper.unlink(os_helper.TESTFN)
2599        codecs.register(lookupTestDecoder)
2600        self.addCleanup(codecs.unregister, lookupTestDecoder)
2601
2602    def tearDown(self):
2603        os_helper.unlink(os_helper.TESTFN)
2604
2605    def test_constructor(self):
2606        r = self.BytesIO(b"\xc3\xa9\n\n")
2607        b = self.BufferedReader(r, 1000)
2608        t = self.TextIOWrapper(b, encoding="utf-8")
2609        t.__init__(b, encoding="latin-1", newline="\r\n")
2610        self.assertEqual(t.encoding, "latin-1")
2611        self.assertEqual(t.line_buffering, False)
2612        t.__init__(b, encoding="utf-8", line_buffering=True)
2613        self.assertEqual(t.encoding, "utf-8")
2614        self.assertEqual(t.line_buffering, True)
2615        self.assertEqual("\xe9\n", t.readline())
2616        self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
2617        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
2618
2619    def test_uninitialized(self):
2620        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2621        del t
2622        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2623        self.assertRaises(Exception, repr, t)
2624        self.assertRaisesRegex((ValueError, AttributeError),
2625                               'uninitialized|has no attribute',
2626                               t.read, 0)
2627        t.__init__(self.MockRawIO(), encoding="utf-8")
2628        self.assertEqual(t.read(0), '')
2629
2630    def test_non_text_encoding_codecs_are_rejected(self):
2631        # Ensure the constructor complains if passed a codec that isn't
2632        # marked as a text encoding
2633        # http://bugs.python.org/issue20404
2634        r = self.BytesIO()
2635        b = self.BufferedWriter(r)
2636        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2637            self.TextIOWrapper(b, encoding="hex")
2638
2639    def test_detach(self):
2640        r = self.BytesIO()
2641        b = self.BufferedWriter(r)
2642        t = self.TextIOWrapper(b, encoding="ascii")
2643        self.assertIs(t.detach(), b)
2644
2645        t = self.TextIOWrapper(b, encoding="ascii")
2646        t.write("howdy")
2647        self.assertFalse(r.getvalue())
2648        t.detach()
2649        self.assertEqual(r.getvalue(), b"howdy")
2650        self.assertRaises(ValueError, t.detach)
2651
2652        # Operations independent of the detached stream should still work
2653        repr(t)
2654        self.assertEqual(t.encoding, "ascii")
2655        self.assertEqual(t.errors, "strict")
2656        self.assertFalse(t.line_buffering)
2657        self.assertFalse(t.write_through)
2658
2659    def test_repr(self):
2660        raw = self.BytesIO("hello".encode("utf-8"))
2661        b = self.BufferedReader(raw)
2662        t = self.TextIOWrapper(b, encoding="utf-8")
2663        modname = self.TextIOWrapper.__module__
2664        self.assertRegex(repr(t),
2665                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
2666        raw.name = "dummy"
2667        self.assertRegex(repr(t),
2668                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2669        t.mode = "r"
2670        self.assertRegex(repr(t),
2671                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2672        raw.name = b"dummy"
2673        self.assertRegex(repr(t),
2674                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2675
2676        t.buffer.detach()
2677        repr(t)  # Should not raise an exception
2678
2679    def test_recursive_repr(self):
2680        # Issue #25455
2681        raw = self.BytesIO()
2682        t = self.TextIOWrapper(raw, encoding="utf-8")
2683        with support.swap_attr(raw, 'name', t):
2684            try:
2685                repr(t)  # Should not crash
2686            except RuntimeError:
2687                pass
2688
2689    def test_line_buffering(self):
2690        r = self.BytesIO()
2691        b = self.BufferedWriter(r, 1000)
2692        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
2693        t.write("X")
2694        self.assertEqual(r.getvalue(), b"")  # No flush happened
2695        t.write("Y\nZ")
2696        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2697        t.write("A\rB")
2698        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2699
2700    def test_reconfigure_line_buffering(self):
2701        r = self.BytesIO()
2702        b = self.BufferedWriter(r, 1000)
2703        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
2704        t.write("AB\nC")
2705        self.assertEqual(r.getvalue(), b"")
2706
2707        t.reconfigure(line_buffering=True)   # implicit flush
2708        self.assertEqual(r.getvalue(), b"AB\nC")
2709        t.write("DEF\nG")
2710        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2711        t.write("H")
2712        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2713        t.reconfigure(line_buffering=False)   # implicit flush
2714        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2715        t.write("IJ")
2716        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2717
2718        # Keeping default value
2719        t.reconfigure()
2720        t.reconfigure(line_buffering=None)
2721        self.assertEqual(t.line_buffering, False)
2722        t.reconfigure(line_buffering=True)
2723        t.reconfigure()
2724        t.reconfigure(line_buffering=None)
2725        self.assertEqual(t.line_buffering, True)
2726
2727    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2728    def test_default_encoding(self):
2729        old_environ = dict(os.environ)
2730        try:
2731            # try to get a user preferred encoding different than the current
2732            # locale encoding to check that TextIOWrapper() uses the current
2733            # locale encoding and not the user preferred encoding
2734            for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2735                if key in os.environ:
2736                    del os.environ[key]
2737
2738            current_locale_encoding = locale.getencoding()
2739            b = self.BytesIO()
2740            with warnings.catch_warnings():
2741                warnings.simplefilter("ignore", EncodingWarning)
2742                t = self.TextIOWrapper(b)
2743            self.assertEqual(t.encoding, current_locale_encoding)
2744        finally:
2745            os.environ.clear()
2746            os.environ.update(old_environ)
2747
2748    def test_encoding(self):
2749        # Check the encoding attribute is always set, and valid
2750        b = self.BytesIO()
2751        t = self.TextIOWrapper(b, encoding="utf-8")
2752        self.assertEqual(t.encoding, "utf-8")
2753        with warnings.catch_warnings():
2754            warnings.simplefilter("ignore", EncodingWarning)
2755            t = self.TextIOWrapper(b)
2756        self.assertIsNotNone(t.encoding)
2757        codecs.lookup(t.encoding)
2758
2759    def test_encoding_errors_reading(self):
2760        # (1) default
2761        b = self.BytesIO(b"abc\n\xff\n")
2762        t = self.TextIOWrapper(b, encoding="ascii")
2763        self.assertRaises(UnicodeError, t.read)
2764        # (2) explicit strict
2765        b = self.BytesIO(b"abc\n\xff\n")
2766        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2767        self.assertRaises(UnicodeError, t.read)
2768        # (3) ignore
2769        b = self.BytesIO(b"abc\n\xff\n")
2770        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2771        self.assertEqual(t.read(), "abc\n\n")
2772        # (4) replace
2773        b = self.BytesIO(b"abc\n\xff\n")
2774        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2775        self.assertEqual(t.read(), "abc\n\ufffd\n")
2776
2777    def test_encoding_errors_writing(self):
2778        # (1) default
2779        b = self.BytesIO()
2780        t = self.TextIOWrapper(b, encoding="ascii")
2781        self.assertRaises(UnicodeError, t.write, "\xff")
2782        # (2) explicit strict
2783        b = self.BytesIO()
2784        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2785        self.assertRaises(UnicodeError, t.write, "\xff")
2786        # (3) ignore
2787        b = self.BytesIO()
2788        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2789                             newline="\n")
2790        t.write("abc\xffdef\n")
2791        t.flush()
2792        self.assertEqual(b.getvalue(), b"abcdef\n")
2793        # (4) replace
2794        b = self.BytesIO()
2795        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2796                             newline="\n")
2797        t.write("abc\xffdef\n")
2798        t.flush()
2799        self.assertEqual(b.getvalue(), b"abc?def\n")
2800
2801    def test_newlines(self):
2802        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2803
2804        tests = [
2805            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2806            [ '', input_lines ],
2807            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2808            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2809            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2810        ]
2811        encodings = (
2812            'utf-8', 'latin-1',
2813            'utf-16', 'utf-16-le', 'utf-16-be',
2814            'utf-32', 'utf-32-le', 'utf-32-be',
2815        )
2816
2817        # Try a range of buffer sizes to test the case where \r is the last
2818        # character in TextIOWrapper._pending_line.
2819        for encoding in encodings:
2820            # XXX: str.encode() should return bytes
2821            data = bytes(''.join(input_lines).encode(encoding))
2822            for do_reads in (False, True):
2823                for bufsize in range(1, 10):
2824                    for newline, exp_lines in tests:
2825                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2826                        textio = self.TextIOWrapper(bufio, newline=newline,
2827                                                  encoding=encoding)
2828                        if do_reads:
2829                            got_lines = []
2830                            while True:
2831                                c2 = textio.read(2)
2832                                if c2 == '':
2833                                    break
2834                                self.assertEqual(len(c2), 2)
2835                                got_lines.append(c2 + textio.readline())
2836                        else:
2837                            got_lines = list(textio)
2838
2839                        for got_line, exp_line in zip(got_lines, exp_lines):
2840                            self.assertEqual(got_line, exp_line)
2841                        self.assertEqual(len(got_lines), len(exp_lines))
2842
2843    def test_newlines_input(self):
2844        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2845        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2846        for newline, expected in [
2847            (None, normalized.decode("ascii").splitlines(keepends=True)),
2848            ("", testdata.decode("ascii").splitlines(keepends=True)),
2849            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2850            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2851            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2852            ]:
2853            buf = self.BytesIO(testdata)
2854            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2855            self.assertEqual(txt.readlines(), expected)
2856            txt.seek(0)
2857            self.assertEqual(txt.read(), "".join(expected))
2858
2859    def test_newlines_output(self):
2860        testdict = {
2861            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2862            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2863            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2864            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2865            }
2866        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2867        for newline, expected in tests:
2868            buf = self.BytesIO()
2869            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2870            txt.write("AAA\nB")
2871            txt.write("BB\nCCC\n")
2872            txt.write("X\rY\r\nZ")
2873            txt.flush()
2874            self.assertEqual(buf.closed, False)
2875            self.assertEqual(buf.getvalue(), expected)
2876
2877    def test_destructor(self):
2878        l = []
2879        base = self.BytesIO
2880        class MyBytesIO(base):
2881            def close(self):
2882                l.append(self.getvalue())
2883                base.close(self)
2884        b = MyBytesIO()
2885        t = self.TextIOWrapper(b, encoding="ascii")
2886        t.write("abc")
2887        del t
2888        support.gc_collect()
2889        self.assertEqual([b"abc"], l)
2890
2891    def test_override_destructor(self):
2892        record = []
2893        class MyTextIO(self.TextIOWrapper):
2894            def __del__(self):
2895                record.append(1)
2896                try:
2897                    f = super().__del__
2898                except AttributeError:
2899                    pass
2900                else:
2901                    f()
2902            def close(self):
2903                record.append(2)
2904                super().close()
2905            def flush(self):
2906                record.append(3)
2907                super().flush()
2908        b = self.BytesIO()
2909        t = MyTextIO(b, encoding="ascii")
2910        del t
2911        support.gc_collect()
2912        self.assertEqual(record, [1, 2, 3])
2913
2914    def test_error_through_destructor(self):
2915        # Test that the exception state is not modified by a destructor,
2916        # even if close() fails.
2917        rawio = self.CloseFailureIO()
2918        with support.catch_unraisable_exception() as cm:
2919            with self.assertRaises(AttributeError):
2920                self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
2921
2922            if not IOBASE_EMITS_UNRAISABLE:
2923                self.assertIsNone(cm.unraisable)
2924            elif cm.unraisable is not None:
2925                self.assertEqual(cm.unraisable.exc_type, OSError)
2926
2927    # Systematic tests of the text I/O API
2928
2929    def test_basic_io(self):
2930        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2931            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2932                f = self.open(os_helper.TESTFN, "w+", encoding=enc)
2933                f._CHUNK_SIZE = chunksize
2934                self.assertEqual(f.write("abc"), 3)
2935                f.close()
2936                f = self.open(os_helper.TESTFN, "r+", encoding=enc)
2937                f._CHUNK_SIZE = chunksize
2938                self.assertEqual(f.tell(), 0)
2939                self.assertEqual(f.read(), "abc")
2940                cookie = f.tell()
2941                self.assertEqual(f.seek(0), 0)
2942                self.assertEqual(f.read(None), "abc")
2943                f.seek(0)
2944                self.assertEqual(f.read(2), "ab")
2945                self.assertEqual(f.read(1), "c")
2946                self.assertEqual(f.read(1), "")
2947                self.assertEqual(f.read(), "")
2948                self.assertEqual(f.tell(), cookie)
2949                self.assertEqual(f.seek(0), 0)
2950                self.assertEqual(f.seek(0, 2), cookie)
2951                self.assertEqual(f.write("def"), 3)
2952                self.assertEqual(f.seek(cookie), cookie)
2953                self.assertEqual(f.read(), "def")
2954                if enc.startswith("utf"):
2955                    self.multi_line_test(f, enc)
2956                f.close()
2957
2958    def multi_line_test(self, f, enc):
2959        f.seek(0)
2960        f.truncate()
2961        sample = "s\xff\u0fff\uffff"
2962        wlines = []
2963        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2964            chars = []
2965            for i in range(size):
2966                chars.append(sample[i % len(sample)])
2967            line = "".join(chars) + "\n"
2968            wlines.append((f.tell(), line))
2969            f.write(line)
2970        f.seek(0)
2971        rlines = []
2972        while True:
2973            pos = f.tell()
2974            line = f.readline()
2975            if not line:
2976                break
2977            rlines.append((pos, line))
2978        self.assertEqual(rlines, wlines)
2979
2980    def test_telling(self):
2981        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
2982        p0 = f.tell()
2983        f.write("\xff\n")
2984        p1 = f.tell()
2985        f.write("\xff\n")
2986        p2 = f.tell()
2987        f.seek(0)
2988        self.assertEqual(f.tell(), p0)
2989        self.assertEqual(f.readline(), "\xff\n")
2990        self.assertEqual(f.tell(), p1)
2991        self.assertEqual(f.readline(), "\xff\n")
2992        self.assertEqual(f.tell(), p2)
2993        f.seek(0)
2994        for line in f:
2995            self.assertEqual(line, "\xff\n")
2996            self.assertRaises(OSError, f.tell)
2997        self.assertEqual(f.tell(), p2)
2998        f.close()
2999
3000    def test_seeking(self):
3001        chunk_size = _default_chunk_size()
3002        prefix_size = chunk_size - 2
3003        u_prefix = "a" * prefix_size
3004        prefix = bytes(u_prefix.encode("utf-8"))
3005        self.assertEqual(len(u_prefix), len(prefix))
3006        u_suffix = "\u8888\n"
3007        suffix = bytes(u_suffix.encode("utf-8"))
3008        line = prefix + suffix
3009        with self.open(os_helper.TESTFN, "wb") as f:
3010            f.write(line*2)
3011        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3012            s = f.read(prefix_size)
3013            self.assertEqual(s, str(prefix, "ascii"))
3014            self.assertEqual(f.tell(), prefix_size)
3015            self.assertEqual(f.readline(), u_suffix)
3016
3017    def test_seeking_too(self):
3018        # Regression test for a specific bug
3019        data = b'\xe0\xbf\xbf\n'
3020        with self.open(os_helper.TESTFN, "wb") as f:
3021            f.write(data)
3022        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3023            f._CHUNK_SIZE  # Just test that it exists
3024            f._CHUNK_SIZE = 2
3025            f.readline()
3026            f.tell()
3027
3028    def test_seek_and_tell(self):
3029        #Test seek/tell using the StatefulIncrementalDecoder.
3030        # Make test faster by doing smaller seeks
3031        CHUNK_SIZE = 128
3032
3033        def test_seek_and_tell_with_data(data, min_pos=0):
3034            """Tell/seek to various points within a data stream and ensure
3035            that the decoded data returned by read() is consistent."""
3036            f = self.open(os_helper.TESTFN, 'wb')
3037            f.write(data)
3038            f.close()
3039            f = self.open(os_helper.TESTFN, encoding='test_decoder')
3040            f._CHUNK_SIZE = CHUNK_SIZE
3041            decoded = f.read()
3042            f.close()
3043
3044            for i in range(min_pos, len(decoded) + 1): # seek positions
3045                for j in [1, 5, len(decoded) - i]: # read lengths
3046                    f = self.open(os_helper.TESTFN, encoding='test_decoder')
3047                    self.assertEqual(f.read(i), decoded[:i])
3048                    cookie = f.tell()
3049                    self.assertEqual(f.read(j), decoded[i:i + j])
3050                    f.seek(cookie)
3051                    self.assertEqual(f.read(), decoded[i:])
3052                    f.close()
3053
3054        # Enable the test decoder.
3055        StatefulIncrementalDecoder.codecEnabled = 1
3056
3057        # Run the tests.
3058        try:
3059            # Try each test case.
3060            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3061                test_seek_and_tell_with_data(input)
3062
3063            # Position each test case so that it crosses a chunk boundary.
3064            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3065                offset = CHUNK_SIZE - len(input)//2
3066                prefix = b'.'*offset
3067                # Don't bother seeking into the prefix (takes too long).
3068                min_pos = offset*2
3069                test_seek_and_tell_with_data(prefix + input, min_pos)
3070
3071        # Ensure our test decoder won't interfere with subsequent tests.
3072        finally:
3073            StatefulIncrementalDecoder.codecEnabled = 0
3074
3075    def test_multibyte_seek_and_tell(self):
3076        f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
3077        f.write("AB\n\u3046\u3048\n")
3078        f.close()
3079
3080        f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
3081        self.assertEqual(f.readline(), "AB\n")
3082        p0 = f.tell()
3083        self.assertEqual(f.readline(), "\u3046\u3048\n")
3084        p1 = f.tell()
3085        f.seek(p0)
3086        self.assertEqual(f.readline(), "\u3046\u3048\n")
3087        self.assertEqual(f.tell(), p1)
3088        f.close()
3089
3090    def test_seek_with_encoder_state(self):
3091        f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
3092        f.write("\u00e6\u0300")
3093        p0 = f.tell()
3094        f.write("\u00e6")
3095        f.seek(p0)
3096        f.write("\u0300")
3097        f.close()
3098
3099        f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
3100        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3101        f.close()
3102
3103    def test_encoded_writes(self):
3104        data = "1234567890"
3105        tests = ("utf-16",
3106                 "utf-16-le",
3107                 "utf-16-be",
3108                 "utf-32",
3109                 "utf-32-le",
3110                 "utf-32-be")
3111        for encoding in tests:
3112            buf = self.BytesIO()
3113            f = self.TextIOWrapper(buf, encoding=encoding)
3114            # Check if the BOM is written only once (see issue1753).
3115            f.write(data)
3116            f.write(data)
3117            f.seek(0)
3118            self.assertEqual(f.read(), data * 2)
3119            f.seek(0)
3120            self.assertEqual(f.read(), data * 2)
3121            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3122
3123    def test_unreadable(self):
3124        class UnReadable(self.BytesIO):
3125            def readable(self):
3126                return False
3127        txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
3128        self.assertRaises(OSError, txt.read)
3129
3130    def test_read_one_by_one(self):
3131        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
3132        reads = ""
3133        while True:
3134            c = txt.read(1)
3135            if not c:
3136                break
3137            reads += c
3138        self.assertEqual(reads, "AA\nBB")
3139
3140    def test_readlines(self):
3141        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
3142        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3143        txt.seek(0)
3144        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3145        txt.seek(0)
3146        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3147
3148    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
3149    def test_read_by_chunk(self):
3150        # make sure "\r\n" straddles 128 char boundary.
3151        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
3152        reads = ""
3153        while True:
3154            c = txt.read(128)
3155            if not c:
3156                break
3157            reads += c
3158        self.assertEqual(reads, "A"*127+"\nB")
3159
3160    def test_writelines(self):
3161        l = ['ab', 'cd', 'ef']
3162        buf = self.BytesIO()
3163        txt = self.TextIOWrapper(buf, encoding="utf-8")
3164        txt.writelines(l)
3165        txt.flush()
3166        self.assertEqual(buf.getvalue(), b'abcdef')
3167
3168    def test_writelines_userlist(self):
3169        l = UserList(['ab', 'cd', 'ef'])
3170        buf = self.BytesIO()
3171        txt = self.TextIOWrapper(buf, encoding="utf-8")
3172        txt.writelines(l)
3173        txt.flush()
3174        self.assertEqual(buf.getvalue(), b'abcdef')
3175
3176    def test_writelines_error(self):
3177        txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
3178        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3179        self.assertRaises(TypeError, txt.writelines, None)
3180        self.assertRaises(TypeError, txt.writelines, b'abc')
3181
3182    def test_issue1395_1(self):
3183        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3184
3185        # read one char at a time
3186        reads = ""
3187        while True:
3188            c = txt.read(1)
3189            if not c:
3190                break
3191            reads += c
3192        self.assertEqual(reads, self.normalized)
3193
3194    def test_issue1395_2(self):
3195        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3196        txt._CHUNK_SIZE = 4
3197
3198        reads = ""
3199        while True:
3200            c = txt.read(4)
3201            if not c:
3202                break
3203            reads += c
3204        self.assertEqual(reads, self.normalized)
3205
3206    def test_issue1395_3(self):
3207        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3208        txt._CHUNK_SIZE = 4
3209
3210        reads = txt.read(4)
3211        reads += txt.read(4)
3212        reads += txt.readline()
3213        reads += txt.readline()
3214        reads += txt.readline()
3215        self.assertEqual(reads, self.normalized)
3216
3217    def test_issue1395_4(self):
3218        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3219        txt._CHUNK_SIZE = 4
3220
3221        reads = txt.read(4)
3222        reads += txt.read()
3223        self.assertEqual(reads, self.normalized)
3224
3225    def test_issue1395_5(self):
3226        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3227        txt._CHUNK_SIZE = 4
3228
3229        reads = txt.read(4)
3230        pos = txt.tell()
3231        txt.seek(0)
3232        txt.seek(pos)
3233        self.assertEqual(txt.read(4), "BBB\n")
3234
3235    def test_issue2282(self):
3236        buffer = self.BytesIO(self.testdata)
3237        txt = self.TextIOWrapper(buffer, encoding="ascii")
3238
3239        self.assertEqual(buffer.seekable(), txt.seekable())
3240
3241    def test_append_bom(self):
3242        # The BOM is not written again when appending to a non-empty file
3243        filename = os_helper.TESTFN
3244        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3245            with self.open(filename, 'w', encoding=charset) as f:
3246                f.write('aaa')
3247                pos = f.tell()
3248            with self.open(filename, 'rb') as f:
3249                self.assertEqual(f.read(), 'aaa'.encode(charset))
3250
3251            with self.open(filename, 'a', encoding=charset) as f:
3252                f.write('xxx')
3253            with self.open(filename, 'rb') as f:
3254                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3255
3256    def test_seek_bom(self):
3257        # Same test, but when seeking manually
3258        filename = os_helper.TESTFN
3259        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3260            with self.open(filename, 'w', encoding=charset) as f:
3261                f.write('aaa')
3262                pos = f.tell()
3263            with self.open(filename, 'r+', encoding=charset) as f:
3264                f.seek(pos)
3265                f.write('zzz')
3266                f.seek(0)
3267                f.write('bbb')
3268            with self.open(filename, 'rb') as f:
3269                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3270
3271    def test_seek_append_bom(self):
3272        # Same test, but first seek to the start and then to the end
3273        filename = os_helper.TESTFN
3274        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3275            with self.open(filename, 'w', encoding=charset) as f:
3276                f.write('aaa')
3277            with self.open(filename, 'a', encoding=charset) as f:
3278                f.seek(0)
3279                f.seek(0, self.SEEK_END)
3280                f.write('xxx')
3281            with self.open(filename, 'rb') as f:
3282                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3283
3284    def test_errors_property(self):
3285        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
3286            self.assertEqual(f.errors, "strict")
3287        with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
3288            self.assertEqual(f.errors, "replace")
3289
3290    @support.no_tracing
3291    @threading_helper.requires_working_threading()
3292    def test_threads_write(self):
3293        # Issue6750: concurrent writes could duplicate data
3294        event = threading.Event()
3295        with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
3296            def run(n):
3297                text = "Thread%03d\n" % n
3298                event.wait()
3299                f.write(text)
3300            threads = [threading.Thread(target=run, args=(x,))
3301                       for x in range(20)]
3302            with threading_helper.start_threads(threads, event.set):
3303                time.sleep(0.02)
3304        with self.open(os_helper.TESTFN, encoding="utf-8") as f:
3305            content = f.read()
3306            for n in range(20):
3307                self.assertEqual(content.count("Thread%03d\n" % n), 1)
3308
3309    def test_flush_error_on_close(self):
3310        # Test that text file is closed despite failed flush
3311        # and that flush() is called before file closed.
3312        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3313        closed = []
3314        def bad_flush():
3315            closed[:] = [txt.closed, txt.buffer.closed]
3316            raise OSError()
3317        txt.flush = bad_flush
3318        self.assertRaises(OSError, txt.close) # exception not swallowed
3319        self.assertTrue(txt.closed)
3320        self.assertTrue(txt.buffer.closed)
3321        self.assertTrue(closed)      # flush() called
3322        self.assertFalse(closed[0])  # flush() called before file closed
3323        self.assertFalse(closed[1])
3324        txt.flush = lambda: None  # break reference loop
3325
3326    def test_close_error_on_close(self):
3327        buffer = self.BytesIO(self.testdata)
3328        def bad_flush():
3329            raise OSError('flush')
3330        def bad_close():
3331            raise OSError('close')
3332        buffer.close = bad_close
3333        txt = self.TextIOWrapper(buffer, encoding="ascii")
3334        txt.flush = bad_flush
3335        with self.assertRaises(OSError) as err: # exception not swallowed
3336            txt.close()
3337        self.assertEqual(err.exception.args, ('close',))
3338        self.assertIsInstance(err.exception.__context__, OSError)
3339        self.assertEqual(err.exception.__context__.args, ('flush',))
3340        self.assertFalse(txt.closed)
3341
3342        # Silence destructor error
3343        buffer.close = lambda: None
3344        txt.flush = lambda: None
3345
3346    def test_nonnormalized_close_error_on_close(self):
3347        # Issue #21677
3348        buffer = self.BytesIO(self.testdata)
3349        def bad_flush():
3350            raise non_existing_flush
3351        def bad_close():
3352            raise non_existing_close
3353        buffer.close = bad_close
3354        txt = self.TextIOWrapper(buffer, encoding="ascii")
3355        txt.flush = bad_flush
3356        with self.assertRaises(NameError) as err: # exception not swallowed
3357            txt.close()
3358        self.assertIn('non_existing_close', str(err.exception))
3359        self.assertIsInstance(err.exception.__context__, NameError)
3360        self.assertIn('non_existing_flush', str(err.exception.__context__))
3361        self.assertFalse(txt.closed)
3362
3363        # Silence destructor error
3364        buffer.close = lambda: None
3365        txt.flush = lambda: None
3366
3367    def test_multi_close(self):
3368        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3369        txt.close()
3370        txt.close()
3371        txt.close()
3372        self.assertRaises(ValueError, txt.flush)
3373
3374    def test_unseekable(self):
3375        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
3376        self.assertRaises(self.UnsupportedOperation, txt.tell)
3377        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3378
3379    def test_readonly_attributes(self):
3380        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3381        buf = self.BytesIO(self.testdata)
3382        with self.assertRaises(AttributeError):
3383            txt.buffer = buf
3384
3385    def test_rawio(self):
3386        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3387        # that subprocess.Popen() can have the required unbuffered
3388        # semantics with universal_newlines=True.
3389        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3390        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3391        # Reads
3392        self.assertEqual(txt.read(4), 'abcd')
3393        self.assertEqual(txt.readline(), 'efghi\n')
3394        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3395
3396    def test_rawio_write_through(self):
3397        # Issue #12591: with write_through=True, writes don't need a flush
3398        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3399        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3400                                 write_through=True)
3401        txt.write('1')
3402        txt.write('23\n4')
3403        txt.write('5')
3404        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3405
3406    def test_bufio_write_through(self):
3407        # Issue #21396: write_through=True doesn't force a flush()
3408        # on the underlying binary buffered object.
3409        flush_called, write_called = [], []
3410        class BufferedWriter(self.BufferedWriter):
3411            def flush(self, *args, **kwargs):
3412                flush_called.append(True)
3413                return super().flush(*args, **kwargs)
3414            def write(self, *args, **kwargs):
3415                write_called.append(True)
3416                return super().write(*args, **kwargs)
3417
3418        rawio = self.BytesIO()
3419        data = b"a"
3420        bufio = BufferedWriter(rawio, len(data)*2)
3421        textio = self.TextIOWrapper(bufio, encoding='ascii',
3422                                    write_through=True)
3423        # write to the buffered io but don't overflow the buffer
3424        text = data.decode('ascii')
3425        textio.write(text)
3426
3427        # buffer.flush is not called with write_through=True
3428        self.assertFalse(flush_called)
3429        # buffer.write *is* called with write_through=True
3430        self.assertTrue(write_called)
3431        self.assertEqual(rawio.getvalue(), b"") # no flush
3432
3433        write_called = [] # reset
3434        textio.write(text * 10) # total content is larger than bufio buffer
3435        self.assertTrue(write_called)
3436        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3437
3438    def test_reconfigure_write_through(self):
3439        raw = self.MockRawIO([])
3440        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3441        t.write('1')
3442        t.reconfigure(write_through=True)  # implied flush
3443        self.assertEqual(t.write_through, True)
3444        self.assertEqual(b''.join(raw._write_stack), b'1')
3445        t.write('23')
3446        self.assertEqual(b''.join(raw._write_stack), b'123')
3447        t.reconfigure(write_through=False)
3448        self.assertEqual(t.write_through, False)
3449        t.write('45')
3450        t.flush()
3451        self.assertEqual(b''.join(raw._write_stack), b'12345')
3452        # Keeping default value
3453        t.reconfigure()
3454        t.reconfigure(write_through=None)
3455        self.assertEqual(t.write_through, False)
3456        t.reconfigure(write_through=True)
3457        t.reconfigure()
3458        t.reconfigure(write_through=None)
3459        self.assertEqual(t.write_through, True)
3460
3461    def test_read_nonbytes(self):
3462        # Issue #17106
3463        # Crash when underlying read() returns non-bytes
3464        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3465        self.assertRaises(TypeError, t.read, 1)
3466        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3467        self.assertRaises(TypeError, t.readline)
3468        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3469        self.assertRaises(TypeError, t.read)
3470
3471    def test_illegal_encoder(self):
3472        # Issue 31271: Calling write() while the return value of encoder's
3473        # encode() is invalid shouldn't cause an assertion failure.
3474        rot13 = codecs.lookup("rot13")
3475        with support.swap_attr(rot13, '_is_text_encoding', True):
3476            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3477        self.assertRaises(TypeError, t.write, 'bar')
3478
3479    def test_illegal_decoder(self):
3480        # Issue #17106
3481        # Bypass the early encoding check added in issue 20404
3482        def _make_illegal_wrapper():
3483            quopri = codecs.lookup("quopri")
3484            quopri._is_text_encoding = True
3485            try:
3486                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3487                                       newline='\n', encoding="quopri")
3488            finally:
3489                quopri._is_text_encoding = False
3490            return t
3491        # Crash when decoder returns non-string
3492        t = _make_illegal_wrapper()
3493        self.assertRaises(TypeError, t.read, 1)
3494        t = _make_illegal_wrapper()
3495        self.assertRaises(TypeError, t.readline)
3496        t = _make_illegal_wrapper()
3497        self.assertRaises(TypeError, t.read)
3498
3499        # Issue 31243: calling read() while the return value of decoder's
3500        # getstate() is invalid should neither crash the interpreter nor
3501        # raise a SystemError.
3502        def _make_very_illegal_wrapper(getstate_ret_val):
3503            class BadDecoder:
3504                def getstate(self):
3505                    return getstate_ret_val
3506            def _get_bad_decoder(dummy):
3507                return BadDecoder()
3508            quopri = codecs.lookup("quopri")
3509            with support.swap_attr(quopri, 'incrementaldecoder',
3510                                   _get_bad_decoder):
3511                return _make_illegal_wrapper()
3512        t = _make_very_illegal_wrapper(42)
3513        self.assertRaises(TypeError, t.read, 42)
3514        t = _make_very_illegal_wrapper(())
3515        self.assertRaises(TypeError, t.read, 42)
3516        t = _make_very_illegal_wrapper((1, 2))
3517        self.assertRaises(TypeError, t.read, 42)
3518
3519    def _check_create_at_shutdown(self, **kwargs):
3520        # Issue #20037: creating a TextIOWrapper at shutdown
3521        # shouldn't crash the interpreter.
3522        iomod = self.io.__name__
3523        code = """if 1:
3524            import codecs
3525            import {iomod} as io
3526
3527            # Avoid looking up codecs at shutdown
3528            codecs.lookup('utf-8')
3529
3530            class C:
3531                def __init__(self):
3532                    self.buf = io.BytesIO()
3533                def __del__(self):
3534                    io.TextIOWrapper(self.buf, **{kwargs})
3535                    print("ok")
3536            c = C()
3537            """.format(iomod=iomod, kwargs=kwargs)
3538        return assert_python_ok("-c", code)
3539
3540    def test_create_at_shutdown_without_encoding(self):
3541        rc, out, err = self._check_create_at_shutdown()
3542        if err:
3543            # Can error out with a RuntimeError if the module state
3544            # isn't found.
3545            self.assertIn(self.shutdown_error, err.decode())
3546        else:
3547            self.assertEqual("ok", out.decode().strip())
3548
3549    def test_create_at_shutdown_with_encoding(self):
3550        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3551                                                      errors='strict')
3552        self.assertFalse(err)
3553        self.assertEqual("ok", out.decode().strip())
3554
3555    def test_read_byteslike(self):
3556        r = MemviewBytesIO(b'Just some random string\n')
3557        t = self.TextIOWrapper(r, 'utf-8')
3558
3559        # TextIOwrapper will not read the full string, because
3560        # we truncate it to a multiple of the native int size
3561        # so that we can construct a more complex memoryview.
3562        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3563
3564        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3565
3566    def test_issue22849(self):
3567        class F(object):
3568            def readable(self): return True
3569            def writable(self): return True
3570            def seekable(self): return True
3571
3572        for i in range(10):
3573            try:
3574                self.TextIOWrapper(F(), encoding='utf-8')
3575            except Exception:
3576                pass
3577
3578        F.tell = lambda x: 0
3579        t = self.TextIOWrapper(F(), encoding='utf-8')
3580
3581    def test_reconfigure_locale(self):
3582        wrapper = io.TextIOWrapper(io.BytesIO(b"test"))
3583        wrapper.reconfigure(encoding="locale")
3584
3585    def test_reconfigure_encoding_read(self):
3586        # latin1 -> utf8
3587        # (latin1 can decode utf-8 encoded string)
3588        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3589        raw = self.BytesIO(data)
3590        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3591        self.assertEqual(txt.readline(), 'abc\xe9\n')
3592        with self.assertRaises(self.UnsupportedOperation):
3593            txt.reconfigure(encoding='utf-8')
3594        with self.assertRaises(self.UnsupportedOperation):
3595            txt.reconfigure(newline=None)
3596
3597    def test_reconfigure_write_fromascii(self):
3598        # ascii has a specific encodefunc in the C implementation,
3599        # but utf-8-sig has not. Make sure that we get rid of the
3600        # cached encodefunc when we switch encoders.
3601        raw = self.BytesIO()
3602        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3603        txt.write('foo\n')
3604        txt.reconfigure(encoding='utf-8-sig')
3605        txt.write('\xe9\n')
3606        txt.flush()
3607        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3608
3609    def test_reconfigure_write(self):
3610        # latin -> utf8
3611        raw = self.BytesIO()
3612        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3613        txt.write('abc\xe9\n')
3614        txt.reconfigure(encoding='utf-8')
3615        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3616        txt.write('d\xe9f\n')
3617        txt.flush()
3618        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3619
3620        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3621        # the file
3622        raw = self.BytesIO()
3623        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3624        txt.write('abc\n')
3625        txt.reconfigure(encoding='utf-8-sig')
3626        txt.write('d\xe9f\n')
3627        txt.flush()
3628        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3629
3630    def test_reconfigure_write_non_seekable(self):
3631        raw = self.BytesIO()
3632        raw.seekable = lambda: False
3633        raw.seek = None
3634        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3635        txt.write('abc\n')
3636        txt.reconfigure(encoding='utf-8-sig')
3637        txt.write('d\xe9f\n')
3638        txt.flush()
3639
3640        # If the raw stream is not seekable, there'll be a BOM
3641        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3642
3643    def test_reconfigure_defaults(self):
3644        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3645        txt.reconfigure(encoding=None)
3646        self.assertEqual(txt.encoding, 'ascii')
3647        self.assertEqual(txt.errors, 'replace')
3648        txt.write('LF\n')
3649
3650        txt.reconfigure(newline='\r\n')
3651        self.assertEqual(txt.encoding, 'ascii')
3652        self.assertEqual(txt.errors, 'replace')
3653
3654        txt.reconfigure(errors='ignore')
3655        self.assertEqual(txt.encoding, 'ascii')
3656        self.assertEqual(txt.errors, 'ignore')
3657        txt.write('CRLF\n')
3658
3659        txt.reconfigure(encoding='utf-8', newline=None)
3660        self.assertEqual(txt.errors, 'strict')
3661        txt.seek(0)
3662        self.assertEqual(txt.read(), 'LF\nCRLF\n')
3663
3664        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3665
3666    def test_reconfigure_newline(self):
3667        raw = self.BytesIO(b'CR\rEOF')
3668        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3669        txt.reconfigure(newline=None)
3670        self.assertEqual(txt.readline(), 'CR\n')
3671        raw = self.BytesIO(b'CR\rEOF')
3672        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3673        txt.reconfigure(newline='')
3674        self.assertEqual(txt.readline(), 'CR\r')
3675        raw = self.BytesIO(b'CR\rLF\nEOF')
3676        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3677        txt.reconfigure(newline='\n')
3678        self.assertEqual(txt.readline(), 'CR\rLF\n')
3679        raw = self.BytesIO(b'LF\nCR\rEOF')
3680        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3681        txt.reconfigure(newline='\r')
3682        self.assertEqual(txt.readline(), 'LF\nCR\r')
3683        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3684        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3685        txt.reconfigure(newline='\r\n')
3686        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3687
3688        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3689        txt.reconfigure(newline=None)
3690        txt.write('linesep\n')
3691        txt.reconfigure(newline='')
3692        txt.write('LF\n')
3693        txt.reconfigure(newline='\n')
3694        txt.write('LF\n')
3695        txt.reconfigure(newline='\r')
3696        txt.write('CR\n')
3697        txt.reconfigure(newline='\r\n')
3698        txt.write('CRLF\n')
3699        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3700        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3701
3702    def test_issue25862(self):
3703        # Assertion failures occurred in tell() after read() and write().
3704        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3705        t.read(1)
3706        t.read()
3707        t.tell()
3708        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3709        t.read(1)
3710        t.write('x')
3711        t.tell()
3712
3713
3714class MemviewBytesIO(io.BytesIO):
3715    '''A BytesIO object whose read method returns memoryviews
3716       rather than bytes'''
3717
3718    def read1(self, len_):
3719        return _to_memoryview(super().read1(len_))
3720
3721    def read(self, len_):
3722        return _to_memoryview(super().read(len_))
3723
3724def _to_memoryview(buf):
3725    '''Convert bytes-object *buf* to a non-trivial memoryview'''
3726
3727    arr = array.array('i')
3728    idx = len(buf) - len(buf) % arr.itemsize
3729    arr.frombytes(buf[:idx])
3730    return memoryview(arr)
3731
3732
3733class CTextIOWrapperTest(TextIOWrapperTest):
3734    io = io
3735    shutdown_error = "LookupError: unknown encoding: ascii"
3736
3737    def test_initialization(self):
3738        r = self.BytesIO(b"\xc3\xa9\n\n")
3739        b = self.BufferedReader(r, 1000)
3740        t = self.TextIOWrapper(b, encoding="utf-8")
3741        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
3742        self.assertRaises(ValueError, t.read)
3743
3744        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3745        self.assertRaises(Exception, repr, t)
3746
3747    def test_garbage_collection(self):
3748        # C TextIOWrapper objects are collected, and collecting them flushes
3749        # all data to disk.
3750        # The Python version has __del__, so it ends in gc.garbage instead.
3751        with warnings_helper.check_warnings(('', ResourceWarning)):
3752            rawio = io.FileIO(os_helper.TESTFN, "wb")
3753            b = self.BufferedWriter(rawio)
3754            t = self.TextIOWrapper(b, encoding="ascii")
3755            t.write("456def")
3756            t.x = t
3757            wr = weakref.ref(t)
3758            del t
3759            support.gc_collect()
3760        self.assertIsNone(wr(), wr)
3761        with self.open(os_helper.TESTFN, "rb") as f:
3762            self.assertEqual(f.read(), b"456def")
3763
3764    def test_rwpair_cleared_before_textio(self):
3765        # Issue 13070: TextIOWrapper's finalization would crash when called
3766        # after the reference to the underlying BufferedRWPair's writer got
3767        # cleared by the GC.
3768        for i in range(1000):
3769            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3770            t1 = self.TextIOWrapper(b1, encoding="ascii")
3771            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3772            t2 = self.TextIOWrapper(b2, encoding="ascii")
3773            # circular references
3774            t1.buddy = t2
3775            t2.buddy = t1
3776        support.gc_collect()
3777
3778    def test_del__CHUNK_SIZE_SystemError(self):
3779        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3780        with self.assertRaises(AttributeError):
3781            del t._CHUNK_SIZE
3782
3783    def test_internal_buffer_size(self):
3784        # bpo-43260: TextIOWrapper's internal buffer should not store
3785        # data larger than chunk size.
3786        chunk_size = 8192  # default chunk size, updated later
3787
3788        class MockIO(self.MockRawIO):
3789            def write(self, data):
3790                if len(data) > chunk_size:
3791                    raise RuntimeError
3792                return super().write(data)
3793
3794        buf = MockIO()
3795        t = self.TextIOWrapper(buf, encoding="ascii")
3796        chunk_size = t._CHUNK_SIZE
3797        t.write("abc")
3798        t.write("def")
3799        # default chunk size is 8192 bytes so t don't write data to buf.
3800        self.assertEqual([], buf._write_stack)
3801
3802        with self.assertRaises(RuntimeError):
3803            t.write("x"*(chunk_size+1))
3804
3805        self.assertEqual([b"abcdef"], buf._write_stack)
3806        t.write("ghi")
3807        t.write("x"*chunk_size)
3808        self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
3809
3810
3811class PyTextIOWrapperTest(TextIOWrapperTest):
3812    io = pyio
3813    shutdown_error = "LookupError: unknown encoding: ascii"
3814
3815
3816class IncrementalNewlineDecoderTest(unittest.TestCase):
3817
3818    def check_newline_decoding_utf8(self, decoder):
3819        # UTF-8 specific tests for a newline decoder
3820        def _check_decode(b, s, **kwargs):
3821            # We exercise getstate() / setstate() as well as decode()
3822            state = decoder.getstate()
3823            self.assertEqual(decoder.decode(b, **kwargs), s)
3824            decoder.setstate(state)
3825            self.assertEqual(decoder.decode(b, **kwargs), s)
3826
3827        _check_decode(b'\xe8\xa2\x88', "\u8888")
3828
3829        _check_decode(b'\xe8', "")
3830        _check_decode(b'\xa2', "")
3831        _check_decode(b'\x88', "\u8888")
3832
3833        _check_decode(b'\xe8', "")
3834        _check_decode(b'\xa2', "")
3835        _check_decode(b'\x88', "\u8888")
3836
3837        _check_decode(b'\xe8', "")
3838        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3839
3840        decoder.reset()
3841        _check_decode(b'\n', "\n")
3842        _check_decode(b'\r', "")
3843        _check_decode(b'', "\n", final=True)
3844        _check_decode(b'\r', "\n", final=True)
3845
3846        _check_decode(b'\r', "")
3847        _check_decode(b'a', "\na")
3848
3849        _check_decode(b'\r\r\n', "\n\n")
3850        _check_decode(b'\r', "")
3851        _check_decode(b'\r', "\n")
3852        _check_decode(b'\na', "\na")
3853
3854        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3855        _check_decode(b'\xe8\xa2\x88', "\u8888")
3856        _check_decode(b'\n', "\n")
3857        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3858        _check_decode(b'\n', "\n")
3859
3860    def check_newline_decoding(self, decoder, encoding):
3861        result = []
3862        if encoding is not None:
3863            encoder = codecs.getincrementalencoder(encoding)()
3864            def _decode_bytewise(s):
3865                # Decode one byte at a time
3866                for b in encoder.encode(s):
3867                    result.append(decoder.decode(bytes([b])))
3868        else:
3869            encoder = None
3870            def _decode_bytewise(s):
3871                # Decode one char at a time
3872                for c in s:
3873                    result.append(decoder.decode(c))
3874        self.assertEqual(decoder.newlines, None)
3875        _decode_bytewise("abc\n\r")
3876        self.assertEqual(decoder.newlines, '\n')
3877        _decode_bytewise("\nabc")
3878        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3879        _decode_bytewise("abc\r")
3880        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3881        _decode_bytewise("abc")
3882        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
3883        _decode_bytewise("abc\r")
3884        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
3885        decoder.reset()
3886        input = "abc"
3887        if encoder is not None:
3888            encoder.reset()
3889            input = encoder.encode(input)
3890        self.assertEqual(decoder.decode(input), "abc")
3891        self.assertEqual(decoder.newlines, None)
3892
3893    def test_newline_decoder(self):
3894        encodings = (
3895            # None meaning the IncrementalNewlineDecoder takes unicode input
3896            # rather than bytes input
3897            None, 'utf-8', 'latin-1',
3898            'utf-16', 'utf-16-le', 'utf-16-be',
3899            'utf-32', 'utf-32-le', 'utf-32-be',
3900        )
3901        for enc in encodings:
3902            decoder = enc and codecs.getincrementaldecoder(enc)()
3903            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3904            self.check_newline_decoding(decoder, enc)
3905        decoder = codecs.getincrementaldecoder("utf-8")()
3906        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3907        self.check_newline_decoding_utf8(decoder)
3908        self.assertRaises(TypeError, decoder.setstate, 42)
3909
3910    def test_newline_bytes(self):
3911        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3912        def _check(dec):
3913            self.assertEqual(dec.newlines, None)
3914            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3915            self.assertEqual(dec.newlines, None)
3916            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3917            self.assertEqual(dec.newlines, None)
3918        dec = self.IncrementalNewlineDecoder(None, translate=False)
3919        _check(dec)
3920        dec = self.IncrementalNewlineDecoder(None, translate=True)
3921        _check(dec)
3922
3923    def test_translate(self):
3924        # issue 35062
3925        for translate in (-2, -1, 1, 2):
3926            decoder = codecs.getincrementaldecoder("utf-8")()
3927            decoder = self.IncrementalNewlineDecoder(decoder, translate)
3928            self.check_newline_decoding_utf8(decoder)
3929        decoder = codecs.getincrementaldecoder("utf-8")()
3930        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3931        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3932
3933class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3934    @support.cpython_only
3935    def test_uninitialized(self):
3936        uninitialized = self.IncrementalNewlineDecoder.__new__(
3937            self.IncrementalNewlineDecoder)
3938        self.assertRaises(ValueError, uninitialized.decode, b'bar')
3939        self.assertRaises(ValueError, uninitialized.getstate)
3940        self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
3941        self.assertRaises(ValueError, uninitialized.reset)
3942
3943
3944class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3945    pass
3946
3947
3948# XXX Tests for open()
3949
3950class MiscIOTest(unittest.TestCase):
3951
3952    def tearDown(self):
3953        os_helper.unlink(os_helper.TESTFN)
3954
3955    def test___all__(self):
3956        for name in self.io.__all__:
3957            obj = getattr(self.io, name, None)
3958            self.assertIsNotNone(obj, name)
3959            if name in ("open", "open_code"):
3960                continue
3961            elif "error" in name.lower() or name == "UnsupportedOperation":
3962                self.assertTrue(issubclass(obj, Exception), name)
3963            elif not name.startswith("SEEK_"):
3964                self.assertTrue(issubclass(obj, self.IOBase))
3965
3966    def test_attributes(self):
3967        f = self.open(os_helper.TESTFN, "wb", buffering=0)
3968        self.assertEqual(f.mode, "wb")
3969        f.close()
3970
3971        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
3972        self.assertEqual(f.mode,            "w+")
3973        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
3974        self.assertEqual(f.buffer.raw.mode, "rb+")
3975
3976        g = self.open(f.fileno(), "wb", closefd=False)
3977        self.assertEqual(g.mode,     "wb")
3978        self.assertEqual(g.raw.mode, "wb")
3979        self.assertEqual(g.name,     f.fileno())
3980        self.assertEqual(g.raw.name, f.fileno())
3981        f.close()
3982        g.close()
3983
3984    def test_removed_u_mode(self):
3985        # bpo-37330: The "U" mode has been removed in Python 3.11
3986        for mode in ("U", "rU", "r+U"):
3987            with self.assertRaises(ValueError) as cm:
3988                self.open(os_helper.TESTFN, mode)
3989            self.assertIn('invalid mode', str(cm.exception))
3990
3991    @unittest.skipIf(
3992        support.is_emscripten, "fstat() of a pipe fd is not supported"
3993    )
3994    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
3995    def test_open_pipe_with_append(self):
3996        # bpo-27805: Ignore ESPIPE from lseek() in open().
3997        r, w = os.pipe()
3998        self.addCleanup(os.close, r)
3999        f = self.open(w, 'a', encoding="utf-8")
4000        self.addCleanup(f.close)
4001        # Check that the file is marked non-seekable. On Windows, however, lseek
4002        # somehow succeeds on pipes.
4003        if sys.platform != 'win32':
4004            self.assertFalse(f.seekable())
4005
4006    def test_io_after_close(self):
4007        for kwargs in [
4008                {"mode": "w"},
4009                {"mode": "wb"},
4010                {"mode": "w", "buffering": 1},
4011                {"mode": "w", "buffering": 2},
4012                {"mode": "wb", "buffering": 0},
4013                {"mode": "r"},
4014                {"mode": "rb"},
4015                {"mode": "r", "buffering": 1},
4016                {"mode": "r", "buffering": 2},
4017                {"mode": "rb", "buffering": 0},
4018                {"mode": "w+"},
4019                {"mode": "w+b"},
4020                {"mode": "w+", "buffering": 1},
4021                {"mode": "w+", "buffering": 2},
4022                {"mode": "w+b", "buffering": 0},
4023            ]:
4024            if "b" not in kwargs["mode"]:
4025                kwargs["encoding"] = "utf-8"
4026            f = self.open(os_helper.TESTFN, **kwargs)
4027            f.close()
4028            self.assertRaises(ValueError, f.flush)
4029            self.assertRaises(ValueError, f.fileno)
4030            self.assertRaises(ValueError, f.isatty)
4031            self.assertRaises(ValueError, f.__iter__)
4032            if hasattr(f, "peek"):
4033                self.assertRaises(ValueError, f.peek, 1)
4034            self.assertRaises(ValueError, f.read)
4035            if hasattr(f, "read1"):
4036                self.assertRaises(ValueError, f.read1, 1024)
4037                self.assertRaises(ValueError, f.read1)
4038            if hasattr(f, "readall"):
4039                self.assertRaises(ValueError, f.readall)
4040            if hasattr(f, "readinto"):
4041                self.assertRaises(ValueError, f.readinto, bytearray(1024))
4042            if hasattr(f, "readinto1"):
4043                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
4044            self.assertRaises(ValueError, f.readline)
4045            self.assertRaises(ValueError, f.readlines)
4046            self.assertRaises(ValueError, f.readlines, 1)
4047            self.assertRaises(ValueError, f.seek, 0)
4048            self.assertRaises(ValueError, f.tell)
4049            self.assertRaises(ValueError, f.truncate)
4050            self.assertRaises(ValueError, f.write,
4051                              b"" if "b" in kwargs['mode'] else "")
4052            self.assertRaises(ValueError, f.writelines, [])
4053            self.assertRaises(ValueError, next, f)
4054
4055    def test_blockingioerror(self):
4056        # Various BlockingIOError issues
4057        class C(str):
4058            pass
4059        c = C("")
4060        b = self.BlockingIOError(1, c)
4061        c.b = b
4062        b.c = c
4063        wr = weakref.ref(c)
4064        del c, b
4065        support.gc_collect()
4066        self.assertIsNone(wr(), wr)
4067
4068    def test_abcs(self):
4069        # Test the visible base classes are ABCs.
4070        self.assertIsInstance(self.IOBase, abc.ABCMeta)
4071        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4072        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4073        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
4074
4075    def _check_abc_inheritance(self, abcmodule):
4076        with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
4077            self.assertIsInstance(f, abcmodule.IOBase)
4078            self.assertIsInstance(f, abcmodule.RawIOBase)
4079            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4080            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4081        with self.open(os_helper.TESTFN, "wb") as f:
4082            self.assertIsInstance(f, abcmodule.IOBase)
4083            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4084            self.assertIsInstance(f, abcmodule.BufferedIOBase)
4085            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4086        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
4087            self.assertIsInstance(f, abcmodule.IOBase)
4088            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4089            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4090            self.assertIsInstance(f, abcmodule.TextIOBase)
4091
4092    def test_abc_inheritance(self):
4093        # Test implementations inherit from their respective ABCs
4094        self._check_abc_inheritance(self)
4095
4096    def test_abc_inheritance_official(self):
4097        # Test implementations inherit from the official ABCs of the
4098        # baseline "io" module.
4099        self._check_abc_inheritance(io)
4100
4101    def _check_warn_on_dealloc(self, *args, **kwargs):
4102        f = open(*args, **kwargs)
4103        r = repr(f)
4104        with self.assertWarns(ResourceWarning) as cm:
4105            f = None
4106            support.gc_collect()
4107        self.assertIn(r, str(cm.warning.args[0]))
4108
4109    def test_warn_on_dealloc(self):
4110        self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
4111        self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
4112        self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
4113
4114    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4115        fds = []
4116        def cleanup_fds():
4117            for fd in fds:
4118                try:
4119                    os.close(fd)
4120                except OSError as e:
4121                    if e.errno != errno.EBADF:
4122                        raise
4123        self.addCleanup(cleanup_fds)
4124        r, w = os.pipe()
4125        fds += r, w
4126        self._check_warn_on_dealloc(r, *args, **kwargs)
4127        # When using closefd=False, there's no warning
4128        r, w = os.pipe()
4129        fds += r, w
4130        with warnings_helper.check_no_resource_warning(self):
4131            open(r, *args, closefd=False, **kwargs)
4132
4133    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
4134    def test_warn_on_dealloc_fd(self):
4135        self._check_warn_on_dealloc_fd("rb", buffering=0)
4136        self._check_warn_on_dealloc_fd("rb")
4137        self._check_warn_on_dealloc_fd("r", encoding="utf-8")
4138
4139
4140    def test_pickling(self):
4141        # Pickling file objects is forbidden
4142        for kwargs in [
4143                {"mode": "w"},
4144                {"mode": "wb"},
4145                {"mode": "wb", "buffering": 0},
4146                {"mode": "r"},
4147                {"mode": "rb"},
4148                {"mode": "rb", "buffering": 0},
4149                {"mode": "w+"},
4150                {"mode": "w+b"},
4151                {"mode": "w+b", "buffering": 0},
4152            ]:
4153            if "b" not in kwargs["mode"]:
4154                kwargs["encoding"] = "utf-8"
4155            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4156                with self.open(os_helper.TESTFN, **kwargs) as f:
4157                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
4158
4159    @unittest.skipIf(
4160        support.is_emscripten, "fstat() of a pipe fd is not supported"
4161    )
4162    def test_nonblock_pipe_write_bigbuf(self):
4163        self._test_nonblock_pipe_write(16*1024)
4164
4165    @unittest.skipIf(
4166        support.is_emscripten, "fstat() of a pipe fd is not supported"
4167    )
4168    def test_nonblock_pipe_write_smallbuf(self):
4169        self._test_nonblock_pipe_write(1024)
4170
4171    @unittest.skipUnless(hasattr(os, 'set_blocking'),
4172                         'os.set_blocking() required for this test')
4173    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
4174    def _test_nonblock_pipe_write(self, bufsize):
4175        sent = []
4176        received = []
4177        r, w = os.pipe()
4178        os.set_blocking(r, False)
4179        os.set_blocking(w, False)
4180
4181        # To exercise all code paths in the C implementation we need
4182        # to play with buffer sizes.  For instance, if we choose a
4183        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4184        # then we will never get a partial write of the buffer.
4185        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4186        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4187
4188        with rf, wf:
4189            for N in 9999, 73, 7574:
4190                try:
4191                    i = 0
4192                    while True:
4193                        msg = bytes([i % 26 + 97]) * N
4194                        sent.append(msg)
4195                        wf.write(msg)
4196                        i += 1
4197
4198                except self.BlockingIOError as e:
4199                    self.assertEqual(e.args[0], errno.EAGAIN)
4200                    self.assertEqual(e.args[2], e.characters_written)
4201                    sent[-1] = sent[-1][:e.characters_written]
4202                    received.append(rf.read())
4203                    msg = b'BLOCKED'
4204                    wf.write(msg)
4205                    sent.append(msg)
4206
4207            while True:
4208                try:
4209                    wf.flush()
4210                    break
4211                except self.BlockingIOError as e:
4212                    self.assertEqual(e.args[0], errno.EAGAIN)
4213                    self.assertEqual(e.args[2], e.characters_written)
4214                    self.assertEqual(e.characters_written, 0)
4215                    received.append(rf.read())
4216
4217            received += iter(rf.read, None)
4218
4219        sent, received = b''.join(sent), b''.join(received)
4220        self.assertEqual(sent, received)
4221        self.assertTrue(wf.closed)
4222        self.assertTrue(rf.closed)
4223
4224    def test_create_fail(self):
4225        # 'x' mode fails if file is existing
4226        with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
4227            pass
4228        self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
4229
4230    def test_create_writes(self):
4231        # 'x' mode opens for writing
4232        with self.open(os_helper.TESTFN, 'xb') as f:
4233            f.write(b"spam")
4234        with self.open(os_helper.TESTFN, 'rb') as f:
4235            self.assertEqual(b"spam", f.read())
4236
4237    def test_open_allargs(self):
4238        # there used to be a buffer overflow in the parser for rawmode
4239        self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
4240
4241    def test_check_encoding_errors(self):
4242        # bpo-37388: open() and TextIOWrapper must check encoding and errors
4243        # arguments in dev mode
4244        mod = self.io.__name__
4245        filename = __file__
4246        invalid = 'Boom, Shaka Laka, Boom!'
4247        code = textwrap.dedent(f'''
4248            import sys
4249            from {mod} import open, TextIOWrapper
4250
4251            try:
4252                open({filename!r}, encoding={invalid!r})
4253            except LookupError:
4254                pass
4255            else:
4256                sys.exit(21)
4257
4258            try:
4259                open({filename!r}, errors={invalid!r})
4260            except LookupError:
4261                pass
4262            else:
4263                sys.exit(22)
4264
4265            fp = open({filename!r}, "rb")
4266            with fp:
4267                try:
4268                    TextIOWrapper(fp, encoding={invalid!r})
4269                except LookupError:
4270                    pass
4271                else:
4272                    sys.exit(23)
4273
4274                try:
4275                    TextIOWrapper(fp, errors={invalid!r})
4276                except LookupError:
4277                    pass
4278                else:
4279                    sys.exit(24)
4280
4281            sys.exit(10)
4282        ''')
4283        proc = assert_python_failure('-X', 'dev', '-c', code)
4284        self.assertEqual(proc.rc, 10, proc)
4285
4286    def test_check_encoding_warning(self):
4287        # PEP 597: Raise warning when encoding is not specified
4288        # and sys.flags.warn_default_encoding is set.
4289        mod = self.io.__name__
4290        filename = __file__
4291        code = textwrap.dedent(f'''\
4292            import sys
4293            from {mod} import open, TextIOWrapper
4294            import pathlib
4295
4296            with open({filename!r}) as f:           # line 5
4297                pass
4298
4299            pathlib.Path({filename!r}).read_text()  # line 8
4300        ''')
4301        proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
4302        warnings = proc.err.splitlines()
4303        self.assertEqual(len(warnings), 2)
4304        self.assertTrue(
4305            warnings[0].startswith(b"<string>:5: EncodingWarning: "))
4306        self.assertTrue(
4307            warnings[1].startswith(b"<string>:8: EncodingWarning: "))
4308
4309    def test_text_encoding(self):
4310        # PEP 597, bpo-47000. io.text_encoding() returns "locale" or "utf-8"
4311        # based on sys.flags.utf8_mode
4312        code = "import io; print(io.text_encoding(None))"
4313
4314        proc = assert_python_ok('-X', 'utf8=0', '-c', code)
4315        self.assertEqual(b"locale", proc.out.strip())
4316
4317        proc = assert_python_ok('-X', 'utf8=1', '-c', code)
4318        self.assertEqual(b"utf-8", proc.out.strip())
4319
4320    @support.cpython_only
4321    # Depending if OpenWrapper was already created or not, the warning is
4322    # emitted or not. For example, the attribute is already created when this
4323    # test is run multiple times.
4324    @warnings_helper.ignore_warnings(category=DeprecationWarning)
4325    def test_openwrapper(self):
4326        self.assertIs(self.io.OpenWrapper, self.io.open)
4327
4328
4329class CMiscIOTest(MiscIOTest):
4330    io = io
4331
4332    def test_readinto_buffer_overflow(self):
4333        # Issue #18025
4334        class BadReader(self.io.BufferedIOBase):
4335            def read(self, n=-1):
4336                return b'x' * 10**6
4337        bufio = BadReader()
4338        b = bytearray(2)
4339        self.assertRaises(ValueError, bufio.readinto, b)
4340
4341    def check_daemon_threads_shutdown_deadlock(self, stream_name):
4342        # Issue #23309: deadlocks at shutdown should be avoided when a
4343        # daemon thread and the main thread both write to a file.
4344        code = """if 1:
4345            import sys
4346            import time
4347            import threading
4348            from test.support import SuppressCrashReport
4349
4350            file = sys.{stream_name}
4351
4352            def run():
4353                while True:
4354                    file.write('.')
4355                    file.flush()
4356
4357            crash = SuppressCrashReport()
4358            crash.__enter__()
4359            # don't call __exit__(): the crash occurs at Python shutdown
4360
4361            thread = threading.Thread(target=run)
4362            thread.daemon = True
4363            thread.start()
4364
4365            time.sleep(0.5)
4366            file.write('!')
4367            file.flush()
4368            """.format_map(locals())
4369        res, _ = run_python_until_end("-c", code)
4370        err = res.err.decode()
4371        if res.rc != 0:
4372            # Failure: should be a fatal error
4373            pattern = (r"Fatal Python error: _enter_buffered_busy: "
4374                       r"could not acquire lock "
4375                       r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4376                       r"at interpreter shutdown, possibly due to "
4377                       r"daemon threads".format_map(locals()))
4378            self.assertRegex(err, pattern)
4379        else:
4380            self.assertFalse(err.strip('.!'))
4381
4382    @threading_helper.requires_working_threading()
4383    def test_daemon_threads_shutdown_stdout_deadlock(self):
4384        self.check_daemon_threads_shutdown_deadlock('stdout')
4385
4386    @threading_helper.requires_working_threading()
4387    def test_daemon_threads_shutdown_stderr_deadlock(self):
4388        self.check_daemon_threads_shutdown_deadlock('stderr')
4389
4390
4391class PyMiscIOTest(MiscIOTest):
4392    io = pyio
4393
4394
4395@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4396class SignalsTest(unittest.TestCase):
4397
4398    def setUp(self):
4399        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4400
4401    def tearDown(self):
4402        signal.signal(signal.SIGALRM, self.oldalrm)
4403
4404    def alarm_interrupt(self, sig, frame):
4405        1/0
4406
4407    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4408        """Check that a partial write, when it gets interrupted, properly
4409        invokes the signal handler, and bubbles up the exception raised
4410        in the latter."""
4411
4412        # XXX This test has three flaws that appear when objects are
4413        # XXX not reference counted.
4414
4415        # - if wio.write() happens to trigger a garbage collection,
4416        #   the signal exception may be raised when some __del__
4417        #   method is running; it will not reach the assertRaises()
4418        #   call.
4419
4420        # - more subtle, if the wio object is not destroyed at once
4421        #   and survives this function, the next opened file is likely
4422        #   to have the same fileno (since the file descriptor was
4423        #   actively closed).  When wio.__del__ is finally called, it
4424        #   will close the other's test file...  To trigger this with
4425        #   CPython, try adding "global wio" in this function.
4426
4427        # - This happens only for streams created by the _pyio module,
4428        #   because a wio.close() that fails still consider that the
4429        #   file needs to be closed again.  You can try adding an
4430        #   "assert wio.closed" at the end of the function.
4431
4432        # Fortunately, a little gc.collect() seems to be enough to
4433        # work around all these issues.
4434        support.gc_collect()  # For PyPy or other GCs.
4435
4436        read_results = []
4437        def _read():
4438            s = os.read(r, 1)
4439            read_results.append(s)
4440
4441        t = threading.Thread(target=_read)
4442        t.daemon = True
4443        r, w = os.pipe()
4444        fdopen_kwargs["closefd"] = False
4445        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
4446        try:
4447            wio = self.io.open(w, **fdopen_kwargs)
4448            if hasattr(signal, 'pthread_sigmask'):
4449                # create the thread with SIGALRM signal blocked
4450                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4451                t.start()
4452                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4453            else:
4454                t.start()
4455
4456            # Fill the pipe enough that the write will be blocking.
4457            # It will be interrupted by the timer armed above.  Since the
4458            # other thread has read one byte, the low-level write will
4459            # return with a successful (partial) result rather than an EINTR.
4460            # The buffered IO layer must check for pending signal
4461            # handlers, which in this case will invoke alarm_interrupt().
4462            signal.alarm(1)
4463            try:
4464                self.assertRaises(ZeroDivisionError, wio.write, large_data)
4465            finally:
4466                signal.alarm(0)
4467                t.join()
4468            # We got one byte, get another one and check that it isn't a
4469            # repeat of the first one.
4470            read_results.append(os.read(r, 1))
4471            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4472        finally:
4473            os.close(w)
4474            os.close(r)
4475            # This is deliberate. If we didn't close the file descriptor
4476            # before closing wio, wio would try to flush its internal
4477            # buffer, and block again.
4478            try:
4479                wio.close()
4480            except OSError as e:
4481                if e.errno != errno.EBADF:
4482                    raise
4483
4484    @requires_alarm
4485    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
4486    def test_interrupted_write_unbuffered(self):
4487        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4488
4489    @requires_alarm
4490    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
4491    def test_interrupted_write_buffered(self):
4492        self.check_interrupted_write(b"xy", b"xy", mode="wb")
4493
4494    @requires_alarm
4495    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
4496    def test_interrupted_write_text(self):
4497        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4498
4499    @support.no_tracing
4500    def check_reentrant_write(self, data, **fdopen_kwargs):
4501        def on_alarm(*args):
4502            # Will be called reentrantly from the same thread
4503            wio.write(data)
4504            1/0
4505        signal.signal(signal.SIGALRM, on_alarm)
4506        r, w = os.pipe()
4507        wio = self.io.open(w, **fdopen_kwargs)
4508        try:
4509            signal.alarm(1)
4510            # Either the reentrant call to wio.write() fails with RuntimeError,
4511            # or the signal handler raises ZeroDivisionError.
4512            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4513                while 1:
4514                    for i in range(100):
4515                        wio.write(data)
4516                        wio.flush()
4517                    # Make sure the buffer doesn't fill up and block further writes
4518                    os.read(r, len(data) * 100)
4519            exc = cm.exception
4520            if isinstance(exc, RuntimeError):
4521                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4522        finally:
4523            signal.alarm(0)
4524            wio.close()
4525            os.close(r)
4526
4527    @requires_alarm
4528    def test_reentrant_write_buffered(self):
4529        self.check_reentrant_write(b"xy", mode="wb")
4530
4531    @requires_alarm
4532    def test_reentrant_write_text(self):
4533        self.check_reentrant_write("xy", mode="w", encoding="ascii")
4534
4535    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4536        """Check that a buffered read, when it gets interrupted (either
4537        returning a partial result or EINTR), properly invokes the signal
4538        handler and retries if the latter returned successfully."""
4539        r, w = os.pipe()
4540        fdopen_kwargs["closefd"] = False
4541        def alarm_handler(sig, frame):
4542            os.write(w, b"bar")
4543        signal.signal(signal.SIGALRM, alarm_handler)
4544        try:
4545            rio = self.io.open(r, **fdopen_kwargs)
4546            os.write(w, b"foo")
4547            signal.alarm(1)
4548            # Expected behaviour:
4549            # - first raw read() returns partial b"foo"
4550            # - second raw read() returns EINTR
4551            # - third raw read() returns b"bar"
4552            self.assertEqual(decode(rio.read(6)), "foobar")
4553        finally:
4554            signal.alarm(0)
4555            rio.close()
4556            os.close(w)
4557            os.close(r)
4558
4559    @requires_alarm
4560    def test_interrupted_read_retry_buffered(self):
4561        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4562                                          mode="rb")
4563
4564    @requires_alarm
4565    def test_interrupted_read_retry_text(self):
4566        self.check_interrupted_read_retry(lambda x: x,
4567                                          mode="r", encoding="latin1")
4568
4569    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4570        """Check that a buffered write, when it gets interrupted (either
4571        returning a partial result or EINTR), properly invokes the signal
4572        handler and retries if the latter returned successfully."""
4573        select = import_helper.import_module("select")
4574
4575        # A quantity that exceeds the buffer size of an anonymous pipe's
4576        # write end.
4577        N = support.PIPE_MAX_SIZE
4578        r, w = os.pipe()
4579        fdopen_kwargs["closefd"] = False
4580
4581        # We need a separate thread to read from the pipe and allow the
4582        # write() to finish.  This thread is started after the SIGALRM is
4583        # received (forcing a first EINTR in write()).
4584        read_results = []
4585        write_finished = False
4586        error = None
4587        def _read():
4588            try:
4589                while not write_finished:
4590                    while r in select.select([r], [], [], 1.0)[0]:
4591                        s = os.read(r, 1024)
4592                        read_results.append(s)
4593            except BaseException as exc:
4594                nonlocal error
4595                error = exc
4596        t = threading.Thread(target=_read)
4597        t.daemon = True
4598        def alarm1(sig, frame):
4599            signal.signal(signal.SIGALRM, alarm2)
4600            signal.alarm(1)
4601        def alarm2(sig, frame):
4602            t.start()
4603
4604        large_data = item * N
4605        signal.signal(signal.SIGALRM, alarm1)
4606        try:
4607            wio = self.io.open(w, **fdopen_kwargs)
4608            signal.alarm(1)
4609            # Expected behaviour:
4610            # - first raw write() is partial (because of the limited pipe buffer
4611            #   and the first alarm)
4612            # - second raw write() returns EINTR (because of the second alarm)
4613            # - subsequent write()s are successful (either partial or complete)
4614            written = wio.write(large_data)
4615            self.assertEqual(N, written)
4616
4617            wio.flush()
4618            write_finished = True
4619            t.join()
4620
4621            self.assertIsNone(error)
4622            self.assertEqual(N, sum(len(x) for x in read_results))
4623        finally:
4624            signal.alarm(0)
4625            write_finished = True
4626            os.close(w)
4627            os.close(r)
4628            # This is deliberate. If we didn't close the file descriptor
4629            # before closing wio, wio would try to flush its internal
4630            # buffer, and could block (in case of failure).
4631            try:
4632                wio.close()
4633            except OSError as e:
4634                if e.errno != errno.EBADF:
4635                    raise
4636
4637    @requires_alarm
4638    def test_interrupted_write_retry_buffered(self):
4639        self.check_interrupted_write_retry(b"x", mode="wb")
4640
4641    @requires_alarm
4642    def test_interrupted_write_retry_text(self):
4643        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4644
4645
4646class CSignalsTest(SignalsTest):
4647    io = io
4648
4649class PySignalsTest(SignalsTest):
4650    io = pyio
4651
4652    # Handling reentrancy issues would slow down _pyio even more, so the
4653    # tests are disabled.
4654    test_reentrant_write_buffered = None
4655    test_reentrant_write_text = None
4656
4657
4658def load_tests(loader, tests, pattern):
4659    tests = (CIOTest, PyIOTest, APIMismatchTest,
4660             CBufferedReaderTest, PyBufferedReaderTest,
4661             CBufferedWriterTest, PyBufferedWriterTest,
4662             CBufferedRWPairTest, PyBufferedRWPairTest,
4663             CBufferedRandomTest, PyBufferedRandomTest,
4664             StatefulIncrementalDecoderTest,
4665             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4666             CTextIOWrapperTest, PyTextIOWrapperTest,
4667             CMiscIOTest, PyMiscIOTest,
4668             CSignalsTest, PySignalsTest,
4669             )
4670
4671    # Put the namespaces of the IO module we are testing and some useful mock
4672    # classes in the __dict__ of each test.
4673    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
4674             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4675             SlowFlushRawIO)
4676    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4677    c_io_ns = {name : getattr(io, name) for name in all_members}
4678    py_io_ns = {name : getattr(pyio, name) for name in all_members}
4679    globs = globals()
4680    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4681    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4682    for test in tests:
4683        if test.__name__.startswith("C"):
4684            for name, obj in c_io_ns.items():
4685                setattr(test, name, obj)
4686        elif test.__name__.startswith("Py"):
4687            for name, obj in py_io_ns.items():
4688                setattr(test, name, obj)
4689
4690    suite = loader.suiteClass()
4691    for test in tests:
4692        suite.addTest(loader.loadTestsFromTestCase(test))
4693    return suite
4694
4695if __name__ == "__main__":
4696    unittest.main()
4697