1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22from __future__ import print_function 23from __future__ import unicode_literals 24 25import os 26import sys 27import time 28import array 29import random 30import unittest 31import weakref 32import warnings 33import abc 34import signal 35import errno 36from itertools import cycle, count 37from collections import deque 38from UserList import UserList 39from test import test_support as support 40import contextlib 41 42import codecs 43import io # C implementation of io 44import _pyio as pyio # Python implementation of io 45try: 46 import threading 47except ImportError: 48 threading = None 49try: 50 import fcntl 51except ImportError: 52 fcntl = None 53 54__metaclass__ = type 55bytes = support.py3k_bytes 56 57def byteslike(*pos, **kw): 58 return memoryview(bytearray(*pos, **kw)) 59 60def _default_chunk_size(): 61 """Get the default TextIOWrapper chunk size""" 62 with io.open(__file__, "r", encoding="latin1") as f: 63 return 
f._CHUNK_SIZE 64 65 66class MockRawIOWithoutRead: 67 """A RawIO implementation without read(), so as to exercise the default 68 RawIO.read() which calls readinto().""" 69 70 def __init__(self, read_stack=()): 71 self._read_stack = list(read_stack) 72 self._write_stack = [] 73 self._reads = 0 74 self._extraneous_reads = 0 75 76 def write(self, b): 77 self._write_stack.append(bytes(b)) 78 return len(b) 79 80 def writable(self): 81 return True 82 83 def fileno(self): 84 return 42 85 86 def readable(self): 87 return True 88 89 def seekable(self): 90 return True 91 92 def seek(self, pos, whence): 93 return 0 # wrong but we gotta return something 94 95 def tell(self): 96 return 0 # same comment as above 97 98 def readinto(self, buf): 99 self._reads += 1 100 max_len = len(buf) 101 try: 102 data = self._read_stack[0] 103 except IndexError: 104 self._extraneous_reads += 1 105 return 0 106 if data is None: 107 del self._read_stack[0] 108 return None 109 n = len(data) 110 if len(data) <= max_len: 111 del self._read_stack[0] 112 buf[:n] = data 113 return n 114 else: 115 buf[:] = data[:max_len] 116 self._read_stack[0] = data[max_len:] 117 return max_len 118 119 def truncate(self, pos=None): 120 return pos 121 122class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 123 pass 124 125class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 126 pass 127 128 129class MockRawIO(MockRawIOWithoutRead): 130 131 def read(self, n=None): 132 self._reads += 1 133 try: 134 return self._read_stack.pop(0) 135 except: 136 self._extraneous_reads += 1 137 return b"" 138 139class CMockRawIO(MockRawIO, io.RawIOBase): 140 pass 141 142class PyMockRawIO(MockRawIO, pyio.RawIOBase): 143 pass 144 145 146class MisbehavedRawIO(MockRawIO): 147 def write(self, b): 148 return MockRawIO.write(self, b) * 2 149 150 def read(self, n=None): 151 return MockRawIO.read(self, n) * 2 152 153 def seek(self, pos, whence): 154 return -123 155 156 def tell(self): 157 return -456 158 159 def 
readinto(self, buf): 160 MockRawIO.readinto(self, buf) 161 return len(buf) * 5 162 163class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 164 pass 165 166class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 167 pass 168 169 170class CloseFailureIO(MockRawIO): 171 closed = 0 172 173 def close(self): 174 if not self.closed: 175 self.closed = 1 176 raise IOError 177 178class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 179 pass 180 181class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 182 pass 183 184 185class MockFileIO: 186 187 def __init__(self, data): 188 self.read_history = [] 189 super(MockFileIO, self).__init__(data) 190 191 def read(self, n=None): 192 res = super(MockFileIO, self).read(n) 193 self.read_history.append(None if res is None else len(res)) 194 return res 195 196 def readinto(self, b): 197 res = super(MockFileIO, self).readinto(b) 198 self.read_history.append(res) 199 return res 200 201class CMockFileIO(MockFileIO, io.BytesIO): 202 pass 203 204class PyMockFileIO(MockFileIO, pyio.BytesIO): 205 pass 206 207 208class MockNonBlockWriterIO: 209 210 def __init__(self): 211 self._write_stack = [] 212 self._blocker_char = None 213 214 def pop_written(self): 215 s = b"".join(self._write_stack) 216 self._write_stack[:] = [] 217 return s 218 219 def block_on(self, char): 220 """Block when a given char is encountered.""" 221 self._blocker_char = char 222 223 def readable(self): 224 return True 225 226 def seekable(self): 227 return True 228 229 def writable(self): 230 return True 231 232 def write(self, b): 233 b = bytes(b) 234 n = -1 235 if self._blocker_char: 236 try: 237 n = b.index(self._blocker_char) 238 except ValueError: 239 pass 240 else: 241 if n > 0: 242 # write data up to the first blocker 243 self._write_stack.append(b[:n]) 244 return n 245 else: 246 # cancel blocker and indicate would block 247 self._blocker_char = None 248 return None 249 self._write_stack.append(b) 250 return len(b) 251 252class 
CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 253 BlockingIOError = io.BlockingIOError 254 255class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 256 BlockingIOError = pyio.BlockingIOError 257 258 259class IOTest(unittest.TestCase): 260 261 def setUp(self): 262 support.unlink(support.TESTFN) 263 264 def tearDown(self): 265 support.unlink(support.TESTFN) 266 267 def write_ops(self, f): 268 self.assertEqual(f.write(b"blah."), 5) 269 f.truncate(0) 270 self.assertEqual(f.tell(), 5) 271 f.seek(0) 272 273 self.assertEqual(f.write(b"blah."), 5) 274 self.assertEqual(f.seek(0), 0) 275 self.assertEqual(f.write(b"Hello."), 6) 276 self.assertEqual(f.tell(), 6) 277 self.assertEqual(f.seek(-1, 1), 5) 278 self.assertEqual(f.tell(), 5) 279 buffer = bytearray(b" world\n\n\n") 280 self.assertEqual(f.write(buffer), 9) 281 buffer[:] = b"*" * 9 # Overwrite our copy of the data 282 self.assertEqual(f.seek(0), 0) 283 self.assertEqual(f.write(b"h"), 1) 284 self.assertEqual(f.seek(-1, 2), 13) 285 self.assertEqual(f.tell(), 13) 286 287 self.assertEqual(f.truncate(12), 12) 288 self.assertEqual(f.tell(), 13) 289 self.assertRaises(TypeError, f.seek, 0.0) 290 291 def read_ops(self, f, buffered=False): 292 data = f.read(5) 293 self.assertEqual(data, b"hello") 294 data = byteslike(data) 295 self.assertEqual(f.readinto(data), 5) 296 self.assertEqual(data.tobytes(), b" worl") 297 data = bytearray(5) 298 self.assertEqual(f.readinto(data), 2) 299 self.assertEqual(len(data), 5) 300 self.assertEqual(data[:2], b"d\n") 301 self.assertEqual(f.seek(0), 0) 302 self.assertEqual(f.read(20), b"hello world\n") 303 self.assertEqual(f.read(1), b"") 304 self.assertEqual(f.readinto(byteslike(b"x")), 0) 305 self.assertEqual(f.seek(-6, 2), 6) 306 self.assertEqual(f.read(5), b"world") 307 self.assertEqual(f.read(0), b"") 308 self.assertEqual(f.readinto(byteslike()), 0) 309 self.assertEqual(f.seek(-6, 1), 5) 310 self.assertEqual(f.read(5), b" worl") 311 self.assertEqual(f.tell(), 10) 312 
self.assertRaises(TypeError, f.seek, 0.0) 313 if buffered: 314 f.seek(0) 315 self.assertEqual(f.read(), b"hello world\n") 316 f.seek(6) 317 self.assertEqual(f.read(), b"world\n") 318 self.assertEqual(f.read(), b"") 319 320 LARGE = 2**31 321 322 def large_file_ops(self, f): 323 assert f.readable() 324 assert f.writable() 325 self.assertEqual(f.seek(self.LARGE), self.LARGE) 326 self.assertEqual(f.tell(), self.LARGE) 327 self.assertEqual(f.write(b"xxx"), 3) 328 self.assertEqual(f.tell(), self.LARGE + 3) 329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 330 self.assertEqual(f.truncate(), self.LARGE + 2) 331 self.assertEqual(f.tell(), self.LARGE + 2) 332 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 334 self.assertEqual(f.tell(), self.LARGE + 2) 335 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 336 self.assertEqual(f.seek(-1, 2), self.LARGE) 337 self.assertEqual(f.read(2), b"x") 338 339 def test_invalid_operations(self): 340 # Try writing on a file opened in read mode and vice-versa. 
341 for mode in ("w", "wb"): 342 with self.open(support.TESTFN, mode) as fp: 343 self.assertRaises(IOError, fp.read) 344 self.assertRaises(IOError, fp.readline) 345 with self.open(support.TESTFN, "rb") as fp: 346 self.assertRaises(IOError, fp.write, b"blah") 347 self.assertRaises(IOError, fp.writelines, [b"blah\n"]) 348 with self.open(support.TESTFN, "r") as fp: 349 self.assertRaises(IOError, fp.write, "blah") 350 self.assertRaises(IOError, fp.writelines, ["blah\n"]) 351 352 def test_open_handles_NUL_chars(self): 353 fn_with_NUL = 'foo\0bar' 354 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') 355 356 bytes_fn = fn_with_NUL.encode('ascii') 357 with warnings.catch_warnings(): 358 warnings.simplefilter("ignore", DeprecationWarning) 359 self.assertRaises(TypeError, self.open, bytes_fn, 'w') 360 361 def test_raw_file_io(self): 362 with self.open(support.TESTFN, "wb", buffering=0) as f: 363 self.assertEqual(f.readable(), False) 364 self.assertEqual(f.writable(), True) 365 self.assertEqual(f.seekable(), True) 366 self.write_ops(f) 367 with self.open(support.TESTFN, "rb", buffering=0) as f: 368 self.assertEqual(f.readable(), True) 369 self.assertEqual(f.writable(), False) 370 self.assertEqual(f.seekable(), True) 371 self.read_ops(f) 372 373 def test_buffered_file_io(self): 374 with self.open(support.TESTFN, "wb") as f: 375 self.assertEqual(f.readable(), False) 376 self.assertEqual(f.writable(), True) 377 self.assertEqual(f.seekable(), True) 378 self.write_ops(f) 379 with self.open(support.TESTFN, "rb") as f: 380 self.assertEqual(f.readable(), True) 381 self.assertEqual(f.writable(), False) 382 self.assertEqual(f.seekable(), True) 383 self.read_ops(f, True) 384 385 def test_readline(self): 386 with self.open(support.TESTFN, "wb") as f: 387 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 388 with self.open(support.TESTFN, "rb") as f: 389 self.assertEqual(f.readline(), b"abc\n") 390 self.assertEqual(f.readline(10), b"def\n") 391 self.assertEqual(f.readline(2), 
b"xy") 392 self.assertEqual(f.readline(4), b"zzy\n") 393 self.assertEqual(f.readline(), b"foo\x00bar\n") 394 self.assertEqual(f.readline(None), b"another line") 395 self.assertRaises(TypeError, f.readline, 5.3) 396 with self.open(support.TESTFN, "r") as f: 397 self.assertRaises(TypeError, f.readline, 5.3) 398 399 def test_readline_nonsizeable(self): 400 # Issue #30061 401 # Crash when readline() returns an object without __len__ 402 class R(self.IOBase): 403 def readline(self): 404 return None 405 self.assertRaises((TypeError, StopIteration), next, R()) 406 407 def test_next_nonsizeable(self): 408 # Issue #30061 409 # Crash when next() returns an object without __len__ 410 class R(self.IOBase): 411 def next(self): 412 return None 413 self.assertRaises(TypeError, R().readlines, 1) 414 415 def test_raw_bytes_io(self): 416 f = self.BytesIO() 417 self.write_ops(f) 418 data = f.getvalue() 419 self.assertEqual(data, b"hello world\n") 420 f = self.BytesIO(data) 421 self.read_ops(f, True) 422 423 def test_large_file_ops(self): 424 # On Windows and Mac OSX this test comsumes large resources; It takes 425 # a long time to build the >2GB file and takes >2GB of disk space 426 # therefore the resource must be enabled to run this test. 
427 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 428 support.requires( 429 'largefile', 430 'test requires %s bytes and a long time to run' % self.LARGE) 431 with self.open(support.TESTFN, "w+b", 0) as f: 432 self.large_file_ops(f) 433 with self.open(support.TESTFN, "w+b") as f: 434 self.large_file_ops(f) 435 436 def test_with_open(self): 437 for bufsize in (0, 1, 100): 438 f = None 439 with self.open(support.TESTFN, "wb", bufsize) as f: 440 f.write(b"xxx") 441 self.assertEqual(f.closed, True) 442 f = None 443 try: 444 with self.open(support.TESTFN, "wb", bufsize) as f: 445 1 // 0 446 except ZeroDivisionError: 447 self.assertEqual(f.closed, True) 448 else: 449 self.fail("1 // 0 didn't raise an exception") 450 451 # issue 5008 452 def test_append_mode_tell(self): 453 with self.open(support.TESTFN, "wb") as f: 454 f.write(b"xxx") 455 with self.open(support.TESTFN, "ab", buffering=0) as f: 456 self.assertEqual(f.tell(), 3) 457 with self.open(support.TESTFN, "ab") as f: 458 self.assertEqual(f.tell(), 3) 459 with self.open(support.TESTFN, "a") as f: 460 self.assertGreater(f.tell(), 0) 461 462 def test_destructor(self): 463 record = [] 464 class MyFileIO(self.FileIO): 465 def __del__(self): 466 record.append(1) 467 try: 468 f = super(MyFileIO, self).__del__ 469 except AttributeError: 470 pass 471 else: 472 f() 473 def close(self): 474 record.append(2) 475 super(MyFileIO, self).close() 476 def flush(self): 477 record.append(3) 478 super(MyFileIO, self).flush() 479 f = MyFileIO(support.TESTFN, "wb") 480 f.write(b"xxx") 481 del f 482 support.gc_collect() 483 self.assertEqual(record, [1, 2, 3]) 484 with self.open(support.TESTFN, "rb") as f: 485 self.assertEqual(f.read(), b"xxx") 486 487 def _check_base_destructor(self, base): 488 record = [] 489 class MyIO(base): 490 def __init__(self): 491 # This exercises the availability of attributes on object 492 # destruction. 
493 # (in the C version, close() is called by the tp_dealloc 494 # function, not by __del__) 495 self.on_del = 1 496 self.on_close = 2 497 self.on_flush = 3 498 def __del__(self): 499 record.append(self.on_del) 500 try: 501 f = super(MyIO, self).__del__ 502 except AttributeError: 503 pass 504 else: 505 f() 506 def close(self): 507 record.append(self.on_close) 508 super(MyIO, self).close() 509 def flush(self): 510 record.append(self.on_flush) 511 super(MyIO, self).flush() 512 f = MyIO() 513 del f 514 support.gc_collect() 515 self.assertEqual(record, [1, 2, 3]) 516 517 def test_IOBase_destructor(self): 518 self._check_base_destructor(self.IOBase) 519 520 def test_RawIOBase_destructor(self): 521 self._check_base_destructor(self.RawIOBase) 522 523 def test_BufferedIOBase_destructor(self): 524 self._check_base_destructor(self.BufferedIOBase) 525 526 def test_TextIOBase_destructor(self): 527 self._check_base_destructor(self.TextIOBase) 528 529 def test_close_flushes(self): 530 with self.open(support.TESTFN, "wb") as f: 531 f.write(b"xxx") 532 with self.open(support.TESTFN, "rb") as f: 533 self.assertEqual(f.read(), b"xxx") 534 535 def test_array_writes(self): 536 a = array.array(b'i', range(10)) 537 n = len(a.tostring()) 538 with self.open(support.TESTFN, "wb", 0) as f: 539 self.assertEqual(f.write(a), n) 540 with self.open(support.TESTFN, "wb") as f: 541 self.assertEqual(f.write(a), n) 542 543 def test_closefd(self): 544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 545 closefd=False) 546 547 def test_read_closed(self): 548 with self.open(support.TESTFN, "w") as f: 549 f.write("egg\n") 550 with self.open(support.TESTFN, "r") as f: 551 file = self.open(f.fileno(), "r", closefd=False) 552 self.assertEqual(file.read(), "egg\n") 553 file.seek(0) 554 file.close() 555 self.assertRaises(ValueError, file.read) 556 557 def test_no_closefd_with_filename(self): 558 # can't use closefd in combination with a file name 559 self.assertRaises(ValueError, self.open, 
support.TESTFN, "r", closefd=False) 560 561 def test_closefd_attr(self): 562 with self.open(support.TESTFN, "wb") as f: 563 f.write(b"egg\n") 564 with self.open(support.TESTFN, "r") as f: 565 self.assertEqual(f.buffer.raw.closefd, True) 566 file = self.open(f.fileno(), "r", closefd=False) 567 self.assertEqual(file.buffer.raw.closefd, False) 568 569 def test_garbage_collection(self): 570 # FileIO objects are collected, and collecting them flushes 571 # all data to disk. 572 f = self.FileIO(support.TESTFN, "wb") 573 f.write(b"abcxxx") 574 f.f = f 575 wr = weakref.ref(f) 576 del f 577 support.gc_collect() 578 self.assertIsNone(wr(), wr) 579 with self.open(support.TESTFN, "rb") as f: 580 self.assertEqual(f.read(), b"abcxxx") 581 582 def test_unbounded_file(self): 583 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 584 zero = "/dev/zero" 585 if not os.path.exists(zero): 586 self.skipTest("{0} does not exist".format(zero)) 587 if sys.maxsize > 0x7FFFFFFF: 588 self.skipTest("test can only run in a 32-bit address space") 589 if support.real_max_memuse < support._2G: 590 self.skipTest("test requires at least 2GB of memory") 591 with self.open(zero, "rb", buffering=0) as f: 592 self.assertRaises(OverflowError, f.read) 593 with self.open(zero, "rb") as f: 594 self.assertRaises(OverflowError, f.read) 595 with self.open(zero, "r") as f: 596 self.assertRaises(OverflowError, f.read) 597 598 def check_flush_error_on_close(self, *args, **kwargs): 599 # Test that the file is closed despite failed flush 600 # and that flush() is called before file closed. 
601 f = self.open(*args, **kwargs) 602 closed = [] 603 def bad_flush(): 604 closed[:] = [f.closed] 605 raise IOError() 606 f.flush = bad_flush 607 self.assertRaises(IOError, f.close) # exception not swallowed 608 self.assertTrue(f.closed) 609 self.assertTrue(closed) # flush() called 610 self.assertFalse(closed[0]) # flush() called before file closed 611 f.flush = lambda: None # break reference loop 612 613 def test_flush_error_on_close(self): 614 # raw file 615 # Issue #5700: io.FileIO calls flush() after file closed 616 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 618 self.check_flush_error_on_close(fd, 'wb', buffering=0) 619 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 620 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 621 os.close(fd) 622 # buffered io 623 self.check_flush_error_on_close(support.TESTFN, 'wb') 624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 625 self.check_flush_error_on_close(fd, 'wb') 626 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 627 self.check_flush_error_on_close(fd, 'wb', closefd=False) 628 os.close(fd) 629 # text io 630 self.check_flush_error_on_close(support.TESTFN, 'w') 631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 632 self.check_flush_error_on_close(fd, 'w') 633 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 634 self.check_flush_error_on_close(fd, 'w', closefd=False) 635 os.close(fd) 636 637 def test_multi_close(self): 638 f = self.open(support.TESTFN, "wb", buffering=0) 639 f.close() 640 f.close() 641 f.close() 642 self.assertRaises(ValueError, f.flush) 643 644 def test_RawIOBase_read(self): 645 # Exercise the default RawIOBase.read() implementation (which calls 646 # readinto() internally). 
647 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 648 self.assertEqual(rawio.read(2), b"ab") 649 self.assertEqual(rawio.read(2), b"c") 650 self.assertEqual(rawio.read(2), b"d") 651 self.assertEqual(rawio.read(2), None) 652 self.assertEqual(rawio.read(2), b"ef") 653 self.assertEqual(rawio.read(2), b"g") 654 self.assertEqual(rawio.read(2), None) 655 self.assertEqual(rawio.read(2), b"") 656 657 def test_fileio_closefd(self): 658 # Issue #4841 659 with self.open(__file__, 'rb') as f1, \ 660 self.open(__file__, 'rb') as f2: 661 fileio = self.FileIO(f1.fileno(), closefd=False) 662 # .__init__() must not close f1 663 fileio.__init__(f2.fileno(), closefd=False) 664 f1.readline() 665 # .close() must not close f2 666 fileio.close() 667 f2.readline() 668 669 def test_nonbuffered_textio(self): 670 with warnings.catch_warnings(record=True) as recorded: 671 with self.assertRaises(ValueError): 672 self.open(support.TESTFN, 'w', buffering=0) 673 support.gc_collect() 674 self.assertEqual(recorded, []) 675 676 def test_invalid_newline(self): 677 with warnings.catch_warnings(record=True) as recorded: 678 with self.assertRaises(ValueError): 679 self.open(support.TESTFN, 'w', newline='invalid') 680 support.gc_collect() 681 self.assertEqual(recorded, []) 682 683 def test_buffered_readinto_mixin(self): 684 # Test the implementation provided by BufferedIOBase 685 class Stream(self.BufferedIOBase): 686 def read(self, size): 687 return b"12345" 688 stream = Stream() 689 buffer = byteslike(5) 690 self.assertEqual(stream.readinto(buffer), 5) 691 self.assertEqual(buffer.tobytes(), b"12345") 692 693 def test_close_assert(self): 694 class R(self.IOBase): 695 def __setattr__(self, name, value): 696 pass 697 def flush(self): 698 raise OSError() 699 f = R() 700 # This would cause an assertion failure. 
701 self.assertRaises(OSError, f.close) 702 703 704class CIOTest(IOTest): 705 706 def test_IOBase_finalize(self): 707 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 708 # class which inherits IOBase and an object of this class are caught 709 # in a reference cycle and close() is already in the method cache. 710 class MyIO(self.IOBase): 711 def close(self): 712 pass 713 714 # create an instance to populate the method cache 715 MyIO() 716 obj = MyIO() 717 obj.obj = obj 718 wr = weakref.ref(obj) 719 del MyIO 720 del obj 721 support.gc_collect() 722 self.assertIsNone(wr(), wr) 723 724class PyIOTest(IOTest): 725 test_array_writes = unittest.skip( 726 "len(array.array) returns number of elements rather than bytelength" 727 )(IOTest.test_array_writes) 728 729 730class CommonBufferedTests: 731 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 732 733 def test_detach(self): 734 raw = self.MockRawIO() 735 buf = self.tp(raw) 736 self.assertIs(buf.detach(), raw) 737 self.assertRaises(ValueError, buf.detach) 738 739 repr(buf) # Should still work 740 741 def test_fileno(self): 742 rawio = self.MockRawIO() 743 bufio = self.tp(rawio) 744 745 self.assertEqual(42, bufio.fileno()) 746 747 def test_invalid_args(self): 748 rawio = self.MockRawIO() 749 bufio = self.tp(rawio) 750 # Invalid whence 751 self.assertRaises(ValueError, bufio.seek, 0, -1) 752 self.assertRaises(ValueError, bufio.seek, 0, 3) 753 754 def test_override_destructor(self): 755 tp = self.tp 756 record = [] 757 class MyBufferedIO(tp): 758 def __del__(self): 759 record.append(1) 760 try: 761 f = super(MyBufferedIO, self).__del__ 762 except AttributeError: 763 pass 764 else: 765 f() 766 def close(self): 767 record.append(2) 768 super(MyBufferedIO, self).close() 769 def flush(self): 770 record.append(3) 771 super(MyBufferedIO, self).flush() 772 rawio = self.MockRawIO() 773 bufio = MyBufferedIO(rawio) 774 writable = bufio.writable() 775 del bufio 776 support.gc_collect() 777 if 
writable: 778 self.assertEqual(record, [1, 2, 3]) 779 else: 780 self.assertEqual(record, [1, 2]) 781 782 def test_context_manager(self): 783 # Test usability as a context manager 784 rawio = self.MockRawIO() 785 bufio = self.tp(rawio) 786 def _with(): 787 with bufio: 788 pass 789 _with() 790 # bufio should now be closed, and using it a second time should raise 791 # a ValueError. 792 self.assertRaises(ValueError, _with) 793 794 def test_error_through_destructor(self): 795 # Test that the exception state is not modified by a destructor, 796 # even if close() fails. 797 rawio = self.CloseFailureIO() 798 def f(): 799 self.tp(rawio).xyzzy 800 with support.captured_output("stderr") as s: 801 self.assertRaises(AttributeError, f) 802 s = s.getvalue().strip() 803 if s: 804 # The destructor *may* have printed an unraisable error, check it 805 self.assertEqual(len(s.splitlines()), 1) 806 self.assertTrue(s.startswith("Exception IOError: "), s) 807 self.assertTrue(s.endswith(" ignored"), s) 808 809 def test_repr(self): 810 raw = self.MockRawIO() 811 b = self.tp(raw) 812 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) 813 self.assertEqual(repr(b), "<%s>" % clsname) 814 raw.name = "dummy" 815 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) 816 raw.name = b"dummy" 817 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) 818 819 def test_flush_error_on_close(self): 820 # Test that buffered file is closed despite failed flush 821 # and that flush() is called before file closed. 
822 raw = self.MockRawIO() 823 closed = [] 824 def bad_flush(): 825 closed[:] = [b.closed, raw.closed] 826 raise IOError() 827 raw.flush = bad_flush 828 b = self.tp(raw) 829 self.assertRaises(IOError, b.close) # exception not swallowed 830 self.assertTrue(b.closed) 831 self.assertTrue(raw.closed) 832 self.assertTrue(closed) # flush() called 833 self.assertFalse(closed[0]) # flush() called before file closed 834 self.assertFalse(closed[1]) 835 raw.flush = lambda: None # break reference loop 836 837 def test_close_error_on_close(self): 838 raw = self.MockRawIO() 839 def bad_flush(): 840 raise IOError('flush') 841 def bad_close(): 842 raise IOError('close') 843 raw.close = bad_close 844 b = self.tp(raw) 845 b.flush = bad_flush 846 with self.assertRaises(IOError) as err: # exception not swallowed 847 b.close() 848 self.assertEqual(err.exception.args, ('close',)) 849 self.assertFalse(b.closed) 850 851 def test_multi_close(self): 852 raw = self.MockRawIO() 853 b = self.tp(raw) 854 b.close() 855 b.close() 856 b.close() 857 self.assertRaises(ValueError, b.flush) 858 859 def test_readonly_attributes(self): 860 raw = self.MockRawIO() 861 buf = self.tp(raw) 862 x = self.MockRawIO() 863 with self.assertRaises((AttributeError, TypeError)): 864 buf.raw = x 865 866 867class SizeofTest: 868 869 @support.cpython_only 870 def test_sizeof(self): 871 bufsize1 = 4096 872 bufsize2 = 8192 873 rawio = self.MockRawIO() 874 bufio = self.tp(rawio, buffer_size=bufsize1) 875 size = sys.getsizeof(bufio) - bufsize1 876 rawio = self.MockRawIO() 877 bufio = self.tp(rawio, buffer_size=bufsize2) 878 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 879 880 881class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 882 read_mode = "rb" 883 884 def test_constructor(self): 885 rawio = self.MockRawIO([b"abc"]) 886 bufio = self.tp(rawio) 887 bufio.__init__(rawio) 888 bufio.__init__(rawio, buffer_size=1024) 889 bufio.__init__(rawio, buffer_size=16) 890 self.assertEqual(b"abc", 
bufio.read()) 891 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 892 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 893 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 894 rawio = self.MockRawIO([b"abc"]) 895 bufio.__init__(rawio) 896 self.assertEqual(b"abc", bufio.read()) 897 898 def test_uninitialized(self): 899 bufio = self.tp.__new__(self.tp) 900 del bufio 901 bufio = self.tp.__new__(self.tp) 902 self.assertRaisesRegexp((ValueError, AttributeError), 903 'uninitialized|has no attribute', 904 bufio.read, 0) 905 bufio.__init__(self.MockRawIO()) 906 self.assertEqual(bufio.read(0), b'') 907 908 def test_read(self): 909 for arg in (None, 7): 910 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 911 bufio = self.tp(rawio) 912 self.assertEqual(b"abcdefg", bufio.read(arg)) 913 # Invalid args 914 self.assertRaises(ValueError, bufio.read, -2) 915 916 def test_read1(self): 917 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 918 bufio = self.tp(rawio) 919 self.assertEqual(b"a", bufio.read(1)) 920 self.assertEqual(b"b", bufio.read1(1)) 921 self.assertEqual(rawio._reads, 1) 922 self.assertEqual(b"c", bufio.read1(100)) 923 self.assertEqual(rawio._reads, 1) 924 self.assertEqual(b"d", bufio.read1(100)) 925 self.assertEqual(rawio._reads, 2) 926 self.assertEqual(b"efg", bufio.read1(100)) 927 self.assertEqual(rawio._reads, 3) 928 self.assertEqual(b"", bufio.read1(100)) 929 self.assertEqual(rawio._reads, 4) 930 # Invalid args 931 self.assertRaises(ValueError, bufio.read1, -1) 932 933 def test_readinto(self): 934 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 935 bufio = self.tp(rawio) 936 b = bytearray(2) 937 self.assertEqual(bufio.readinto(b), 2) 938 self.assertEqual(b, b"ab") 939 self.assertEqual(bufio.readinto(b), 2) 940 self.assertEqual(b, b"cd") 941 self.assertEqual(bufio.readinto(b), 2) 942 self.assertEqual(b, b"ef") 943 self.assertEqual(bufio.readinto(b), 1) 944 self.assertEqual(b, b"gf") 945 
self.assertEqual(bufio.readinto(b), 0) 946 self.assertEqual(b, b"gf") 947 948 def test_readlines(self): 949 def bufio(): 950 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 951 return self.tp(rawio) 952 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 953 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 954 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 955 956 def test_buffering(self): 957 data = b"abcdefghi" 958 dlen = len(data) 959 960 tests = [ 961 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 962 [ 100, [ 3, 3, 3], [ dlen ] ], 963 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 964 ] 965 966 for bufsize, buf_read_sizes, raw_read_sizes in tests: 967 rawio = self.MockFileIO(data) 968 bufio = self.tp(rawio, buffer_size=bufsize) 969 pos = 0 970 for nbytes in buf_read_sizes: 971 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 972 pos += nbytes 973 # this is mildly implementation-dependent 974 self.assertEqual(rawio.read_history, raw_read_sizes) 975 976 def test_read_non_blocking(self): 977 # Inject some None's in there to simulate EWOULDBLOCK 978 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 979 bufio = self.tp(rawio) 980 self.assertEqual(b"abcd", bufio.read(6)) 981 self.assertEqual(b"e", bufio.read(1)) 982 self.assertEqual(b"fg", bufio.read()) 983 self.assertEqual(b"", bufio.peek(1)) 984 self.assertIsNone(bufio.read()) 985 self.assertEqual(b"", bufio.read()) 986 987 rawio = self.MockRawIO((b"a", None, None)) 988 self.assertEqual(b"a", rawio.readall()) 989 self.assertIsNone(rawio.readall()) 990 991 def test_read_past_eof(self): 992 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 993 bufio = self.tp(rawio) 994 995 self.assertEqual(b"abcdefg", bufio.read(9000)) 996 997 def test_read_all(self): 998 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 999 bufio = self.tp(rawio) 1000 1001 self.assertEqual(b"abcdefg", bufio.read()) 1002 1003 @unittest.skipUnless(threading, 'Threading required for this test.') 
1004 @support.requires_resource('cpu') 1005 def test_threads(self): 1006 try: 1007 # Write out many bytes with exactly the same number of 0's, 1008 # 1's... 255's. This will help us check that concurrent reading 1009 # doesn't duplicate or forget contents. 1010 N = 1000 1011 l = list(range(256)) * N 1012 random.shuffle(l) 1013 s = bytes(bytearray(l)) 1014 with self.open(support.TESTFN, "wb") as f: 1015 f.write(s) 1016 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 1017 bufio = self.tp(raw, 8) 1018 errors = [] 1019 results = [] 1020 def f(): 1021 try: 1022 # Intra-buffer read then buffer-flushing read 1023 for n in cycle([1, 19]): 1024 s = bufio.read(n) 1025 if not s: 1026 break 1027 # list.append() is atomic 1028 results.append(s) 1029 except Exception as e: 1030 errors.append(e) 1031 raise 1032 threads = [threading.Thread(target=f) for x in range(20)] 1033 with support.start_threads(threads): 1034 time.sleep(0.02) # yield 1035 self.assertFalse(errors, 1036 "the following exceptions were caught: %r" % errors) 1037 s = b''.join(results) 1038 for i in range(256): 1039 c = bytes(bytearray([i])) 1040 self.assertEqual(s.count(c), N) 1041 finally: 1042 support.unlink(support.TESTFN) 1043 1044 def test_misbehaved_io(self): 1045 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1046 bufio = self.tp(rawio) 1047 self.assertRaises(IOError, bufio.seek, 0) 1048 self.assertRaises(IOError, bufio.tell) 1049 1050 def test_no_extraneous_read(self): 1051 # Issue #9550; when the raw IO object has satisfied the read request, 1052 # we should not issue any additional reads, otherwise it may block 1053 # (e.g. socket). 1054 bufsize = 16 1055 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1056 rawio = self.MockRawIO([b"x" * n]) 1057 bufio = self.tp(rawio, bufsize) 1058 self.assertEqual(bufio.read(n), b"x" * n) 1059 # Simple case: one raw read is enough to satisfy the request. 
1060 self.assertEqual(rawio._extraneous_reads, 0, 1061 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1062 # A more complex case where two raw reads are needed to satisfy 1063 # the request. 1064 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1065 bufio = self.tp(rawio, bufsize) 1066 self.assertEqual(bufio.read(n), b"x" * n) 1067 self.assertEqual(rawio._extraneous_reads, 0, 1068 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1069 1070 1071class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1072 tp = io.BufferedReader 1073 1074 def test_constructor(self): 1075 BufferedReaderTest.test_constructor(self) 1076 # The allocation can succeed on 32-bit builds, e.g. with more 1077 # than 2GB RAM and a 64-bit kernel. 1078 if sys.maxsize > 0x7FFFFFFF: 1079 rawio = self.MockRawIO() 1080 bufio = self.tp(rawio) 1081 self.assertRaises((OverflowError, MemoryError, ValueError), 1082 bufio.__init__, rawio, sys.maxsize) 1083 1084 def test_initialization(self): 1085 rawio = self.MockRawIO([b"abc"]) 1086 bufio = self.tp(rawio) 1087 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1088 self.assertRaises(ValueError, bufio.read) 1089 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1090 self.assertRaises(ValueError, bufio.read) 1091 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1092 self.assertRaises(ValueError, bufio.read) 1093 1094 def test_misbehaved_io_read(self): 1095 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1096 bufio = self.tp(rawio) 1097 # _pyio.BufferedReader seems to implement reading different, so that 1098 # checking this is not so easy. 1099 self.assertRaises(IOError, bufio.read, 10) 1100 1101 def test_garbage_collection(self): 1102 # C BufferedReader objects are collected. 
1103 # The Python version has __del__, so it ends into gc.garbage instead 1104 self.addCleanup(support.unlink, support.TESTFN) 1105 rawio = self.FileIO(support.TESTFN, "w+b") 1106 f = self.tp(rawio) 1107 f.f = f 1108 wr = weakref.ref(f) 1109 del f 1110 support.gc_collect() 1111 self.assertIsNone(wr(), wr) 1112 1113 def test_args_error(self): 1114 # Issue #17275 1115 with self.assertRaisesRegexp(TypeError, "BufferedReader"): 1116 self.tp(io.BytesIO(), 1024, 1024, 1024) 1117 1118 1119class PyBufferedReaderTest(BufferedReaderTest): 1120 tp = pyio.BufferedReader 1121 1122 1123class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1124 write_mode = "wb" 1125 1126 def test_constructor(self): 1127 rawio = self.MockRawIO() 1128 bufio = self.tp(rawio) 1129 bufio.__init__(rawio) 1130 bufio.__init__(rawio, buffer_size=1024) 1131 bufio.__init__(rawio, buffer_size=16) 1132 self.assertEqual(3, bufio.write(b"abc")) 1133 bufio.flush() 1134 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1135 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1136 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1137 bufio.__init__(rawio) 1138 self.assertEqual(3, bufio.write(b"ghi")) 1139 bufio.flush() 1140 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1141 1142 def test_uninitialized(self): 1143 bufio = self.tp.__new__(self.tp) 1144 del bufio 1145 bufio = self.tp.__new__(self.tp) 1146 self.assertRaisesRegexp((ValueError, AttributeError), 1147 'uninitialized|has no attribute', 1148 bufio.write, b'') 1149 bufio.__init__(self.MockRawIO()) 1150 self.assertEqual(bufio.write(b''), 0) 1151 1152 def test_detach_flush(self): 1153 raw = self.MockRawIO() 1154 buf = self.tp(raw) 1155 buf.write(b"howdy!") 1156 self.assertFalse(raw._write_stack) 1157 buf.detach() 1158 self.assertEqual(raw._write_stack, [b"howdy!"]) 1159 1160 def test_write(self): 1161 # Write to the buffered IO but don't overflow the buffer. 
1162 writer = self.MockRawIO() 1163 bufio = self.tp(writer, 8) 1164 bufio.write(b"abc") 1165 self.assertFalse(writer._write_stack) 1166 buffer = bytearray(b"def") 1167 bufio.write(buffer) 1168 buffer[:] = b"***" # Overwrite our copy of the data 1169 bufio.flush() 1170 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1171 1172 def test_write_overflow(self): 1173 writer = self.MockRawIO() 1174 bufio = self.tp(writer, 8) 1175 contents = b"abcdefghijklmnop" 1176 for n in range(0, len(contents), 3): 1177 bufio.write(contents[n:n+3]) 1178 flushed = b"".join(writer._write_stack) 1179 # At least (total - 8) bytes were implicitly flushed, perhaps more 1180 # depending on the implementation. 1181 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1182 1183 def check_writes(self, intermediate_func): 1184 # Lots of writes, test the flushed output is as expected. 1185 contents = bytes(range(256)) * 1000 1186 n = 0 1187 writer = self.MockRawIO() 1188 bufio = self.tp(writer, 13) 1189 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1190 def gen_sizes(): 1191 for size in count(1): 1192 for i in range(15): 1193 yield size 1194 sizes = gen_sizes() 1195 while n < len(contents): 1196 size = min(next(sizes), len(contents) - n) 1197 self.assertEqual(bufio.write(contents[n:n+size]), size) 1198 intermediate_func(bufio) 1199 n += size 1200 bufio.flush() 1201 self.assertEqual(contents, 1202 b"".join(writer._write_stack)) 1203 1204 def test_writes(self): 1205 self.check_writes(lambda bufio: None) 1206 1207 def test_writes_and_flushes(self): 1208 self.check_writes(lambda bufio: bufio.flush()) 1209 1210 def test_writes_and_seeks(self): 1211 def _seekabs(bufio): 1212 pos = bufio.tell() 1213 bufio.seek(pos + 1, 0) 1214 bufio.seek(pos - 1, 0) 1215 bufio.seek(pos, 0) 1216 self.check_writes(_seekabs) 1217 def _seekrel(bufio): 1218 pos = bufio.seek(0, 1) 1219 bufio.seek(+1, 1) 1220 bufio.seek(-1, 1) 1221 bufio.seek(pos, 0) 1222 self.check_writes(_seekrel) 
1223 1224 def test_writes_and_truncates(self): 1225 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1226 1227 def test_write_non_blocking(self): 1228 raw = self.MockNonBlockWriterIO() 1229 bufio = self.tp(raw, 8) 1230 1231 self.assertEqual(bufio.write(b"abcd"), 4) 1232 self.assertEqual(bufio.write(b"efghi"), 5) 1233 # 1 byte will be written, the rest will be buffered 1234 raw.block_on(b"k") 1235 self.assertEqual(bufio.write(b"jklmn"), 5) 1236 1237 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1238 raw.block_on(b"0") 1239 try: 1240 bufio.write(b"opqrwxyz0123456789") 1241 except self.BlockingIOError as e: 1242 written = e.characters_written 1243 else: 1244 self.fail("BlockingIOError should have been raised") 1245 self.assertEqual(written, 16) 1246 self.assertEqual(raw.pop_written(), 1247 b"abcdefghijklmnopqrwxyz") 1248 1249 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1250 s = raw.pop_written() 1251 # Previously buffered bytes were flushed 1252 self.assertTrue(s.startswith(b"01234567A"), s) 1253 1254 def test_write_and_rewind(self): 1255 raw = io.BytesIO() 1256 bufio = self.tp(raw, 4) 1257 self.assertEqual(bufio.write(b"abcdef"), 6) 1258 self.assertEqual(bufio.tell(), 6) 1259 bufio.seek(0, 0) 1260 self.assertEqual(bufio.write(b"XY"), 2) 1261 bufio.seek(6, 0) 1262 self.assertEqual(raw.getvalue(), b"XYcdef") 1263 self.assertEqual(bufio.write(b"123456"), 6) 1264 bufio.flush() 1265 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1266 1267 def test_flush(self): 1268 writer = self.MockRawIO() 1269 bufio = self.tp(writer, 8) 1270 bufio.write(b"abc") 1271 bufio.flush() 1272 self.assertEqual(b"abc", writer._write_stack[0]) 1273 1274 def test_writelines(self): 1275 l = [b'ab', b'cd', b'ef'] 1276 writer = self.MockRawIO() 1277 bufio = self.tp(writer, 8) 1278 bufio.writelines(l) 1279 bufio.flush() 1280 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1281 1282 def test_writelines_userlist(self): 1283 l = UserList([b'ab', 
b'cd', b'ef']) 1284 writer = self.MockRawIO() 1285 bufio = self.tp(writer, 8) 1286 bufio.writelines(l) 1287 bufio.flush() 1288 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1289 1290 def test_writelines_error(self): 1291 writer = self.MockRawIO() 1292 bufio = self.tp(writer, 8) 1293 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1294 self.assertRaises(TypeError, bufio.writelines, None) 1295 1296 def test_destructor(self): 1297 writer = self.MockRawIO() 1298 bufio = self.tp(writer, 8) 1299 bufio.write(b"abc") 1300 del bufio 1301 support.gc_collect() 1302 self.assertEqual(b"abc", writer._write_stack[0]) 1303 1304 def test_truncate(self): 1305 # Truncate implicitly flushes the buffer. 1306 self.addCleanup(support.unlink, support.TESTFN) 1307 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1308 bufio = self.tp(raw, 8) 1309 bufio.write(b"abcdef") 1310 self.assertEqual(bufio.truncate(3), 3) 1311 self.assertEqual(bufio.tell(), 6) 1312 with self.open(support.TESTFN, "rb", buffering=0) as f: 1313 self.assertEqual(f.read(), b"abc") 1314 1315 @unittest.skipUnless(threading, 'Threading required for this test.') 1316 @support.requires_resource('cpu') 1317 def test_threads(self): 1318 try: 1319 # Write out many bytes from many threads and test they were 1320 # all flushed. 1321 N = 1000 1322 contents = bytes(range(256)) * N 1323 sizes = cycle([1, 19]) 1324 n = 0 1325 queue = deque() 1326 while n < len(contents): 1327 size = next(sizes) 1328 queue.append(contents[n:n+size]) 1329 n += size 1330 del contents 1331 # We use a real file object because it allows us to 1332 # exercise situations where the GIL is released before 1333 # writing the buffer to the raw streams. This is in addition 1334 # to concurrency issues due to switching threads in the middle 1335 # of Python code. 
1336 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1337 bufio = self.tp(raw, 8) 1338 errors = [] 1339 def f(): 1340 try: 1341 while True: 1342 try: 1343 s = queue.popleft() 1344 except IndexError: 1345 return 1346 bufio.write(s) 1347 except Exception as e: 1348 errors.append(e) 1349 raise 1350 threads = [threading.Thread(target=f) for x in range(20)] 1351 with support.start_threads(threads): 1352 time.sleep(0.02) # yield 1353 self.assertFalse(errors, 1354 "the following exceptions were caught: %r" % errors) 1355 bufio.close() 1356 with self.open(support.TESTFN, "rb") as f: 1357 s = f.read() 1358 for i in range(256): 1359 self.assertEqual(s.count(bytes([i])), N) 1360 finally: 1361 support.unlink(support.TESTFN) 1362 1363 def test_misbehaved_io(self): 1364 rawio = self.MisbehavedRawIO() 1365 bufio = self.tp(rawio, 5) 1366 self.assertRaises(IOError, bufio.seek, 0) 1367 self.assertRaises(IOError, bufio.tell) 1368 self.assertRaises(IOError, bufio.write, b"abcdef") 1369 1370 def test_max_buffer_size_deprecation(self): 1371 with support.check_warnings(("max_buffer_size is deprecated", 1372 DeprecationWarning)): 1373 self.tp(self.MockRawIO(), 8, 12) 1374 1375 def test_write_error_on_close(self): 1376 raw = self.MockRawIO() 1377 def bad_write(b): 1378 raise IOError() 1379 raw.write = bad_write 1380 b = self.tp(raw) 1381 b.write(b'spam') 1382 self.assertRaises(IOError, b.close) # exception not swallowed 1383 self.assertTrue(b.closed) 1384 1385 1386class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1387 tp = io.BufferedWriter 1388 1389 def test_constructor(self): 1390 BufferedWriterTest.test_constructor(self) 1391 # The allocation can succeed on 32-bit builds, e.g. with more 1392 # than 2GB RAM and a 64-bit kernel. 
1393 if sys.maxsize > 0x7FFFFFFF: 1394 rawio = self.MockRawIO() 1395 bufio = self.tp(rawio) 1396 self.assertRaises((OverflowError, MemoryError, ValueError), 1397 bufio.__init__, rawio, sys.maxsize) 1398 1399 def test_initialization(self): 1400 rawio = self.MockRawIO() 1401 bufio = self.tp(rawio) 1402 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1403 self.assertRaises(ValueError, bufio.write, b"def") 1404 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1405 self.assertRaises(ValueError, bufio.write, b"def") 1406 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1407 self.assertRaises(ValueError, bufio.write, b"def") 1408 1409 def test_garbage_collection(self): 1410 # C BufferedWriter objects are collected, and collecting them flushes 1411 # all data to disk. 1412 # The Python version has __del__, so it ends into gc.garbage instead 1413 self.addCleanup(support.unlink, support.TESTFN) 1414 rawio = self.FileIO(support.TESTFN, "w+b") 1415 f = self.tp(rawio) 1416 f.write(b"123xxx") 1417 f.x = f 1418 wr = weakref.ref(f) 1419 del f 1420 support.gc_collect() 1421 self.assertIsNone(wr(), wr) 1422 with self.open(support.TESTFN, "rb") as f: 1423 self.assertEqual(f.read(), b"123xxx") 1424 1425 def test_args_error(self): 1426 # Issue #17275 1427 with self.assertRaisesRegexp(TypeError, "BufferedWriter"): 1428 self.tp(io.BytesIO(), 1024, 1024, 1024) 1429 1430 1431class PyBufferedWriterTest(BufferedWriterTest): 1432 tp = pyio.BufferedWriter 1433 1434class BufferedRWPairTest(unittest.TestCase): 1435 1436 def test_constructor(self): 1437 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1438 self.assertFalse(pair.closed) 1439 1440 def test_uninitialized(self): 1441 pair = self.tp.__new__(self.tp) 1442 del pair 1443 pair = self.tp.__new__(self.tp) 1444 self.assertRaisesRegexp((ValueError, AttributeError), 1445 'uninitialized|has no attribute', 1446 pair.read, 0) 1447 self.assertRaisesRegexp((ValueError, AttributeError), 
1448 'uninitialized|has no attribute', 1449 pair.write, b'') 1450 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1451 self.assertEqual(pair.read(0), b'') 1452 self.assertEqual(pair.write(b''), 0) 1453 1454 def test_detach(self): 1455 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1456 self.assertRaises(self.UnsupportedOperation, pair.detach) 1457 1458 def test_constructor_max_buffer_size_deprecation(self): 1459 with support.check_warnings(("max_buffer_size is deprecated", 1460 DeprecationWarning)): 1461 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1462 1463 def test_constructor_with_not_readable(self): 1464 class NotReadable(MockRawIO): 1465 def readable(self): 1466 return False 1467 1468 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) 1469 1470 def test_constructor_with_not_writeable(self): 1471 class NotWriteable(MockRawIO): 1472 def writable(self): 1473 return False 1474 1475 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) 1476 1477 def test_read(self): 1478 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1479 1480 self.assertEqual(pair.read(3), b"abc") 1481 self.assertEqual(pair.read(1), b"d") 1482 self.assertEqual(pair.read(), b"ef") 1483 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1484 self.assertEqual(pair.read(None), b"abc") 1485 1486 def test_readlines(self): 1487 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1488 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1489 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1490 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1491 1492 def test_read1(self): 1493 # .read1() is delegated to the underlying reader object, so this test 1494 # can be shallow. 
1495 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1496 1497 self.assertEqual(pair.read1(3), b"abc") 1498 1499 def test_readinto(self): 1500 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1501 1502 data = byteslike(5) 1503 self.assertEqual(pair.readinto(data), 5) 1504 self.assertEqual(data.tobytes(), b"abcde") 1505 1506 def test_write(self): 1507 w = self.MockRawIO() 1508 pair = self.tp(self.MockRawIO(), w) 1509 1510 pair.write(b"abc") 1511 pair.flush() 1512 buffer = bytearray(b"def") 1513 pair.write(buffer) 1514 buffer[:] = b"***" # Overwrite our copy of the data 1515 pair.flush() 1516 self.assertEqual(w._write_stack, [b"abc", b"def"]) 1517 1518 def test_peek(self): 1519 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1520 1521 self.assertTrue(pair.peek(3).startswith(b"abc")) 1522 self.assertEqual(pair.read(3), b"abc") 1523 1524 def test_readable(self): 1525 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1526 self.assertTrue(pair.readable()) 1527 1528 def test_writeable(self): 1529 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1530 self.assertTrue(pair.writable()) 1531 1532 def test_seekable(self): 1533 # BufferedRWPairs are never seekable, even if their readers and writers 1534 # are. 1535 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1536 self.assertFalse(pair.seekable()) 1537 1538 # .flush() is delegated to the underlying writer object and has been 1539 # tested in the test_write method. 
1540 1541 def test_close_and_closed(self): 1542 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1543 self.assertFalse(pair.closed) 1544 pair.close() 1545 self.assertTrue(pair.closed) 1546 1547 def test_reader_close_error_on_close(self): 1548 def reader_close(): 1549 reader_non_existing 1550 reader = self.MockRawIO() 1551 reader.close = reader_close 1552 writer = self.MockRawIO() 1553 pair = self.tp(reader, writer) 1554 with self.assertRaises(NameError) as err: 1555 pair.close() 1556 self.assertIn('reader_non_existing', str(err.exception)) 1557 self.assertTrue(pair.closed) 1558 self.assertFalse(reader.closed) 1559 self.assertTrue(writer.closed) 1560 1561 def test_writer_close_error_on_close(self): 1562 def writer_close(): 1563 writer_non_existing 1564 reader = self.MockRawIO() 1565 writer = self.MockRawIO() 1566 writer.close = writer_close 1567 pair = self.tp(reader, writer) 1568 with self.assertRaises(NameError) as err: 1569 pair.close() 1570 self.assertIn('writer_non_existing', str(err.exception)) 1571 self.assertFalse(pair.closed) 1572 self.assertTrue(reader.closed) 1573 self.assertFalse(writer.closed) 1574 1575 def test_reader_writer_close_error_on_close(self): 1576 def reader_close(): 1577 reader_non_existing 1578 def writer_close(): 1579 writer_non_existing 1580 reader = self.MockRawIO() 1581 reader.close = reader_close 1582 writer = self.MockRawIO() 1583 writer.close = writer_close 1584 pair = self.tp(reader, writer) 1585 with self.assertRaises(NameError) as err: 1586 pair.close() 1587 self.assertIn('reader_non_existing', str(err.exception)) 1588 self.assertFalse(pair.closed) 1589 self.assertFalse(reader.closed) 1590 self.assertFalse(writer.closed) 1591 1592 def test_isatty(self): 1593 class SelectableIsAtty(MockRawIO): 1594 def __init__(self, isatty): 1595 MockRawIO.__init__(self) 1596 self._isatty = isatty 1597 1598 def isatty(self): 1599 return self._isatty 1600 1601 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 1602 
self.assertFalse(pair.isatty()) 1603 1604 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 1605 self.assertTrue(pair.isatty()) 1606 1607 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 1608 self.assertTrue(pair.isatty()) 1609 1610 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 1611 self.assertTrue(pair.isatty()) 1612 1613 def test_weakref_clearing(self): 1614 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 1615 ref = weakref.ref(brw) 1616 brw = None 1617 ref = None # Shouldn't segfault. 1618 1619class CBufferedRWPairTest(BufferedRWPairTest): 1620 tp = io.BufferedRWPair 1621 1622class PyBufferedRWPairTest(BufferedRWPairTest): 1623 tp = pyio.BufferedRWPair 1624 1625 1626class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 1627 read_mode = "rb+" 1628 write_mode = "wb+" 1629 1630 def test_constructor(self): 1631 BufferedReaderTest.test_constructor(self) 1632 BufferedWriterTest.test_constructor(self) 1633 1634 def test_uninitialized(self): 1635 BufferedReaderTest.test_uninitialized(self) 1636 BufferedWriterTest.test_uninitialized(self) 1637 1638 def test_read_and_write(self): 1639 raw = self.MockRawIO((b"asdf", b"ghjk")) 1640 rw = self.tp(raw, 8) 1641 1642 self.assertEqual(b"as", rw.read(2)) 1643 rw.write(b"ddd") 1644 rw.write(b"eee") 1645 self.assertFalse(raw._write_stack) # Buffer writes 1646 self.assertEqual(b"ghjk", rw.read()) 1647 self.assertEqual(b"dddeee", raw._write_stack[0]) 1648 1649 def test_seek_and_tell(self): 1650 raw = self.BytesIO(b"asdfghjkl") 1651 rw = self.tp(raw) 1652 1653 self.assertEqual(b"as", rw.read(2)) 1654 self.assertEqual(2, rw.tell()) 1655 rw.seek(0, 0) 1656 self.assertEqual(b"asdf", rw.read(4)) 1657 1658 rw.write(b"123f") 1659 rw.seek(0, 0) 1660 self.assertEqual(b"asdf123fl", rw.read()) 1661 self.assertEqual(9, rw.tell()) 1662 rw.seek(-4, 2) 1663 self.assertEqual(5, rw.tell()) 1664 rw.seek(2, 1) 1665 self.assertEqual(7, rw.tell()) 1666 self.assertEqual(b"fl", rw.read(11)) 1667 
rw.flush() 1668 self.assertEqual(b"asdf123fl", raw.getvalue()) 1669 1670 self.assertRaises(TypeError, rw.seek, 0.0) 1671 1672 def check_flush_and_read(self, read_func): 1673 raw = self.BytesIO(b"abcdefghi") 1674 bufio = self.tp(raw) 1675 1676 self.assertEqual(b"ab", read_func(bufio, 2)) 1677 bufio.write(b"12") 1678 self.assertEqual(b"ef", read_func(bufio, 2)) 1679 self.assertEqual(6, bufio.tell()) 1680 bufio.flush() 1681 self.assertEqual(6, bufio.tell()) 1682 self.assertEqual(b"ghi", read_func(bufio)) 1683 raw.seek(0, 0) 1684 raw.write(b"XYZ") 1685 # flush() resets the read buffer 1686 bufio.flush() 1687 bufio.seek(0, 0) 1688 self.assertEqual(b"XYZ", read_func(bufio, 3)) 1689 1690 def test_flush_and_read(self): 1691 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 1692 1693 def test_flush_and_readinto(self): 1694 def _readinto(bufio, n=-1): 1695 b = bytearray(n if n >= 0 else 9999) 1696 n = bufio.readinto(b) 1697 return bytes(b[:n]) 1698 self.check_flush_and_read(_readinto) 1699 1700 def test_flush_and_peek(self): 1701 def _peek(bufio, n=-1): 1702 # This relies on the fact that the buffer can contain the whole 1703 # raw stream, otherwise peek() can return less. 
1704 b = bufio.peek(n) 1705 if n != -1: 1706 b = b[:n] 1707 bufio.seek(len(b), 1) 1708 return b 1709 self.check_flush_and_read(_peek) 1710 1711 def test_flush_and_write(self): 1712 raw = self.BytesIO(b"abcdefghi") 1713 bufio = self.tp(raw) 1714 1715 bufio.write(b"123") 1716 bufio.flush() 1717 bufio.write(b"45") 1718 bufio.flush() 1719 bufio.seek(0, 0) 1720 self.assertEqual(b"12345fghi", raw.getvalue()) 1721 self.assertEqual(b"12345fghi", bufio.read()) 1722 1723 def test_threads(self): 1724 BufferedReaderTest.test_threads(self) 1725 BufferedWriterTest.test_threads(self) 1726 1727 def test_writes_and_peek(self): 1728 def _peek(bufio): 1729 bufio.peek(1) 1730 self.check_writes(_peek) 1731 def _peek(bufio): 1732 pos = bufio.tell() 1733 bufio.seek(-1, 1) 1734 bufio.peek(1) 1735 bufio.seek(pos, 0) 1736 self.check_writes(_peek) 1737 1738 def test_writes_and_reads(self): 1739 def _read(bufio): 1740 bufio.seek(-1, 1) 1741 bufio.read(1) 1742 self.check_writes(_read) 1743 1744 def test_writes_and_read1s(self): 1745 def _read1(bufio): 1746 bufio.seek(-1, 1) 1747 bufio.read1(1) 1748 self.check_writes(_read1) 1749 1750 def test_writes_and_readintos(self): 1751 def _read(bufio): 1752 bufio.seek(-1, 1) 1753 bufio.readinto(bytearray(1)) 1754 self.check_writes(_read) 1755 1756 def test_write_after_readahead(self): 1757 # Issue #6629: writing after the buffer was filled by readahead should 1758 # first rewind the raw stream. 1759 for overwrite_size in [1, 5]: 1760 raw = self.BytesIO(b"A" * 10) 1761 bufio = self.tp(raw, 4) 1762 # Trigger readahead 1763 self.assertEqual(bufio.read(1), b"A") 1764 self.assertEqual(bufio.tell(), 1) 1765 # Overwriting should rewind the raw stream if it needs so 1766 bufio.write(b"B" * overwrite_size) 1767 self.assertEqual(bufio.tell(), overwrite_size + 1) 1768 # If the write size was smaller than the buffer size, flush() and 1769 # check that rewind happens. 
1770 bufio.flush() 1771 self.assertEqual(bufio.tell(), overwrite_size + 1) 1772 s = raw.getvalue() 1773 self.assertEqual(s, 1774 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 1775 1776 def test_write_rewind_write(self): 1777 # Various combinations of reading / writing / seeking backwards / writing again 1778 def mutate(bufio, pos1, pos2): 1779 assert pos2 >= pos1 1780 # Fill the buffer 1781 bufio.seek(pos1) 1782 bufio.read(pos2 - pos1) 1783 bufio.write(b'\x02') 1784 # This writes earlier than the previous write, but still inside 1785 # the buffer. 1786 bufio.seek(pos1) 1787 bufio.write(b'\x01') 1788 1789 b = b"\x80\x81\x82\x83\x84" 1790 for i in range(0, len(b)): 1791 for j in range(i, len(b)): 1792 raw = self.BytesIO(b) 1793 bufio = self.tp(raw, 100) 1794 mutate(bufio, i, j) 1795 bufio.flush() 1796 expected = bytearray(b) 1797 expected[j] = 2 1798 expected[i] = 1 1799 self.assertEqual(raw.getvalue(), expected, 1800 "failed result for i=%d, j=%d" % (i, j)) 1801 1802 def test_truncate_after_read_or_write(self): 1803 raw = self.BytesIO(b"A" * 10) 1804 bufio = self.tp(raw, 100) 1805 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 1806 self.assertEqual(bufio.truncate(), 2) 1807 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 1808 self.assertEqual(bufio.truncate(), 4) 1809 1810 def test_misbehaved_io(self): 1811 BufferedReaderTest.test_misbehaved_io(self) 1812 BufferedWriterTest.test_misbehaved_io(self) 1813 1814 def test_interleaved_read_write(self): 1815 # Test for issue #12213 1816 with self.BytesIO(b'abcdefgh') as raw: 1817 with self.tp(raw, 100) as f: 1818 f.write(b"1") 1819 self.assertEqual(f.read(1), b'b') 1820 f.write(b'2') 1821 self.assertEqual(f.read1(1), b'd') 1822 f.write(b'3') 1823 buf = bytearray(1) 1824 f.readinto(buf) 1825 self.assertEqual(buf, b'f') 1826 f.write(b'4') 1827 self.assertEqual(f.peek(1), b'h') 1828 f.flush() 1829 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 1830 1831 with 
self.BytesIO(b'abc') as raw: 1832 with self.tp(raw, 100) as f: 1833 self.assertEqual(f.read(1), b'a') 1834 f.write(b"2") 1835 self.assertEqual(f.read(1), b'c') 1836 f.flush() 1837 self.assertEqual(raw.getvalue(), b'a2c') 1838 1839 def test_interleaved_readline_write(self): 1840 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 1841 with self.tp(raw) as f: 1842 f.write(b'1') 1843 self.assertEqual(f.readline(), b'b\n') 1844 f.write(b'2') 1845 self.assertEqual(f.readline(), b'def\n') 1846 f.write(b'3') 1847 self.assertEqual(f.readline(), b'\n') 1848 f.flush() 1849 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 1850 1851 1852class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, 1853 BufferedRandomTest, SizeofTest): 1854 tp = io.BufferedRandom 1855 1856 def test_constructor(self): 1857 BufferedRandomTest.test_constructor(self) 1858 # The allocation can succeed on 32-bit builds, e.g. with more 1859 # than 2GB RAM and a 64-bit kernel. 1860 if sys.maxsize > 0x7FFFFFFF: 1861 rawio = self.MockRawIO() 1862 bufio = self.tp(rawio) 1863 self.assertRaises((OverflowError, MemoryError, ValueError), 1864 bufio.__init__, rawio, sys.maxsize) 1865 1866 def test_garbage_collection(self): 1867 CBufferedReaderTest.test_garbage_collection(self) 1868 CBufferedWriterTest.test_garbage_collection(self) 1869 1870 def test_args_error(self): 1871 # Issue #17275 1872 with self.assertRaisesRegexp(TypeError, "BufferedRandom"): 1873 self.tp(io.BytesIO(), 1024, 1024, 1024) 1874 1875 1876class PyBufferedRandomTest(BufferedRandomTest): 1877 tp = pyio.BufferedRandom 1878 1879 1880# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 1881# properties: 1882# - A single output character can correspond to many bytes of input. 1883# - The number of input bytes to complete the character can be 1884# undetermined until the last input byte is received. 1885# - The number of input bytes can vary depending on previous input. 
1886# - A single input byte can correspond to many characters of output. 1887# - The number of output characters can be undetermined until the 1888# last input byte is received. 1889# - The number of output characters can vary depending on previous input. 1890 1891class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 1892 """ 1893 For testing seek/tell behavior with a stateful, buffering decoder. 1894 1895 Input is a sequence of words. Words may be fixed-length (length set 1896 by input) or variable-length (period-terminated). In variable-length 1897 mode, extra periods are ignored. Possible words are: 1898 - 'i' followed by a number sets the input length, I (maximum 99). 1899 When I is set to 0, words are space-terminated. 1900 - 'o' followed by a number sets the output length, O (maximum 99). 1901 - Any other word is converted into a word followed by a period on 1902 the output. The output word consists of the input word truncated 1903 or padded out with hyphens to make its length equal to O. If O 1904 is 0, the word is output verbatim without truncating or padding. 1905 I and O are initially set to 1. When I changes, any buffered input is 1906 re-scanned according to the new I. EOF also terminates the last word. 
1907 """ 1908 1909 def __init__(self, errors='strict'): 1910 codecs.IncrementalDecoder.__init__(self, errors) 1911 self.reset() 1912 1913 def __repr__(self): 1914 return '<SID %x>' % id(self) 1915 1916 def reset(self): 1917 self.i = 1 1918 self.o = 1 1919 self.buffer = bytearray() 1920 1921 def getstate(self): 1922 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 1923 return bytes(self.buffer), i*100 + o 1924 1925 def setstate(self, state): 1926 buffer, io = state 1927 self.buffer = bytearray(buffer) 1928 i, o = divmod(io, 100) 1929 self.i, self.o = i ^ 1, o ^ 1 1930 1931 def decode(self, input, final=False): 1932 output = '' 1933 for b in input: 1934 if self.i == 0: # variable-length, terminated with period 1935 if b == '.': 1936 if self.buffer: 1937 output += self.process_word() 1938 else: 1939 self.buffer.append(b) 1940 else: # fixed-length, terminate after self.i bytes 1941 self.buffer.append(b) 1942 if len(self.buffer) == self.i: 1943 output += self.process_word() 1944 if final and self.buffer: # EOF terminates the last word 1945 output += self.process_word() 1946 return output 1947 1948 def process_word(self): 1949 output = '' 1950 if self.buffer[0] == ord('i'): 1951 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 1952 elif self.buffer[0] == ord('o'): 1953 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 1954 else: 1955 output = self.buffer.decode('ascii') 1956 if len(output) < self.o: 1957 output += '-'*self.o # pad out with hyphens 1958 if self.o: 1959 output = output[:self.o] # truncate to output length 1960 output += '.' 
1961 self.buffer = bytearray() 1962 return output 1963 1964 codecEnabled = False 1965 1966 @classmethod 1967 def lookupTestDecoder(cls, name): 1968 if cls.codecEnabled and name == 'test_decoder': 1969 latin1 = codecs.lookup('latin-1') 1970 return codecs.CodecInfo( 1971 name='test_decoder', encode=latin1.encode, decode=None, 1972 incrementalencoder=None, 1973 streamreader=None, streamwriter=None, 1974 incrementaldecoder=cls) 1975 1976# Register the previous decoder for testing. 1977# Disabled by default, tests will enable it. 1978codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 1979 1980 1981class StatefulIncrementalDecoderTest(unittest.TestCase): 1982 """ 1983 Make sure the StatefulIncrementalDecoder actually works. 1984 """ 1985 1986 test_cases = [ 1987 # I=1, O=1 (fixed-length input == fixed-length output) 1988 (b'abcd', False, 'a.b.c.d.'), 1989 # I=0, O=0 (variable-length input, variable-length output) 1990 (b'oiabcd', True, 'abcd.'), 1991 # I=0, O=0 (should ignore extra periods) 1992 (b'oi...abcd...', True, 'abcd.'), 1993 # I=0, O=6 (variable-length input, fixed-length output) 1994 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 1995 # I=2, O=6 (fixed-length input < fixed-length output) 1996 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 1997 # I=6, O=3 (fixed-length input > fixed-length output) 1998 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 1999 # I=0, then 3; O=29, then 15 (with longer output) 2000 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2001 'a----------------------------.' + 2002 'b----------------------------.' + 2003 'cde--------------------------.' + 2004 'abcdefghijabcde.' + 2005 'a.b------------.' + 2006 '.c.------------.' + 2007 'd.e------------.' + 2008 'k--------------.' + 2009 'l--------------.' + 2010 'm--------------.') 2011 ] 2012 2013 def test_decoder(self): 2014 # Try a few one-shot test cases. 
2015 for input, eof, output in self.test_cases: 2016 d = StatefulIncrementalDecoder() 2017 self.assertEqual(d.decode(input, eof), output) 2018 2019 # Also test an unfinished decode, followed by forcing EOF. 2020 d = StatefulIncrementalDecoder() 2021 self.assertEqual(d.decode(b'oiabcd'), '') 2022 self.assertEqual(d.decode(b'', 1), 'abcd.') 2023 2024class TextIOWrapperTest(unittest.TestCase): 2025 2026 def setUp(self): 2027 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2028 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2029 support.unlink(support.TESTFN) 2030 2031 def tearDown(self): 2032 support.unlink(support.TESTFN) 2033 2034 def test_constructor(self): 2035 r = self.BytesIO(b"\xc3\xa9\n\n") 2036 b = self.BufferedReader(r, 1000) 2037 t = self.TextIOWrapper(b) 2038 t.__init__(b, encoding="latin1", newline="\r\n") 2039 self.assertEqual(t.encoding, "latin1") 2040 self.assertEqual(t.line_buffering, False) 2041 t.__init__(b, encoding="utf8", line_buffering=True) 2042 self.assertEqual(t.encoding, "utf8") 2043 self.assertEqual(t.line_buffering, True) 2044 self.assertEqual("\xe9\n", t.readline()) 2045 self.assertRaises(TypeError, t.__init__, b, newline=42) 2046 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2047 2048 def test_uninitialized(self): 2049 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2050 del t 2051 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2052 self.assertRaises(Exception, repr, t) 2053 self.assertRaisesRegexp((ValueError, AttributeError), 2054 'uninitialized|has no attribute', 2055 t.read, 0) 2056 t.__init__(self.MockRawIO()) 2057 self.assertEqual(t.read(0), u'') 2058 2059 def test_non_text_encoding_codecs_are_rejected(self): 2060 # Ensure the constructor complains if passed a codec that isn't 2061 # marked as a text encoding 2062 # http://bugs.python.org/issue20404 2063 r = self.BytesIO() 2064 b = self.BufferedWriter(r) 2065 with support.check_py3k_warnings(): 2066 self.TextIOWrapper(b, 
def test_line_buffering(self):
    # Write through a line-buffered TextIOWrapper and, after every write,
    # compare what has reached the underlying BytesIO so far.
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    steps = [
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    ]
    for chunk, reached_raw in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), reached_raw)
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: a character that cannot be
    # encoded must make write() raise.
    for strict_kwargs in ({}, {"errors": "strict"}):
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", **strict_kwargs)
        self.assertRaises(UnicodeError, writer.write, "\xff")

    # (3) ignore and (4) replace: the write succeeds and the offending
    # character is dropped or substituted in the encoded output.
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, encoded in lenient:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), encoded)
def test_newlines_input(self):
    # For each newline mode, readlines() must split the same raw bytes
    # into the listed lines, and read() after seeking back to the start
    # must return those lines joined together.
    raw = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    universal = raw.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, universal.decode("ascii").splitlines(True)),
        ("", raw.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for mode, lines in cases:
        source = self.BytesIO(raw)
        reader = self.TextIOWrapper(source, encoding="ascii", newline=mode)
        self.assertEqual(reader.readlines(), lines)
        reader.seek(0)
        self.assertEqual(reader.read(), "".join(lines))
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # called in exactly that order when the object is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not expose __del__ at all; only chain
            # to it when it is there.
            inherited = getattr(super(MyTextIO, self), '__del__', None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()

        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
def test_basic_io(self):
    # Write, read, seek and tell through text files opened with several
    # encodings, repeating everything for a range of _CHUNK_SIZE values.
    # Both files are managed by "with" blocks so that a failing assertion
    # cannot leave TESTFN open (the previous code only closed the file on
    # the success path).
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE: "utf-16-be" and "utf-16-le" are not exercised here.
        for enc in "ascii", "latin1", "utf8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position reported at end of file; reused below both as
                # an expected value and as a seek target.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
def test_seeking(self):
    # Write two identical lines made of (chunk size - 2) ASCII bytes
    # followed by U+8888 and a newline, then check read(), tell() and
    # readline() around the end of the ASCII prefix.
    # NOTE(review): the prefix length presumably makes the multi-byte
    # character cross the default chunk boundary -- confirm.
    # Both files are opened in "with" blocks; the reader used to be left
    # open when the test returned.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # The prefix is pure ASCII, so characters and bytes line up.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
def test_encoded_writes(self):
    # Write the same text twice through each UTF-16/UTF-32 flavour and
    # make sure it reads back, and is stored, as one encoded string.
    payload = "1234567890"
    doubled = payload * 2
    encodings = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(payload)
        # Read everything back twice, rewinding before each pass.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None and
    # for a bytes object.
    txt = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, txt.writelines, bad_lines)
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        # Create the file; its raw contents must equal the one-shot
        # encoding of the text.  (An unused "pos = f.tell()" binding
        # was dropped here: nothing in this test read it.)
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        # Append more text; the raw contents must now equal the one-shot
        # encoding of the concatenated text.
        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # The stream below is a StringIO whose read1() is its own read(), so
    # the wrapper receives text where it expects bytes.  (The class used
    # to be defined twice in a row; one definition is enough.)
    class NonbytesStream(self.StringIO):
        read1 = self.StringIO.read
    # maybeRaises is supplied by the concrete test classes.
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.read(1)
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.readline()
    t = self.TextIOWrapper(NonbytesStream('a'))
    self.assertEqual(t.read(), u'a')
class CTextIOWrapperTest(TextIOWrapperTest):
    """Runs the inherited TextIOWrapperTest cases plus three extra tests.

    NOTE(review): judging by the name and the comments below, this variant
    targets the C implementation; the attributes used here (self.BytesIO,
    self.TextIOWrapper, ...) are not set in this class -- confirm where
    they are injected.
    """

    def test_initialization(self):
        # After a failed re-initialisation (bad *newline* argument) the
        # wrapper must refuse to read, raising ValueError.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

        # An object created with __new__ only (never initialised) must
        # make repr() raise.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Make the wrapper reference itself so that it is part of a cycle.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        # The weak reference must be dead, i.e. the wrapper was collected.
        self.assertIsNone(wr(), wr)
        # The text written before collection must have reached the file.
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()

    # In this class maybeRaises is plain assertRaises, so inherited tests
    # that wrap a call in self.maybeRaises(...) require the exception.
    maybeRaises = unittest.TestCase.assertRaises
def check_newline_decoding(self, decoder, encoding):
    """Feed text to *decoder* one unit at a time and check its newlines.

    When *encoding* is not None the text is first encoded with an
    incremental encoder for that encoding and the decoder is fed the
    encoded data element by element; otherwise the decoder is fed the
    text itself, one character at a time.  After each feeding step the
    decoder's ``newlines`` attribute is compared with the expected
    value, and at the end the concatenated output is checked.
    """
    encoder = None
    if encoding is not None:
        encoder = codecs.getincrementalencoder(encoding)()
    pieces = []

    def feed(text):
        # Decode one byte (or, without an encoder, one char) at a time
        units = text if encoder is None else encoder.encode(text)
        for unit in units:
            pieces.append(decoder.decode(unit))

    # (text to feed, value of decoder.newlines expected afterwards)
    steps = [
        ("abc\n\r", '\n'),
        ("\nabc", ('\n', '\r\n')),
        ("abc\r", ('\n', '\r\n')),
        ("abc", ('\r', '\n', '\r\n')),
    ]
    self.assertEqual(decoder.newlines, None)
    for chunk, seen in steps:
        feed(chunk)
        self.assertEqual(decoder.newlines, seen)
    feed("abc\r")
    self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

    # After reset() the decoder must report no newlines again.
    decoder.reset()
    payload = "abc"
    if encoder is not None:
        encoder.reset()
        payload = encoder.encode(payload)
    self.assertEqual(decoder.decode(payload), "abc")
    self.assertEqual(decoder.newlines, None)
def test_newline_bytes(self):
    # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
    # U+0D00 and U+0A00 must come back unchanged and must never be
    # recorded as newlines, with translation both off and on.
    for translate in (False, True):
        decoder = self.IncrementalNewlineDecoder(None, translate=translate)
        self.assertEqual(decoder.newlines, None)
        for char in ("\u0D00", "\u0A00"):
            self.assertEqual(decoder.decode(char), char)
            self.assertEqual(decoder.newlines, None)
self.assertEqual(f.mode, "U") 2960 self.assertEqual(f.buffer.mode, "rb") 2961 self.assertEqual(f.buffer.raw.mode, "rb") 2962 f.close() 2963 2964 f = self.open(support.TESTFN, "w+") 2965 self.assertEqual(f.mode, "w+") 2966 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 2967 self.assertEqual(f.buffer.raw.mode, "rb+") 2968 2969 g = self.open(f.fileno(), "wb", closefd=False) 2970 self.assertEqual(g.mode, "wb") 2971 self.assertEqual(g.raw.mode, "wb") 2972 self.assertEqual(g.name, f.fileno()) 2973 self.assertEqual(g.raw.name, f.fileno()) 2974 f.close() 2975 g.close() 2976 2977 def test_io_after_close(self): 2978 for kwargs in [ 2979 {"mode": "w"}, 2980 {"mode": "wb"}, 2981 {"mode": "w", "buffering": 1}, 2982 {"mode": "w", "buffering": 2}, 2983 {"mode": "wb", "buffering": 0}, 2984 {"mode": "r"}, 2985 {"mode": "rb"}, 2986 {"mode": "r", "buffering": 1}, 2987 {"mode": "r", "buffering": 2}, 2988 {"mode": "rb", "buffering": 0}, 2989 {"mode": "w+"}, 2990 {"mode": "w+b"}, 2991 {"mode": "w+", "buffering": 1}, 2992 {"mode": "w+", "buffering": 2}, 2993 {"mode": "w+b", "buffering": 0}, 2994 ]: 2995 f = self.open(support.TESTFN, **kwargs) 2996 f.close() 2997 self.assertRaises(ValueError, f.flush) 2998 self.assertRaises(ValueError, f.fileno) 2999 self.assertRaises(ValueError, f.isatty) 3000 self.assertRaises(ValueError, f.__iter__) 3001 if hasattr(f, "peek"): 3002 self.assertRaises(ValueError, f.peek, 1) 3003 self.assertRaises(ValueError, f.read) 3004 if hasattr(f, "read1"): 3005 self.assertRaises(ValueError, f.read1, 1024) 3006 if hasattr(f, "readall"): 3007 self.assertRaises(ValueError, f.readall) 3008 if hasattr(f, "readinto"): 3009 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 3010 self.assertRaises(ValueError, f.readline) 3011 self.assertRaises(ValueError, f.readlines) 3012 self.assertRaises(ValueError, f.readlines, 1) 3013 self.assertRaises(ValueError, f.seek, 0) 3014 self.assertRaises(ValueError, f.tell) 3015 self.assertRaises(ValueError, 
f.truncate) 3016 self.assertRaises(ValueError, f.write, 3017 b"" if "b" in kwargs['mode'] else "") 3018 self.assertRaises(ValueError, f.writelines, []) 3019 self.assertRaises(ValueError, next, f) 3020 3021 def test_blockingioerror(self): 3022 # Various BlockingIOError issues 3023 self.assertRaises(TypeError, self.BlockingIOError) 3024 self.assertRaises(TypeError, self.BlockingIOError, 1) 3025 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) 3026 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) 3027 b = self.BlockingIOError(1, "") 3028 self.assertEqual(b.characters_written, 0) 3029 class C(unicode): 3030 pass 3031 c = C("") 3032 b = self.BlockingIOError(1, c) 3033 c.b = b 3034 b.c = c 3035 wr = weakref.ref(c) 3036 del c, b 3037 support.gc_collect() 3038 self.assertIsNone(wr(), wr) 3039 3040 def test_abcs(self): 3041 # Test the visible base classes are ABCs. 3042 self.assertIsInstance(self.IOBase, abc.ABCMeta) 3043 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 3044 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 3045 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 3046 3047 def _check_abc_inheritance(self, abcmodule): 3048 with self.open(support.TESTFN, "wb", buffering=0) as f: 3049 self.assertIsInstance(f, abcmodule.IOBase) 3050 self.assertIsInstance(f, abcmodule.RawIOBase) 3051 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3052 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3053 with self.open(support.TESTFN, "wb") as f: 3054 self.assertIsInstance(f, abcmodule.IOBase) 3055 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3056 self.assertIsInstance(f, abcmodule.BufferedIOBase) 3057 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3058 with self.open(support.TESTFN, "w") as f: 3059 self.assertIsInstance(f, abcmodule.IOBase) 3060 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3061 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3062 self.assertIsInstance(f, abcmodule.TextIOBase) 3063 3064 def 
test_abc_inheritance(self): 3065 # Test implementations inherit from their respective ABCs 3066 self._check_abc_inheritance(self) 3067 3068 def test_abc_inheritance_official(self): 3069 # Test implementations inherit from the official ABCs of the 3070 # baseline "io" module. 3071 self._check_abc_inheritance(io) 3072 3073 @unittest.skipUnless(fcntl, 'fcntl required for this test') 3074 def test_nonblock_pipe_write_bigbuf(self): 3075 self._test_nonblock_pipe_write(16*1024) 3076 3077 @unittest.skipUnless(fcntl, 'fcntl required for this test') 3078 def test_nonblock_pipe_write_smallbuf(self): 3079 self._test_nonblock_pipe_write(1024) 3080 3081 def _set_non_blocking(self, fd): 3082 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 3083 self.assertNotEqual(flags, -1) 3084 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) 3085 self.assertEqual(res, 0) 3086 3087 def _test_nonblock_pipe_write(self, bufsize): 3088 sent = [] 3089 received = [] 3090 r, w = os.pipe() 3091 self._set_non_blocking(r) 3092 self._set_non_blocking(w) 3093 3094 # To exercise all code paths in the C implementation we need 3095 # to play with buffer sizes. For instance, if we choose a 3096 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 3097 # then we will never get a partial write of the buffer. 
@unittest.skipUnless(threading, 'Threading required for this test.')
@unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    *item* is repeated to build a payload larger than
    support.PIPE_MAX_SIZE, *bytes* is compared, one byte at a time, with
    the first two bytes read back from the pipe, and *fdopen_kwargs* are
    passed to self.io.open() for the write end of the pipe.
    """
    read_results = []
    def _read():
        # Consume exactly one byte so the blocked write makes some progress.
        s = os.read(r, 1)
        read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    # Bind wio before the try block: if open() raises, the cleanup below
    # must not fail with UnboundLocalError and hide the real error.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the alarm armed just below.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        try:
            signal.alarm(1)
            with self.assertRaises(ZeroDivisionError):
                wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
        finally:
            # Always cancel the alarm and wait for the reader thread.
            signal.alarm(0)
            t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                # The descriptor is already closed, so EBADF is expected.
                if e.errno != errno.EBADF:
                    raise
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    *item* is repeated support.PIPE_MAX_SIZE times to build the payload;
    *fdopen_kwargs* are passed to self.io.open() for the write end of the
    pipe, with ``closefd`` forced to False.
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = support.PIPE_MAX_SIZE
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False
    error = [None]
    def _read():
        try:
            # Keep draining the pipe until the writer says it is done.
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        except BaseException as exc:
            # Hand any failure to the main thread, which asserts on it.
            error[0] = exc
    t = threading.Thread(target=_read)
    t.daemon = True
    def alarm1(sig, frame):
        # First alarm: install the second handler and re-arm the timer.
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)
    def alarm2(sig, frame):
        # Second alarm: start the reader so the write can finish.
        t.start()
    signal.signal(signal.SIGALRM, alarm1)
    # Bind wio before the try block: if open() raises, the cleanup below
    # must not fail with UnboundLocalError and hide the real error.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()

        self.assertIsNone(error[0])
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                # The descriptor is already closed, so EBADF is expected.
                if e.errno != errno.EBADF:
                    raise
def test_main():
    """Wire the io implementations into the test classes and run them.

    Classes whose name starts with "C" receive the attributes of the C
    ``io`` module plus the "C"-prefixed mock classes; classes whose name
    starts with "Py" receive the ``_pyio`` equivalents.  Any other class
    is left untouched.
    """
    test_classes = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Mock base classes; the "C"/"Py" prefixed variants of each one are
    # looked up in this module's globals below.
    mock_bases = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
                  MockNonBlockWriterIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    # Build the namespace that is copied onto each kind of test class.
    c_namespace = {attr: getattr(io, attr) for attr in exported}
    py_namespace = {attr: getattr(pyio, attr) for attr in exported}
    for mock in mock_bases:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mock_bases:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for cls in test_classes:
        if cls.__name__.startswith("C"):
            namespace = c_namespace
        elif cls.__name__.startswith("Py"):
            namespace = py_namespace
        else:
            continue
        for attr, value in namespace.items():
            setattr(cls, attr, value)

    support.run_unittest(*test_classes)