1# NOTE: this file tests the new `io` library backported from Python 3.x.
2# Similar tests for the builtin file object can be found in test_file2k.py.
3
4from __future__ import print_function
5
6import sys
7import os
8import unittest
9from array import array
10from weakref import proxy
11
12import io
13import _pyio as pyio
14
15from test.support import TESTFN, run_unittest
16from test import support
17from UserList import UserList
18
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up.
    #
    # This class is not run directly: it calls ``self.open``, which it does
    # not define itself, so a concrete subclass must supply an ``open``
    # attribute.  setUp() opens TESTFN for binary writing as ``self.f`` and
    # tearDown() closes and removes it.

    def setUp(self):
        # Each test starts with a fresh, writable binary file.
        self.f = self.open(TESTFN, 'wb')

    def tearDown(self):
        # A test that disposes of the file itself resets self.f to None.
        if self.f is not None:
            self.f.close()
        support.unlink(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write(b'teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the reference held by the test case; afterwards the proxy is
        # expected to be dead, so any attribute access must raise.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

    def testReadinto(self):
        # verify readinto
        self.f.write(b'12')
        self.f.close()
        a = array('b', b'x'*10)
        self.f = self.open(TESTFN, 'rb')
        n = self.f.readinto(a)
        # Only the first n items of the buffer were filled by readinto().
        self.assertEqual(b'12', a.tostring()[:n])

    def testReadinto_text(self):
        # verify readinto refuses text files
        a = array('b', b'x'*10)
        self.f.close()
        self.f = self.open(TESTFN, 'r')
        # The check is skipped when the text file object has no readinto().
        if hasattr(self.f, "readinto"):
            self.assertRaises(TypeError, self.f.readinto, a)

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList([b'1', b'2'])
        self.f.writelines(l)
        self.f.close()
        self.f = self.open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testErrors(self):
        # verify basic state of the open file and error reporting
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        # Either exception type is accepted for readinto() on this file.
        if hasattr(f, "readinto"):
            self.assertRaises((IOError, TypeError), f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # verify that every I/O method raises ValueError on a closed file
        # and that __exit__ closes the file and returns None
        methods = [('fileno', ()),
                   ('flush', ()),
                   ('isatty', ()),
                   ('next', ()),
                   ('read', ()),
                   ('readline', ()),
                   ('readlines', ()),
                   ('seek', (0,)),
                   ('tell', ()),
                   ('write', (b"",)),
                   ('writelines', ([],)),
                   ('__iter__', ()),
                   ]
        if not sys.platform.startswith('atheos'):
            methods.append(('truncate', ()))

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname, args in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method, *args)

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is caught here, so that
            # sys.exc_info() describes a real in-flight exception.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # reading from the file opened for writing in setUp() must fail
        self.assertRaises(IOError, self.f.read)
137
class CAutoFileTests(AutoFileTests):
    # Runs the AutoFileTests suite against the `io` module's open().
    # Wrapped in staticmethod, like the `_pyio` variant, so that `self.open`
    # is never bound as a method regardless of how `io.open` is implemented.
    open = staticmethod(io.open)
140
class PyAutoFileTests(AutoFileTests):
    # Runs the AutoFileTests suite against the `_pyio` module's open().
    # staticmethod keeps `self.open(...)` from passing the test instance as
    # the first argument.
    open = staticmethod(pyio.open)
143
144
class OtherFileTests(unittest.TestCase):
    # File tests that create their own files.
    #
    # This class is not run directly: it calls ``self.open``, which it does
    # not define itself, so a concrete subclass must supply an ``open``
    # attribute.  tearDown() removes TESTFN after every test.

    def tearDown(self):
        support.unlink(TESTFN)

    def testModeStrings(self):
        # check invalid mode strings
        self.open(TESTFN, 'wb').close()
        for mode in ("", "aU", "wU+"):
            try:
                f = self.open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = self.open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                # The message should name the bad mode, not the file.
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = self.open(TESTFN, 'wb', s)
                f.write(str(s).encode("ascii"))
                # The doubled close() calls are deliberate (see above).
                f.close()
                f.close()
                f = self.open(TESTFN, 'rb', s)
                d = int(f.read().decode("ascii"))
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # SF bug <http://www.python.org/sf/801631>
        # "file.truncate fault on windows"

        f = self.open(TESTFN, 'wb')

        try:
            f.write(b'12345678901')   # 11 bytes
            f.close()

            f = self.open(TESTFN,'rb+')
            data = f.read(5)
            if data != b'12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            # truncate() without an argument cuts at the current position
            # and must leave that position unchanged.
            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)
        finally:
            f.close()

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods.
        dataoffset = 16384
        filler = b"ham\n"
        # A unittest assertion rather than a bare `assert`, so the sanity
        # check is not stripped when running under -O.
        self.assertEqual(dataoffset % len(filler), 0,
                         "dataoffset must be multiple of len(filler)")
        nchunks = dataoffset // len(filler)
        testlines = [
            b"spam, spam and eggs\n",
            b"eggs, spam, ham and spam\n",
            b"saussages, spam, spam and eggs\n",
            b"spam, ham, spam and eggs\n",
            b"spam, spam, spam, spam, spam, ham, spam\n",
            b"wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("b", b" "*100),))]

        # Prepare the testfile
        with self.open(TESTFN, "wb") as bag:
            bag.write(filler * nchunks)
            bag.writelines(testlines)
        # Test for appropriate errors mixing read* and iteration
        for methodname, args in methods:
            # `with` closes the file even when a check fails, so tearDown
            # can still remove TESTFN.
            with self.open(TESTFN, 'rb') as f:
                self.assertEqual(next(f), filler)
                meth = getattr(f, methodname)
                meth(*args)  # This simply shouldn't fail

        # Test to see if harmless (by accident) mixing of read* and
        # iteration still works. This depends on the size of the internal
        # iteration buffer (currently 8192,) but we can test it in a
        # flexible manner.  Each line in the bag o' ham is 4 bytes
        # ("h", "a", "m", "\n"), so 4096 lines of that should get us
        # exactly on the buffer boundary for any power-of-2 buffersize
        # between 4 and 16384 (inclusive).
        with self.open(TESTFN, 'rb') as f:
            for _ in range(nchunks):
                next(f)
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                            "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                            "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("b", b"\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                            "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                            "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                            "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                            "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                            "iteration-buffer failed anyway")
            if lines != testlines:
                # Report the values that were actually compared.
                self.fail("readlines() after next() with empty buffer "
                            "failed. Got %r, expected %r" % (lines, testlines))
        # Reading after iteration hit EOF shouldn't hurt either
        with self.open(TESTFN, 'rb') as f:
            for line in f:
                pass
            try:
                f.readline()
                f.readinto(buf)
                f.read()
                f.readlines()
            except ValueError:
                self.fail("read* failed after next() consumed file")
318
class COtherFileTests(OtherFileTests):
    # Runs the OtherFileTests suite against the `io` module's open().
    # Wrapped in staticmethod, like the `_pyio` variant, so that `self.open`
    # is never bound as a method regardless of how `io.open` is implemented.
    open = staticmethod(io.open)
321
class PyOtherFileTests(OtherFileTests):
    # Runs the OtherFileTests suite against the `_pyio` module's open().
    # staticmethod keeps `self.open(...)` from passing the test instance as
    # the first argument.
    open = staticmethod(pyio.open)
324
325
def test_main():
    # Run both suites once for each open() implementation under test.
    suites = (
        CAutoFileTests,
        PyAutoFileTests,
        COtherFileTests,
        PyOtherFileTests,
    )
    run_unittest(*suites)
329
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
332