1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22import abc 23import array 24import errno 25import locale 26import os 27import pickle 28import random 29import signal 30import sys 31import textwrap 32import threading 33import time 34import unittest 35import warnings 36import weakref 37from collections import deque, UserList 38from itertools import cycle, count 39from test import support 40from test.support.script_helper import ( 41 assert_python_ok, assert_python_failure, run_python_until_end) 42from test.support import import_helper 43from test.support import os_helper 44from test.support import threading_helper 45from test.support import warnings_helper 46from test.support import skip_if_sanitizer 47from test.support.os_helper import FakePath 48 49import codecs 50import io # C implementation of io 51import _pyio as pyio # Python implementation of io 52 53try: 54 import ctypes 55except ImportError: 56 def byteslike(*pos, **kw): 57 return array.array("b", bytes(*pos, **kw)) 58else: 59 def byteslike(*pos, **kw): 60 """Create a bytes-like object having no string or sequence methods""" 61 data = bytes(*pos, **kw) 62 obj = EmptyStruct() 63 ctypes.resize(obj, len(data)) 64 memoryview(obj).cast("B")[:] = data 65 return obj 66 class EmptyStruct(ctypes.Structure): 67 pass 68 69# Does io.IOBase finalizer log the exception if the close() method fails? 70# The exception is ignored silently by default in release build. 71IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode) 72 73 74def _default_chunk_size(): 75 """Get the default TextIOWrapper chunk size""" 76 with open(__file__, "r", encoding="latin-1") as f: 77 return f._CHUNK_SIZE 78 79requires_alarm = unittest.skipUnless( 80 hasattr(signal, "alarm"), "test requires signal.alarm()" 81) 82 83 84class MockRawIOWithoutRead: 85 """A RawIO implementation without read(), so as to exercise the default 86 RawIO.read() which calls readinto().""" 87 88 def __init__(self, read_stack=()): 89 self._read_stack = list(read_stack) 90 self._write_stack = [] 91 self._reads = 0 92 self._extraneous_reads = 0 93 94 def write(self, b): 95 self._write_stack.append(bytes(b)) 96 return len(b) 97 98 def writable(self): 99 return True 100 101 def fileno(self): 102 return 42 103 104 def readable(self): 105 return True 106 107 def seekable(self): 108 return True 109 110 def seek(self, pos, whence): 111 return 0 # wrong but we gotta return something 112 113 def tell(self): 114 return 0 # same comment as above 115 116 def readinto(self, buf): 117 self._reads += 1 118 max_len = len(buf) 119 try: 120 data = self._read_stack[0] 121 except IndexError: 122 self._extraneous_reads += 1 123 return 0 124 if data is None: 125 del self._read_stack[0] 126 return None 127 n = len(data) 128 if len(data) <= max_len: 129 del self._read_stack[0] 130 buf[:n] = data 131 return n 132 else: 133 buf[:] = data[:max_len] 134 self._read_stack[0] = data[max_len:] 135 return max_len 136 137 def truncate(self, pos=None): 138 return pos 139 140class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 141 pass 142 143class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 144 pass 145 146 147class MockRawIO(MockRawIOWithoutRead): 148 149 def read(self, n=None): 150 self._reads += 1 151 try: 152 return self._read_stack.pop(0) 153 except: 154 self._extraneous_reads += 1 155 return b"" 156 157class CMockRawIO(MockRawIO, io.RawIOBase): 158 pass 159 160class PyMockRawIO(MockRawIO, pyio.RawIOBase): 161 pass 162 163 164class MisbehavedRawIO(MockRawIO): 165 def write(self, b): 166 return super().write(b) * 2 167 168 def read(self, n=None): 169 return super().read(n) * 2 170 171 def seek(self, pos, whence): 172 return -123 173 174 def tell(self): 175 return -456 176 177 def readinto(self, buf): 178 super().readinto(buf) 179 return len(buf) * 5 180 181class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 182 pass 183 184class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 185 pass 186 187 188class SlowFlushRawIO(MockRawIO): 189 def __init__(self): 190 super().__init__() 191 self.in_flush = threading.Event() 192 193 def flush(self): 194 self.in_flush.set() 195 time.sleep(0.25) 196 197class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): 198 pass 199 200class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): 201 pass 202 203 204class CloseFailureIO(MockRawIO): 205 closed = 0 206 207 def close(self): 208 if not self.closed: 209 self.closed = 1 210 raise OSError 211 212class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 213 pass 214 215class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 216 pass 217 218 219class MockFileIO: 220 221 def __init__(self, data): 222 self.read_history = [] 223 super().__init__(data) 224 225 def read(self, n=None): 226 res = super().read(n) 227 self.read_history.append(None if res is None else len(res)) 228 return res 229 230 def readinto(self, b): 231 res = super().readinto(b) 232 self.read_history.append(res) 233 return res 234 235class CMockFileIO(MockFileIO, io.BytesIO): 236 pass 237 238class PyMockFileIO(MockFileIO, pyio.BytesIO): 239 pass 240 241 242class MockUnseekableIO: 243 def seekable(self): 244 return False 245 246 def seek(self, *args): 247 raise self.UnsupportedOperation("not seekable") 248 249 def tell(self, *args): 250 raise self.UnsupportedOperation("not seekable") 251 252 def truncate(self, *args): 253 raise self.UnsupportedOperation("not seekable") 254 255class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 256 UnsupportedOperation = io.UnsupportedOperation 257 258class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 259 UnsupportedOperation = pyio.UnsupportedOperation 260 261 262class MockNonBlockWriterIO: 263 264 def __init__(self): 265 self._write_stack = [] 266 self._blocker_char = None 267 268 def pop_written(self): 269 s = b"".join(self._write_stack) 270 self._write_stack[:] = [] 271 return s 272 273 def block_on(self, char): 274 """Block when a given char is encountered.""" 275 self._blocker_char = char 276 277 def readable(self): 278 return True 279 280 def seekable(self): 281 return True 282 283 def seek(self, pos, whence=0): 284 # naive implementation, enough for tests 285 return 0 286 287 def writable(self): 288 return True 289 290 def write(self, b): 291 b = bytes(b) 292 n = -1 293 if self._blocker_char: 294 try: 295 n = b.index(self._blocker_char) 296 except ValueError: 297 pass 298 else: 299 if n > 0: 300 # write data up to the first blocker 301 self._write_stack.append(b[:n]) 302 return n 303 else: 304 # cancel blocker and indicate would block 305 self._blocker_char = None 306 return None 307 self._write_stack.append(b) 308 return len(b) 309 310class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 311 BlockingIOError = io.BlockingIOError 312 313class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 314 BlockingIOError = pyio.BlockingIOError 315 316 317class IOTest(unittest.TestCase): 318 319 def setUp(self): 320 os_helper.unlink(os_helper.TESTFN) 321 322 def tearDown(self): 323 os_helper.unlink(os_helper.TESTFN) 324 325 def write_ops(self, f): 326 self.assertEqual(f.write(b"blah."), 5) 327 f.truncate(0) 328 self.assertEqual(f.tell(), 5) 329 f.seek(0) 330 331 self.assertEqual(f.write(b"blah."), 5) 332 self.assertEqual(f.seek(0), 0) 333 self.assertEqual(f.write(b"Hello."), 6) 334 self.assertEqual(f.tell(), 6) 335 self.assertEqual(f.seek(-1, 1), 5) 336 self.assertEqual(f.tell(), 5) 337 buffer = bytearray(b" world\n\n\n") 338 self.assertEqual(f.write(buffer), 9) 339 buffer[:] = b"*" * 9 # Overwrite our copy of the data 340 self.assertEqual(f.seek(0), 0) 341 self.assertEqual(f.write(b"h"), 1) 342 self.assertEqual(f.seek(-1, 2), 13) 343 self.assertEqual(f.tell(), 13) 344 345 self.assertEqual(f.truncate(12), 12) 346 self.assertEqual(f.tell(), 13) 347 self.assertRaises(TypeError, f.seek, 0.0) 348 349 def read_ops(self, f, buffered=False): 350 data = f.read(5) 351 self.assertEqual(data, b"hello") 352 data = byteslike(data) 353 self.assertEqual(f.readinto(data), 5) 354 self.assertEqual(bytes(data), b" worl") 355 data = bytearray(5) 356 self.assertEqual(f.readinto(data), 2) 357 self.assertEqual(len(data), 5) 358 self.assertEqual(data[:2], b"d\n") 359 self.assertEqual(f.seek(0), 0) 360 self.assertEqual(f.read(20), b"hello world\n") 361 self.assertEqual(f.read(1), b"") 362 self.assertEqual(f.readinto(byteslike(b"x")), 0) 363 self.assertEqual(f.seek(-6, 2), 6) 364 self.assertEqual(f.read(5), b"world") 365 self.assertEqual(f.read(0), b"") 366 self.assertEqual(f.readinto(byteslike()), 0) 367 self.assertEqual(f.seek(-6, 1), 5) 368 self.assertEqual(f.read(5), b" worl") 369 self.assertEqual(f.tell(), 10) 370 self.assertRaises(TypeError, f.seek, 0.0) 371 if buffered: 372 f.seek(0) 373 self.assertEqual(f.read(), b"hello world\n") 374 f.seek(6) 375 self.assertEqual(f.read(), b"world\n") 376 self.assertEqual(f.read(), b"") 377 f.seek(0) 378 data = byteslike(5) 379 self.assertEqual(f.readinto1(data), 5) 380 self.assertEqual(bytes(data), b"hello") 381 382 LARGE = 2**31 383 384 def large_file_ops(self, f): 385 assert f.readable() 386 assert f.writable() 387 try: 388 self.assertEqual(f.seek(self.LARGE), self.LARGE) 389 except (OverflowError, ValueError): 390 self.skipTest("no largefile support") 391 self.assertEqual(f.tell(), self.LARGE) 392 self.assertEqual(f.write(b"xxx"), 3) 393 self.assertEqual(f.tell(), self.LARGE + 3) 394 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 395 self.assertEqual(f.truncate(), self.LARGE + 2) 396 self.assertEqual(f.tell(), self.LARGE + 2) 397 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 398 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 399 self.assertEqual(f.tell(), self.LARGE + 2) 400 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 401 self.assertEqual(f.seek(-1, 2), self.LARGE) 402 self.assertEqual(f.read(2), b"x") 403 404 def test_invalid_operations(self): 405 # Try writing on a file opened in read mode and vice-versa. 406 exc = self.UnsupportedOperation 407 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp: 408 self.assertRaises(exc, fp.read) 409 self.assertRaises(exc, fp.readline) 410 with self.open(os_helper.TESTFN, "wb") as fp: 411 self.assertRaises(exc, fp.read) 412 self.assertRaises(exc, fp.readline) 413 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp: 414 self.assertRaises(exc, fp.read) 415 self.assertRaises(exc, fp.readline) 416 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp: 417 self.assertRaises(exc, fp.write, b"blah") 418 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 419 with self.open(os_helper.TESTFN, "rb") as fp: 420 self.assertRaises(exc, fp.write, b"blah") 421 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 422 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp: 423 self.assertRaises(exc, fp.write, "blah") 424 self.assertRaises(exc, fp.writelines, ["blah\n"]) 425 # Non-zero seeking from current or end pos 426 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 427 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 428 429 @unittest.skipIf( 430 support.is_emscripten, "fstat() of a pipe fd is not supported" 431 ) 432 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 433 def test_optional_abilities(self): 434 # Test for OSError when optional APIs are not supported 435 # The purpose of this test is to try fileno(), reading, writing and 436 # seeking operations with various objects that indicate they do not 437 # support these operations. 438 439 def pipe_reader(): 440 [r, w] = os.pipe() 441 os.close(w) # So that read() is harmless 442 return self.FileIO(r, "r") 443 444 def pipe_writer(): 445 [r, w] = os.pipe() 446 self.addCleanup(os.close, r) 447 # Guarantee that we can write into the pipe without blocking 448 thread = threading.Thread(target=os.read, args=(r, 100)) 449 thread.start() 450 self.addCleanup(thread.join) 451 return self.FileIO(w, "w") 452 453 def buffered_reader(): 454 return self.BufferedReader(self.MockUnseekableIO()) 455 456 def buffered_writer(): 457 return self.BufferedWriter(self.MockUnseekableIO()) 458 459 def buffered_random(): 460 return self.BufferedRandom(self.BytesIO()) 461 462 def buffered_rw_pair(): 463 return self.BufferedRWPair(self.MockUnseekableIO(), 464 self.MockUnseekableIO()) 465 466 def text_reader(): 467 class UnseekableReader(self.MockUnseekableIO): 468 writable = self.BufferedIOBase.writable 469 write = self.BufferedIOBase.write 470 return self.TextIOWrapper(UnseekableReader(), "ascii") 471 472 def text_writer(): 473 class UnseekableWriter(self.MockUnseekableIO): 474 readable = self.BufferedIOBase.readable 475 read = self.BufferedIOBase.read 476 return self.TextIOWrapper(UnseekableWriter(), "ascii") 477 478 tests = ( 479 (pipe_reader, "fr"), (pipe_writer, "fw"), 480 (buffered_reader, "r"), (buffered_writer, "w"), 481 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 482 (text_reader, "r"), (text_writer, "w"), 483 (self.BytesIO, "rws"), (self.StringIO, "rws"), 484 ) 485 for [test, abilities] in tests: 486 with self.subTest(test), test() as obj: 487 readable = "r" in abilities 488 self.assertEqual(obj.readable(), readable) 489 writable = "w" in abilities 490 self.assertEqual(obj.writable(), writable) 491 492 if isinstance(obj, self.TextIOBase): 493 data = "3" 494 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 495 data = b"3" 496 else: 497 self.fail("Unknown base class") 498 499 if "f" in abilities: 500 obj.fileno() 501 else: 502 self.assertRaises(OSError, obj.fileno) 503 504 if readable: 505 obj.read(1) 506 obj.read() 507 else: 508 self.assertRaises(OSError, obj.read, 1) 509 self.assertRaises(OSError, obj.read) 510 511 if writable: 512 obj.write(data) 513 else: 514 self.assertRaises(OSError, obj.write, data) 515 516 if sys.platform.startswith("win") and test in ( 517 pipe_reader, pipe_writer): 518 # Pipes seem to appear as seekable on Windows 519 continue 520 seekable = "s" in abilities 521 self.assertEqual(obj.seekable(), seekable) 522 523 if seekable: 524 obj.tell() 525 obj.seek(0) 526 else: 527 self.assertRaises(OSError, obj.tell) 528 self.assertRaises(OSError, obj.seek, 0) 529 530 if writable and seekable: 531 obj.truncate() 532 obj.truncate(0) 533 else: 534 self.assertRaises(OSError, obj.truncate) 535 self.assertRaises(OSError, obj.truncate, 0) 536 537 def test_open_handles_NUL_chars(self): 538 fn_with_NUL = 'foo\0bar' 539 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8") 540 541 bytes_fn = bytes(fn_with_NUL, 'ascii') 542 with warnings.catch_warnings(): 543 warnings.simplefilter("ignore", DeprecationWarning) 544 self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8") 545 546 def test_raw_file_io(self): 547 with self.open(os_helper.TESTFN, "wb", buffering=0) as f: 548 self.assertEqual(f.readable(), False) 549 self.assertEqual(f.writable(), True) 550 self.assertEqual(f.seekable(), True) 551 self.write_ops(f) 552 with self.open(os_helper.TESTFN, "rb", buffering=0) as f: 553 self.assertEqual(f.readable(), True) 554 self.assertEqual(f.writable(), False) 555 self.assertEqual(f.seekable(), True) 556 self.read_ops(f) 557 558 def test_buffered_file_io(self): 559 with self.open(os_helper.TESTFN, "wb") as f: 560 self.assertEqual(f.readable(), False) 561 self.assertEqual(f.writable(), True) 562 self.assertEqual(f.seekable(), True) 563 self.write_ops(f) 564 with self.open(os_helper.TESTFN, "rb") as f: 565 self.assertEqual(f.readable(), True) 566 self.assertEqual(f.writable(), False) 567 self.assertEqual(f.seekable(), True) 568 self.read_ops(f, True) 569 570 def test_readline(self): 571 with self.open(os_helper.TESTFN, "wb") as f: 572 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 573 with self.open(os_helper.TESTFN, "rb") as f: 574 self.assertEqual(f.readline(), b"abc\n") 575 self.assertEqual(f.readline(10), b"def\n") 576 self.assertEqual(f.readline(2), b"xy") 577 self.assertEqual(f.readline(4), b"zzy\n") 578 self.assertEqual(f.readline(), b"foo\x00bar\n") 579 self.assertEqual(f.readline(None), b"another line") 580 self.assertRaises(TypeError, f.readline, 5.3) 581 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 582 self.assertRaises(TypeError, f.readline, 5.3) 583 584 def test_readline_nonsizeable(self): 585 # Issue #30061 586 # Crash when readline() returns an object without __len__ 587 class R(self.IOBase): 588 def readline(self): 589 return None 590 self.assertRaises((TypeError, StopIteration), next, R()) 591 592 def test_next_nonsizeable(self): 593 # Issue #30061 594 # Crash when __next__() returns an object without __len__ 595 class R(self.IOBase): 596 def __next__(self): 597 return None 598 self.assertRaises(TypeError, R().readlines, 1) 599 600 def test_raw_bytes_io(self): 601 f = self.BytesIO() 602 self.write_ops(f) 603 data = f.getvalue() 604 self.assertEqual(data, b"hello world\n") 605 f = self.BytesIO(data) 606 self.read_ops(f, True) 607 608 def test_large_file_ops(self): 609 # On Windows and Mac OSX this test consumes large resources; It takes 610 # a long time to build the >2 GiB file and takes >2 GiB of disk space 611 # therefore the resource must be enabled to run this test. 612 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 613 support.requires( 614 'largefile', 615 'test requires %s bytes and a long time to run' % self.LARGE) 616 with self.open(os_helper.TESTFN, "w+b", 0) as f: 617 self.large_file_ops(f) 618 with self.open(os_helper.TESTFN, "w+b") as f: 619 self.large_file_ops(f) 620 621 def test_with_open(self): 622 for bufsize in (0, 100): 623 f = None 624 with self.open(os_helper.TESTFN, "wb", bufsize) as f: 625 f.write(b"xxx") 626 self.assertEqual(f.closed, True) 627 f = None 628 try: 629 with self.open(os_helper.TESTFN, "wb", bufsize) as f: 630 1/0 631 except ZeroDivisionError: 632 self.assertEqual(f.closed, True) 633 else: 634 self.fail("1/0 didn't raise an exception") 635 636 # issue 5008 637 def test_append_mode_tell(self): 638 with self.open(os_helper.TESTFN, "wb") as f: 639 f.write(b"xxx") 640 with self.open(os_helper.TESTFN, "ab", buffering=0) as f: 641 self.assertEqual(f.tell(), 3) 642 with self.open(os_helper.TESTFN, "ab") as f: 643 self.assertEqual(f.tell(), 3) 644 with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f: 645 self.assertGreater(f.tell(), 0) 646 647 def test_destructor(self): 648 record = [] 649 class MyFileIO(self.FileIO): 650 def __del__(self): 651 record.append(1) 652 try: 653 f = super().__del__ 654 except AttributeError: 655 pass 656 else: 657 f() 658 def close(self): 659 record.append(2) 660 super().close() 661 def flush(self): 662 record.append(3) 663 super().flush() 664 with warnings_helper.check_warnings(('', ResourceWarning)): 665 f = MyFileIO(os_helper.TESTFN, "wb") 666 f.write(b"xxx") 667 del f 668 support.gc_collect() 669 self.assertEqual(record, [1, 2, 3]) 670 with self.open(os_helper.TESTFN, "rb") as f: 671 self.assertEqual(f.read(), b"xxx") 672 673 def _check_base_destructor(self, base): 674 record = [] 675 class MyIO(base): 676 def __init__(self): 677 # This exercises the availability of attributes on object 678 # destruction. 679 # (in the C version, close() is called by the tp_dealloc 680 # function, not by __del__) 681 self.on_del = 1 682 self.on_close = 2 683 self.on_flush = 3 684 def __del__(self): 685 record.append(self.on_del) 686 try: 687 f = super().__del__ 688 except AttributeError: 689 pass 690 else: 691 f() 692 def close(self): 693 record.append(self.on_close) 694 super().close() 695 def flush(self): 696 record.append(self.on_flush) 697 super().flush() 698 f = MyIO() 699 del f 700 support.gc_collect() 701 self.assertEqual(record, [1, 2, 3]) 702 703 def test_IOBase_destructor(self): 704 self._check_base_destructor(self.IOBase) 705 706 def test_RawIOBase_destructor(self): 707 self._check_base_destructor(self.RawIOBase) 708 709 def test_BufferedIOBase_destructor(self): 710 self._check_base_destructor(self.BufferedIOBase) 711 712 def test_TextIOBase_destructor(self): 713 self._check_base_destructor(self.TextIOBase) 714 715 def test_close_flushes(self): 716 with self.open(os_helper.TESTFN, "wb") as f: 717 f.write(b"xxx") 718 with self.open(os_helper.TESTFN, "rb") as f: 719 self.assertEqual(f.read(), b"xxx") 720 721 def test_array_writes(self): 722 a = array.array('i', range(10)) 723 n = len(a.tobytes()) 724 def check(f): 725 with f: 726 self.assertEqual(f.write(a), n) 727 f.writelines((a,)) 728 check(self.BytesIO()) 729 check(self.FileIO(os_helper.TESTFN, "w")) 730 check(self.BufferedWriter(self.MockRawIO())) 731 check(self.BufferedRandom(self.MockRawIO())) 732 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 733 734 def test_closefd(self): 735 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w', 736 encoding="utf-8", closefd=False) 737 738 def test_read_closed(self): 739 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 740 f.write("egg\n") 741 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 742 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) 743 self.assertEqual(file.read(), "egg\n") 744 file.seek(0) 745 file.close() 746 self.assertRaises(ValueError, file.read) 747 with self.open(os_helper.TESTFN, "rb") as f: 748 file = self.open(f.fileno(), "rb", closefd=False) 749 self.assertEqual(file.read()[:3], b"egg") 750 file.close() 751 self.assertRaises(ValueError, file.readinto, bytearray(1)) 752 753 def test_no_closefd_with_filename(self): 754 # can't use closefd in combination with a file name 755 self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", 756 encoding="utf-8", closefd=False) 757 758 def test_closefd_attr(self): 759 with self.open(os_helper.TESTFN, "wb") as f: 760 f.write(b"egg\n") 761 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 762 self.assertEqual(f.buffer.raw.closefd, True) 763 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False) 764 self.assertEqual(file.buffer.raw.closefd, False) 765 766 def test_garbage_collection(self): 767 # FileIO objects are collected, and collecting them flushes 768 # all data to disk. 769 with warnings_helper.check_warnings(('', ResourceWarning)): 770 f = self.FileIO(os_helper.TESTFN, "wb") 771 f.write(b"abcxxx") 772 f.f = f 773 wr = weakref.ref(f) 774 del f 775 support.gc_collect() 776 self.assertIsNone(wr(), wr) 777 with self.open(os_helper.TESTFN, "rb") as f: 778 self.assertEqual(f.read(), b"abcxxx") 779 780 def test_unbounded_file(self): 781 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 782 zero = "/dev/zero" 783 if not os.path.exists(zero): 784 self.skipTest("{0} does not exist".format(zero)) 785 if sys.maxsize > 0x7FFFFFFF: 786 self.skipTest("test can only run in a 32-bit address space") 787 if support.real_max_memuse < support._2G: 788 self.skipTest("test requires at least 2 GiB of memory") 789 with self.open(zero, "rb", buffering=0) as f: 790 self.assertRaises(OverflowError, f.read) 791 with self.open(zero, "rb") as f: 792 self.assertRaises(OverflowError, f.read) 793 with self.open(zero, "r") as f: 794 self.assertRaises(OverflowError, f.read) 795 796 def check_flush_error_on_close(self, *args, **kwargs): 797 # Test that the file is closed despite failed flush 798 # and that flush() is called before file closed. 799 f = self.open(*args, **kwargs) 800 closed = [] 801 def bad_flush(): 802 closed[:] = [f.closed] 803 raise OSError() 804 f.flush = bad_flush 805 self.assertRaises(OSError, f.close) # exception not swallowed 806 self.assertTrue(f.closed) 807 self.assertTrue(closed) # flush() called 808 self.assertFalse(closed[0]) # flush() called before file closed 809 f.flush = lambda: None # break reference loop 810 811 def test_flush_error_on_close(self): 812 # raw file 813 # Issue #5700: io.FileIO calls flush() after file closed 814 self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0) 815 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 816 self.check_flush_error_on_close(fd, 'wb', buffering=0) 817 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 818 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 819 os.close(fd) 820 # buffered io 821 self.check_flush_error_on_close(os_helper.TESTFN, 'wb') 822 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 823 self.check_flush_error_on_close(fd, 'wb') 824 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 825 self.check_flush_error_on_close(fd, 'wb', closefd=False) 826 os.close(fd) 827 # text io 828 self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8") 829 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 830 self.check_flush_error_on_close(fd, 'w', encoding="utf-8") 831 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) 832 self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False) 833 os.close(fd) 834 835 def test_multi_close(self): 836 f = self.open(os_helper.TESTFN, "wb", buffering=0) 837 f.close() 838 f.close() 839 f.close() 840 self.assertRaises(ValueError, f.flush) 841 842 def test_RawIOBase_read(self): 843 # Exercise the default limited RawIOBase.read(n) implementation (which 844 # calls readinto() internally). 845 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 846 self.assertEqual(rawio.read(2), b"ab") 847 self.assertEqual(rawio.read(2), b"c") 848 self.assertEqual(rawio.read(2), b"d") 849 self.assertEqual(rawio.read(2), None) 850 self.assertEqual(rawio.read(2), b"ef") 851 self.assertEqual(rawio.read(2), b"g") 852 self.assertEqual(rawio.read(2), None) 853 self.assertEqual(rawio.read(2), b"") 854 855 def test_types_have_dict(self): 856 test = ( 857 self.IOBase(), 858 self.RawIOBase(), 859 self.TextIOBase(), 860 self.StringIO(), 861 self.BytesIO() 862 ) 863 for obj in test: 864 self.assertTrue(hasattr(obj, "__dict__")) 865 866 def test_opener(self): 867 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 868 f.write("egg\n") 869 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 870 def opener(path, flags): 871 return fd 872 with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f: 873 self.assertEqual(f.read(), "egg\n") 874 875 def test_bad_opener_negative_1(self): 876 # Issue #27066. 877 def badopener(fname, flags): 878 return -1 879 with self.assertRaises(ValueError) as cm: 880 open('non-existent', 'r', opener=badopener) 881 self.assertEqual(str(cm.exception), 'opener returned -1') 882 883 def test_bad_opener_other_negative(self): 884 # Issue #27066. 885 def badopener(fname, flags): 886 return -2 887 with self.assertRaises(ValueError) as cm: 888 open('non-existent', 'r', opener=badopener) 889 self.assertEqual(str(cm.exception), 'opener returned -2') 890 891 def test_opener_invalid_fd(self): 892 # Check that OSError is raised with error code EBADF if the 893 # opener returns an invalid file descriptor (see gh-82212). 894 fd = os_helper.make_bad_fd() 895 with self.assertRaises(OSError) as cm: 896 self.open('foo', opener=lambda name, flags: fd) 897 self.assertEqual(cm.exception.errno, errno.EBADF) 898 899 def test_fileio_closefd(self): 900 # Issue #4841 901 with self.open(__file__, 'rb') as f1, \ 902 self.open(__file__, 'rb') as f2: 903 fileio = self.FileIO(f1.fileno(), closefd=False) 904 # .__init__() must not close f1 905 fileio.__init__(f2.fileno(), closefd=False) 906 f1.readline() 907 # .close() must not close f2 908 fileio.close() 909 f2.readline() 910 911 def test_nonbuffered_textio(self): 912 with warnings_helper.check_no_resource_warning(self): 913 with self.assertRaises(ValueError): 914 self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0) 915 916 def test_invalid_newline(self): 917 with warnings_helper.check_no_resource_warning(self): 918 with self.assertRaises(ValueError): 919 self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid') 920 921 def test_buffered_readinto_mixin(self): 922 # Test the implementation provided by BufferedIOBase 923 class Stream(self.BufferedIOBase): 924 def read(self, size): 925 return b"12345" 926 read1 = read 927 stream = Stream() 928 for method in ("readinto", "readinto1"): 929 with self.subTest(method): 930 buffer = byteslike(5) 931 self.assertEqual(getattr(stream, method)(buffer), 5) 932 self.assertEqual(bytes(buffer), b"12345") 933 934 def test_fspath_support(self): 935 def check_path_succeeds(path): 936 with self.open(path, "w", encoding="utf-8") as f: 937 f.write("egg\n") 938 939 with self.open(path, "r", encoding="utf-8") as f: 940 self.assertEqual(f.read(), "egg\n") 941 942 check_path_succeeds(FakePath(os_helper.TESTFN)) 943 check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN))) 944 945 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 946 bad_path = FakePath(f.fileno()) 947 with self.assertRaises(TypeError): 948 self.open(bad_path, 'w', encoding="utf-8") 949 950 bad_path = FakePath(None) 951 with self.assertRaises(TypeError): 952 self.open(bad_path, 'w', encoding="utf-8") 953 954 bad_path = FakePath(FloatingPointError) 955 with self.assertRaises(FloatingPointError): 956 self.open(bad_path, 'w', encoding="utf-8") 957 958 # ensure that refcounting is correct with some error conditions 959 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 960 self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8") 961 962 def test_RawIOBase_readall(self): 963 # Exercise the default unlimited RawIOBase.read() and readall() 964 # implementations. 965 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 966 self.assertEqual(rawio.read(), b"abcdefg") 967 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 968 self.assertEqual(rawio.readall(), b"abcdefg") 969 970 def test_BufferedIOBase_readinto(self): 971 # Exercise the default BufferedIOBase.readinto() and readinto1() 972 # implementations (which call read() or read1() internally). 973 class Reader(self.BufferedIOBase): 974 def __init__(self, avail): 975 self.avail = avail 976 def read(self, size): 977 result = self.avail[:size] 978 self.avail = self.avail[size:] 979 return result 980 def read1(self, size): 981 """Returns no more than 5 bytes at once""" 982 return self.read(min(size, 5)) 983 tests = ( 984 # (test method, total data available, read buffer size, expected 985 # read size) 986 ("readinto", 10, 5, 5), 987 ("readinto", 10, 6, 6), # More than read1() can return 988 ("readinto", 5, 6, 5), # Buffer larger than total available 989 ("readinto", 6, 7, 6), 990 ("readinto", 10, 0, 0), # Empty buffer 991 ("readinto1", 10, 5, 5), # Result limited to single read1() call 992 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return 993 ("readinto1", 5, 6, 5), # Buffer larger than total available 994 ("readinto1", 6, 7, 5), 995 ("readinto1", 10, 0, 0), # Empty buffer 996 ) 997 UNUSED_BYTE = 0x81 998 for test in tests: 999 with self.subTest(test): 1000 method, avail, request, result = test 1001 reader = Reader(bytes(range(avail))) 1002 buffer = bytearray((UNUSED_BYTE,) * request) 1003 method = getattr(reader, method) 1004 self.assertEqual(method(buffer), result) 1005 self.assertEqual(len(buffer), request) 1006 self.assertSequenceEqual(buffer[:result], range(result)) 1007 unused = (UNUSED_BYTE,) * (request - result) 1008 self.assertSequenceEqual(buffer[result:], unused) 1009 self.assertEqual(len(reader.avail), avail - result) 1010 1011 def test_close_assert(self): 1012 class R(self.IOBase): 1013 def __setattr__(self, name, value): 1014 pass 1015 def flush(self): 1016 raise OSError() 1017 f = R() 1018 # This would cause an assertion failure. 1019 self.assertRaises(OSError, f.close) 1020 1021 # Silence destructor error 1022 R.flush = lambda self: None 1023 1024 1025class CIOTest(IOTest): 1026 1027 def test_IOBase_finalize(self): 1028 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 1029 # class which inherits IOBase and an object of this class are caught 1030 # in a reference cycle and close() is already in the method cache. 1031 class MyIO(self.IOBase): 1032 def close(self): 1033 pass 1034 1035 # create an instance to populate the method cache 1036 MyIO() 1037 obj = MyIO() 1038 obj.obj = obj 1039 wr = weakref.ref(obj) 1040 del MyIO 1041 del obj 1042 support.gc_collect() 1043 self.assertIsNone(wr(), wr) 1044 1045class PyIOTest(IOTest): 1046 pass 1047 1048 1049@support.cpython_only 1050class APIMismatchTest(unittest.TestCase): 1051 1052 def test_RawIOBase_io_in_pyio_match(self): 1053 """Test that pyio RawIOBase class has all c RawIOBase methods""" 1054 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, 1055 ignore=('__weakref__',)) 1056 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') 1057 1058 def test_RawIOBase_pyio_in_io_match(self): 1059 """Test that c RawIOBase class has all pyio RawIOBase methods""" 1060 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase) 1061 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods') 1062 1063 1064class CommonBufferedTests: 1065 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 1066 1067 def test_detach(self): 1068 raw = self.MockRawIO() 1069 buf = self.tp(raw) 1070 self.assertIs(buf.detach(), raw) 1071 self.assertRaises(ValueError, buf.detach) 1072 1073 repr(buf) # Should still work 1074 1075 def test_fileno(self): 1076 rawio = self.MockRawIO() 1077 bufio = self.tp(rawio) 1078 1079 self.assertEqual(42, bufio.fileno()) 1080 1081 def test_invalid_args(self): 1082 rawio = self.MockRawIO() 1083 bufio = self.tp(rawio) 1084 # Invalid whence 1085 self.assertRaises(ValueError, bufio.seek, 0, -1) 1086 self.assertRaises(ValueError, bufio.seek, 0, 9) 1087 1088 def test_override_destructor(self): 1089 tp = self.tp 1090 record = [] 1091 class MyBufferedIO(tp): 1092 def __del__(self): 1093 record.append(1) 1094 try: 1095 f = super().__del__ 1096 except AttributeError: 1097 pass 1098 else: 1099 f() 1100 def close(self): 1101 record.append(2) 1102 super().close() 1103 def flush(self): 1104 record.append(3) 1105 super().flush() 1106 rawio = self.MockRawIO() 1107 bufio = MyBufferedIO(rawio) 1108 del bufio 1109 support.gc_collect() 1110 self.assertEqual(record, [1, 2, 3]) 1111 1112 def test_context_manager(self): 1113 # Test usability as a context manager 1114 rawio = self.MockRawIO() 1115 bufio = self.tp(rawio) 1116 def _with(): 1117 with bufio: 1118 pass 1119 _with() 1120 # bufio should now be closed, and using it a second time should raise 1121 # a ValueError. 1122 self.assertRaises(ValueError, _with) 1123 1124 def test_error_through_destructor(self): 1125 # Test that the exception state is not modified by a destructor, 1126 # even if close() fails. 1127 rawio = self.CloseFailureIO() 1128 with support.catch_unraisable_exception() as cm: 1129 with self.assertRaises(AttributeError): 1130 self.tp(rawio).xyzzy 1131 1132 if not IOBASE_EMITS_UNRAISABLE: 1133 self.assertIsNone(cm.unraisable) 1134 elif cm.unraisable is not None: 1135 self.assertEqual(cm.unraisable.exc_type, OSError) 1136 1137 def test_repr(self): 1138 raw = self.MockRawIO() 1139 b = self.tp(raw) 1140 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__) 1141 self.assertRegex(repr(b), "<%s>" % clsname) 1142 raw.name = "dummy" 1143 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname) 1144 raw.name = b"dummy" 1145 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname) 1146 1147 def test_recursive_repr(self): 1148 # Issue #25455 1149 raw = self.MockRawIO() 1150 b = self.tp(raw) 1151 with support.swap_attr(raw, 'name', b): 1152 try: 1153 repr(b) # Should not crash 1154 except RuntimeError: 1155 pass 1156 1157 def test_flush_error_on_close(self): 1158 # Test that buffered file is closed despite failed flush 1159 # and that flush() is called before file closed. 1160 raw = self.MockRawIO() 1161 closed = [] 1162 def bad_flush(): 1163 closed[:] = [b.closed, raw.closed] 1164 raise OSError() 1165 raw.flush = bad_flush 1166 b = self.tp(raw) 1167 self.assertRaises(OSError, b.close) # exception not swallowed 1168 self.assertTrue(b.closed) 1169 self.assertTrue(raw.closed) 1170 self.assertTrue(closed) # flush() called 1171 self.assertFalse(closed[0]) # flush() called before file closed 1172 self.assertFalse(closed[1]) 1173 raw.flush = lambda: None # break reference loop 1174 1175 def test_close_error_on_close(self): 1176 raw = self.MockRawIO() 1177 def bad_flush(): 1178 raise OSError('flush') 1179 def bad_close(): 1180 raise OSError('close') 1181 raw.close = bad_close 1182 b = self.tp(raw) 1183 b.flush = bad_flush 1184 with self.assertRaises(OSError) as err: # exception not swallowed 1185 b.close() 1186 self.assertEqual(err.exception.args, ('close',)) 1187 self.assertIsInstance(err.exception.__context__, OSError) 1188 self.assertEqual(err.exception.__context__.args, ('flush',)) 1189 self.assertFalse(b.closed) 1190 1191 # Silence destructor error 1192 raw.close = lambda: None 1193 b.flush = lambda: None 1194 1195 def test_nonnormalized_close_error_on_close(self): 1196 # Issue #21677 1197 raw = self.MockRawIO() 1198 def bad_flush(): 1199 raise non_existing_flush 1200 def bad_close(): 1201 raise non_existing_close 1202 raw.close = bad_close 1203 b = self.tp(raw) 1204 b.flush = bad_flush 1205 with self.assertRaises(NameError) as err: # exception not swallowed 1206 b.close() 1207 self.assertIn('non_existing_close', str(err.exception)) 1208 self.assertIsInstance(err.exception.__context__, NameError) 1209 self.assertIn('non_existing_flush', str(err.exception.__context__)) 1210 self.assertFalse(b.closed) 1211 1212 # Silence destructor error 1213 b.flush = lambda: None 1214 raw.close = lambda: None 1215 1216 def test_multi_close(self): 1217 raw = self.MockRawIO() 1218 b = self.tp(raw) 1219 b.close() 1220 b.close() 1221 b.close() 1222 self.assertRaises(ValueError, b.flush) 1223 1224 def test_unseekable(self): 1225 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1226 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1227 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1228 1229 def test_readonly_attributes(self): 1230 raw = self.MockRawIO() 1231 buf = self.tp(raw) 1232 x = self.MockRawIO() 1233 with self.assertRaises(AttributeError): 1234 buf.raw = x 1235 1236 1237class SizeofTest: 1238 1239 @support.cpython_only 1240 def test_sizeof(self): 1241 bufsize1 = 4096 1242 bufsize2 = 8192 1243 rawio = self.MockRawIO() 1244 bufio = self.tp(rawio, buffer_size=bufsize1) 1245 size = sys.getsizeof(bufio) - bufsize1 1246 rawio = self.MockRawIO() 1247 bufio = self.tp(rawio, buffer_size=bufsize2) 1248 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 1249 1250 @support.cpython_only 1251 def test_buffer_freeing(self) : 1252 bufsize = 4096 1253 rawio = self.MockRawIO() 1254 bufio = self.tp(rawio, buffer_size=bufsize) 1255 size = sys.getsizeof(bufio) - bufsize 1256 bufio.close() 1257 self.assertEqual(sys.getsizeof(bufio), size) 1258 1259class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 1260 read_mode = "rb" 1261 1262 def test_constructor(self): 1263 rawio = self.MockRawIO([b"abc"]) 1264 bufio = self.tp(rawio) 1265 bufio.__init__(rawio) 1266 bufio.__init__(rawio, buffer_size=1024) 1267 bufio.__init__(rawio, buffer_size=16) 1268 self.assertEqual(b"abc", bufio.read()) 1269 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1270 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1271 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1272 rawio = self.MockRawIO([b"abc"]) 1273 bufio.__init__(rawio) 1274 self.assertEqual(b"abc", bufio.read()) 1275 1276 def test_uninitialized(self): 1277 bufio = self.tp.__new__(self.tp) 1278 del bufio 1279 bufio = self.tp.__new__(self.tp) 1280 self.assertRaisesRegex((ValueError, AttributeError), 1281 'uninitialized|has no attribute', 1282 bufio.read, 0) 1283 bufio.__init__(self.MockRawIO()) 1284 self.assertEqual(bufio.read(0), b'') 1285 1286 def test_read(self): 1287 for arg in (None, 7): 1288 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1289 bufio = self.tp(rawio) 1290 self.assertEqual(b"abcdefg", bufio.read(arg)) 1291 # Invalid args 1292 self.assertRaises(ValueError, bufio.read, -2) 1293 1294 def test_read1(self): 1295 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1296 bufio = self.tp(rawio) 1297 self.assertEqual(b"a", bufio.read(1)) 1298 self.assertEqual(b"b", bufio.read1(1)) 1299 self.assertEqual(rawio._reads, 1) 1300 self.assertEqual(b"", bufio.read1(0)) 1301 self.assertEqual(b"c", bufio.read1(100)) 1302 self.assertEqual(rawio._reads, 1) 1303 self.assertEqual(b"d", bufio.read1(100)) 1304 self.assertEqual(rawio._reads, 2) 1305 self.assertEqual(b"efg", bufio.read1(100)) 1306 self.assertEqual(rawio._reads, 3) 1307 self.assertEqual(b"", bufio.read1(100)) 1308 self.assertEqual(rawio._reads, 4) 1309 1310 def test_read1_arbitrary(self): 1311 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1312 bufio = self.tp(rawio) 1313 self.assertEqual(b"a", bufio.read(1)) 1314 self.assertEqual(b"bc", bufio.read1()) 1315 self.assertEqual(b"d", bufio.read1()) 1316 self.assertEqual(b"efg", bufio.read1(-1)) 1317 self.assertEqual(rawio._reads, 3) 1318 self.assertEqual(b"", bufio.read1()) 1319 self.assertEqual(rawio._reads, 4) 1320 1321 def test_readinto(self): 1322 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1323 bufio = self.tp(rawio) 1324 b = bytearray(2) 1325 self.assertEqual(bufio.readinto(b), 2) 1326 self.assertEqual(b, b"ab") 1327 self.assertEqual(bufio.readinto(b), 2) 1328 self.assertEqual(b, b"cd") 1329 self.assertEqual(bufio.readinto(b), 2) 1330 self.assertEqual(b, b"ef") 1331 self.assertEqual(bufio.readinto(b), 1) 1332 self.assertEqual(b, b"gf") 1333 self.assertEqual(bufio.readinto(b), 0) 1334 self.assertEqual(b, b"gf") 1335 rawio = self.MockRawIO((b"abc", None)) 1336 bufio = self.tp(rawio) 1337 self.assertEqual(bufio.readinto(b), 2) 1338 self.assertEqual(b, b"ab") 1339 self.assertEqual(bufio.readinto(b), 1) 1340 self.assertEqual(b, b"cb") 1341 1342 def test_readinto1(self): 1343 buffer_size = 10 1344 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) 1345 bufio = self.tp(rawio, buffer_size=buffer_size) 1346 b = bytearray(2) 1347 self.assertEqual(bufio.peek(3), b'abc') 1348 self.assertEqual(rawio._reads, 1) 1349 self.assertEqual(bufio.readinto1(b), 2) 1350 self.assertEqual(b, b"ab") 1351 self.assertEqual(rawio._reads, 1) 1352 self.assertEqual(bufio.readinto1(b), 1) 1353 self.assertEqual(b[:1], b"c") 1354 self.assertEqual(rawio._reads, 1) 1355 self.assertEqual(bufio.readinto1(b), 2) 1356 self.assertEqual(b, b"de") 1357 self.assertEqual(rawio._reads, 2) 1358 b = bytearray(2*buffer_size) 1359 self.assertEqual(bufio.peek(3), b'fgh') 1360 self.assertEqual(rawio._reads, 3) 1361 self.assertEqual(bufio.readinto1(b), 6) 1362 self.assertEqual(b[:6], b"fghjkl") 1363 self.assertEqual(rawio._reads, 4) 1364 1365 def test_readinto_array(self): 1366 buffer_size = 60 1367 data = b"a" * 26 1368 rawio = self.MockRawIO((data,)) 1369 bufio = self.tp(rawio, buffer_size=buffer_size) 1370 1371 # Create an array with element size > 1 byte 1372 b = array.array('i', b'x' * 32) 1373 assert len(b) != 16 1374 1375 # Read into it. We should get as many *bytes* as we can fit into b 1376 # (which is more than the number of elements) 1377 n = bufio.readinto(b) 1378 self.assertGreater(n, len(b)) 1379 1380 # Check that old contents of b are preserved 1381 bm = memoryview(b).cast('B') 1382 self.assertLess(n, len(bm)) 1383 self.assertEqual(bm[:n], data[:n]) 1384 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1385 1386 def test_readinto1_array(self): 1387 buffer_size = 60 1388 data = b"a" * 26 1389 rawio = self.MockRawIO((data,)) 1390 bufio = self.tp(rawio, buffer_size=buffer_size) 1391 1392 # Create an array with element size > 1 byte 1393 b = array.array('i', b'x' * 32) 1394 assert len(b) != 16 1395 1396 # Read into it. We should get as many *bytes* as we can fit into b 1397 # (which is more than the number of elements) 1398 n = bufio.readinto1(b) 1399 self.assertGreater(n, len(b)) 1400 1401 # Check that old contents of b are preserved 1402 bm = memoryview(b).cast('B') 1403 self.assertLess(n, len(bm)) 1404 self.assertEqual(bm[:n], data[:n]) 1405 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1406 1407 def test_readlines(self): 1408 def bufio(): 1409 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 1410 return self.tp(rawio) 1411 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 1412 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 1413 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 1414 1415 def test_buffering(self): 1416 data = b"abcdefghi" 1417 dlen = len(data) 1418 1419 tests = [ 1420 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 1421 [ 100, [ 3, 3, 3], [ dlen ] ], 1422 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 1423 ] 1424 1425 for bufsize, buf_read_sizes, raw_read_sizes in tests: 1426 rawio = self.MockFileIO(data) 1427 bufio = self.tp(rawio, buffer_size=bufsize) 1428 pos = 0 1429 for nbytes in buf_read_sizes: 1430 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 1431 pos += nbytes 1432 # this is mildly implementation-dependent 1433 self.assertEqual(rawio.read_history, raw_read_sizes) 1434 1435 def test_read_non_blocking(self): 1436 # Inject some None's in there to simulate EWOULDBLOCK 1437 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 1438 bufio = self.tp(rawio) 1439 self.assertEqual(b"abcd", bufio.read(6)) 1440 self.assertEqual(b"e", bufio.read(1)) 1441 self.assertEqual(b"fg", bufio.read()) 1442 self.assertEqual(b"", bufio.peek(1)) 1443 self.assertIsNone(bufio.read()) 1444 self.assertEqual(b"", bufio.read()) 1445 1446 rawio = self.MockRawIO((b"a", None, None)) 1447 self.assertEqual(b"a", rawio.readall()) 1448 self.assertIsNone(rawio.readall()) 1449 1450 def test_read_past_eof(self): 1451 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1452 bufio = self.tp(rawio) 1453 1454 self.assertEqual(b"abcdefg", bufio.read(9000)) 1455 1456 def test_read_all(self): 1457 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1458 bufio = self.tp(rawio) 1459 1460 self.assertEqual(b"abcdefg", bufio.read()) 1461 1462 @support.requires_resource('cpu') 1463 @threading_helper.requires_working_threading() 1464 def test_threads(self): 1465 try: 1466 # Write out many bytes with exactly the same number of 0's, 1467 # 1's... 255's. This will help us check that concurrent reading 1468 # doesn't duplicate or forget contents. 1469 N = 1000 1470 l = list(range(256)) * N 1471 random.shuffle(l) 1472 s = bytes(bytearray(l)) 1473 with self.open(os_helper.TESTFN, "wb") as f: 1474 f.write(s) 1475 with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw: 1476 bufio = self.tp(raw, 8) 1477 errors = [] 1478 results = [] 1479 def f(): 1480 try: 1481 # Intra-buffer read then buffer-flushing read 1482 for n in cycle([1, 19]): 1483 s = bufio.read(n) 1484 if not s: 1485 break 1486 # list.append() is atomic 1487 results.append(s) 1488 except Exception as e: 1489 errors.append(e) 1490 raise 1491 threads = [threading.Thread(target=f) for x in range(20)] 1492 with threading_helper.start_threads(threads): 1493 time.sleep(0.02) # yield 1494 self.assertFalse(errors, 1495 "the following exceptions were caught: %r" % errors) 1496 s = b''.join(results) 1497 for i in range(256): 1498 c = bytes(bytearray([i])) 1499 self.assertEqual(s.count(c), N) 1500 finally: 1501 os_helper.unlink(os_helper.TESTFN) 1502 1503 def test_unseekable(self): 1504 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1505 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1506 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1507 bufio.read(1) 1508 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1509 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1510 1511 def test_misbehaved_io(self): 1512 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1513 bufio = self.tp(rawio) 1514 self.assertRaises(OSError, bufio.seek, 0) 1515 self.assertRaises(OSError, bufio.tell) 1516 1517 # Silence destructor error 1518 bufio.close = lambda: None 1519 1520 def test_no_extraneous_read(self): 1521 # Issue #9550; when the raw IO object has satisfied the read request, 1522 # we should not issue any additional reads, otherwise it may block 1523 # (e.g. socket). 1524 bufsize = 16 1525 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1526 rawio = self.MockRawIO([b"x" * n]) 1527 bufio = self.tp(rawio, bufsize) 1528 self.assertEqual(bufio.read(n), b"x" * n) 1529 # Simple case: one raw read is enough to satisfy the request. 1530 self.assertEqual(rawio._extraneous_reads, 0, 1531 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1532 # A more complex case where two raw reads are needed to satisfy 1533 # the request. 1534 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1535 bufio = self.tp(rawio, bufsize) 1536 self.assertEqual(bufio.read(n), b"x" * n) 1537 self.assertEqual(rawio._extraneous_reads, 0, 1538 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1539 1540 def test_read_on_closed(self): 1541 # Issue #23796 1542 b = io.BufferedReader(io.BytesIO(b"12")) 1543 b.read(1) 1544 b.close() 1545 self.assertRaises(ValueError, b.peek) 1546 self.assertRaises(ValueError, b.read1, 1) 1547 1548 def test_truncate_on_read_only(self): 1549 rawio = self.MockFileIO(b"abc") 1550 bufio = self.tp(rawio) 1551 self.assertFalse(bufio.writable()) 1552 self.assertRaises(self.UnsupportedOperation, bufio.truncate) 1553 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0) 1554 1555 1556class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1557 tp = io.BufferedReader 1558 1559 @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing " 1560 "instead of returning NULL for malloc failure.") 1561 def test_constructor(self): 1562 BufferedReaderTest.test_constructor(self) 1563 # The allocation can succeed on 32-bit builds, e.g. with more 1564 # than 2 GiB RAM and a 64-bit kernel. 1565 if sys.maxsize > 0x7FFFFFFF: 1566 rawio = self.MockRawIO() 1567 bufio = self.tp(rawio) 1568 self.assertRaises((OverflowError, MemoryError, ValueError), 1569 bufio.__init__, rawio, sys.maxsize) 1570 1571 def test_initialization(self): 1572 rawio = self.MockRawIO([b"abc"]) 1573 bufio = self.tp(rawio) 1574 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1575 self.assertRaises(ValueError, bufio.read) 1576 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1577 self.assertRaises(ValueError, bufio.read) 1578 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1579 self.assertRaises(ValueError, bufio.read) 1580 1581 def test_misbehaved_io_read(self): 1582 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1583 bufio = self.tp(rawio) 1584 # _pyio.BufferedReader seems to implement reading different, so that 1585 # checking this is not so easy. 1586 self.assertRaises(OSError, bufio.read, 10) 1587 1588 def test_garbage_collection(self): 1589 # C BufferedReader objects are collected. 1590 # The Python version has __del__, so it ends into gc.garbage instead 1591 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1592 with warnings_helper.check_warnings(('', ResourceWarning)): 1593 rawio = self.FileIO(os_helper.TESTFN, "w+b") 1594 f = self.tp(rawio) 1595 f.f = f 1596 wr = weakref.ref(f) 1597 del f 1598 support.gc_collect() 1599 self.assertIsNone(wr(), wr) 1600 1601 def test_args_error(self): 1602 # Issue #17275 1603 with self.assertRaisesRegex(TypeError, "BufferedReader"): 1604 self.tp(io.BytesIO(), 1024, 1024, 1024) 1605 1606 def test_bad_readinto_value(self): 1607 rawio = io.BufferedReader(io.BytesIO(b"12")) 1608 rawio.readinto = lambda buf: -1 1609 bufio = self.tp(rawio) 1610 with self.assertRaises(OSError) as cm: 1611 bufio.readline() 1612 self.assertIsNone(cm.exception.__cause__) 1613 1614 def test_bad_readinto_type(self): 1615 rawio = io.BufferedReader(io.BytesIO(b"12")) 1616 rawio.readinto = lambda buf: b'' 1617 bufio = self.tp(rawio) 1618 with self.assertRaises(OSError) as cm: 1619 bufio.readline() 1620 self.assertIsInstance(cm.exception.__cause__, TypeError) 1621 1622 1623class PyBufferedReaderTest(BufferedReaderTest): 1624 tp = pyio.BufferedReader 1625 1626 1627class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1628 write_mode = "wb" 1629 1630 def test_constructor(self): 1631 rawio = self.MockRawIO() 1632 bufio = self.tp(rawio) 1633 bufio.__init__(rawio) 1634 bufio.__init__(rawio, buffer_size=1024) 1635 bufio.__init__(rawio, buffer_size=16) 1636 self.assertEqual(3, bufio.write(b"abc")) 1637 bufio.flush() 1638 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1639 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1640 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1641 bufio.__init__(rawio) 1642 self.assertEqual(3, bufio.write(b"ghi")) 1643 bufio.flush() 1644 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1645 1646 def test_uninitialized(self): 1647 bufio = self.tp.__new__(self.tp) 1648 del bufio 1649 bufio = self.tp.__new__(self.tp) 1650 self.assertRaisesRegex((ValueError, AttributeError), 1651 'uninitialized|has no attribute', 1652 bufio.write, b'') 1653 bufio.__init__(self.MockRawIO()) 1654 self.assertEqual(bufio.write(b''), 0) 1655 1656 def test_detach_flush(self): 1657 raw = self.MockRawIO() 1658 buf = self.tp(raw) 1659 buf.write(b"howdy!") 1660 self.assertFalse(raw._write_stack) 1661 buf.detach() 1662 self.assertEqual(raw._write_stack, [b"howdy!"]) 1663 1664 def test_write(self): 1665 # Write to the buffered IO but don't overflow the buffer. 1666 writer = self.MockRawIO() 1667 bufio = self.tp(writer, 8) 1668 bufio.write(b"abc") 1669 self.assertFalse(writer._write_stack) 1670 buffer = bytearray(b"def") 1671 bufio.write(buffer) 1672 buffer[:] = b"***" # Overwrite our copy of the data 1673 bufio.flush() 1674 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1675 1676 def test_write_overflow(self): 1677 writer = self.MockRawIO() 1678 bufio = self.tp(writer, 8) 1679 contents = b"abcdefghijklmnop" 1680 for n in range(0, len(contents), 3): 1681 bufio.write(contents[n:n+3]) 1682 flushed = b"".join(writer._write_stack) 1683 # At least (total - 8) bytes were implicitly flushed, perhaps more 1684 # depending on the implementation. 1685 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1686 1687 def check_writes(self, intermediate_func): 1688 # Lots of writes, test the flushed output is as expected. 1689 contents = bytes(range(256)) * 1000 1690 n = 0 1691 writer = self.MockRawIO() 1692 bufio = self.tp(writer, 13) 1693 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1694 def gen_sizes(): 1695 for size in count(1): 1696 for i in range(15): 1697 yield size 1698 sizes = gen_sizes() 1699 while n < len(contents): 1700 size = min(next(sizes), len(contents) - n) 1701 self.assertEqual(bufio.write(contents[n:n+size]), size) 1702 intermediate_func(bufio) 1703 n += size 1704 bufio.flush() 1705 self.assertEqual(contents, b"".join(writer._write_stack)) 1706 1707 def test_writes(self): 1708 self.check_writes(lambda bufio: None) 1709 1710 def test_writes_and_flushes(self): 1711 self.check_writes(lambda bufio: bufio.flush()) 1712 1713 def test_writes_and_seeks(self): 1714 def _seekabs(bufio): 1715 pos = bufio.tell() 1716 bufio.seek(pos + 1, 0) 1717 bufio.seek(pos - 1, 0) 1718 bufio.seek(pos, 0) 1719 self.check_writes(_seekabs) 1720 def _seekrel(bufio): 1721 pos = bufio.seek(0, 1) 1722 bufio.seek(+1, 1) 1723 bufio.seek(-1, 1) 1724 bufio.seek(pos, 0) 1725 self.check_writes(_seekrel) 1726 1727 def test_writes_and_truncates(self): 1728 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1729 1730 def test_write_non_blocking(self): 1731 raw = self.MockNonBlockWriterIO() 1732 bufio = self.tp(raw, 8) 1733 1734 self.assertEqual(bufio.write(b"abcd"), 4) 1735 self.assertEqual(bufio.write(b"efghi"), 5) 1736 # 1 byte will be written, the rest will be buffered 1737 raw.block_on(b"k") 1738 self.assertEqual(bufio.write(b"jklmn"), 5) 1739 1740 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1741 raw.block_on(b"0") 1742 try: 1743 bufio.write(b"opqrwxyz0123456789") 1744 except self.BlockingIOError as e: 1745 written = e.characters_written 1746 else: 1747 self.fail("BlockingIOError should have been raised") 1748 self.assertEqual(written, 16) 1749 self.assertEqual(raw.pop_written(), 1750 b"abcdefghijklmnopqrwxyz") 1751 1752 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1753 s = raw.pop_written() 1754 # Previously buffered bytes were flushed 1755 self.assertTrue(s.startswith(b"01234567A"), s) 1756 1757 def test_write_and_rewind(self): 1758 raw = io.BytesIO() 1759 bufio = self.tp(raw, 4) 1760 self.assertEqual(bufio.write(b"abcdef"), 6) 1761 self.assertEqual(bufio.tell(), 6) 1762 bufio.seek(0, 0) 1763 self.assertEqual(bufio.write(b"XY"), 2) 1764 bufio.seek(6, 0) 1765 self.assertEqual(raw.getvalue(), b"XYcdef") 1766 self.assertEqual(bufio.write(b"123456"), 6) 1767 bufio.flush() 1768 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1769 1770 def test_flush(self): 1771 writer = self.MockRawIO() 1772 bufio = self.tp(writer, 8) 1773 bufio.write(b"abc") 1774 bufio.flush() 1775 self.assertEqual(b"abc", writer._write_stack[0]) 1776 1777 def test_writelines(self): 1778 l = [b'ab', b'cd', b'ef'] 1779 writer = self.MockRawIO() 1780 bufio = self.tp(writer, 8) 1781 bufio.writelines(l) 1782 bufio.flush() 1783 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1784 1785 def test_writelines_userlist(self): 1786 l = UserList([b'ab', b'cd', b'ef']) 1787 writer = self.MockRawIO() 1788 bufio = self.tp(writer, 8) 1789 bufio.writelines(l) 1790 bufio.flush() 1791 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1792 1793 def test_writelines_error(self): 1794 writer = self.MockRawIO() 1795 bufio = self.tp(writer, 8) 1796 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1797 self.assertRaises(TypeError, bufio.writelines, None) 1798 self.assertRaises(TypeError, bufio.writelines, 'abc') 1799 1800 def test_destructor(self): 1801 writer = self.MockRawIO() 1802 bufio = self.tp(writer, 8) 1803 bufio.write(b"abc") 1804 del bufio 1805 support.gc_collect() 1806 self.assertEqual(b"abc", writer._write_stack[0]) 1807 1808 def test_truncate(self): 1809 # Truncate implicitly flushes the buffer. 1810 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1811 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw: 1812 bufio = self.tp(raw, 8) 1813 bufio.write(b"abcdef") 1814 self.assertEqual(bufio.truncate(3), 3) 1815 self.assertEqual(bufio.tell(), 6) 1816 with self.open(os_helper.TESTFN, "rb", buffering=0) as f: 1817 self.assertEqual(f.read(), b"abc") 1818 1819 def test_truncate_after_write(self): 1820 # Ensure that truncate preserves the file position after 1821 # writes longer than the buffer size. 1822 # Issue: https://bugs.python.org/issue32228 1823 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1824 with self.open(os_helper.TESTFN, "wb") as f: 1825 # Fill with some buffer 1826 f.write(b'\x00' * 10000) 1827 buffer_sizes = [8192, 4096, 200] 1828 for buffer_size in buffer_sizes: 1829 with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f: 1830 f.write(b'\x00' * (buffer_size + 1)) 1831 # After write write_pos and write_end are set to 0 1832 f.read(1) 1833 # read operation makes sure that pos != raw_pos 1834 f.truncate() 1835 self.assertEqual(f.tell(), buffer_size + 2) 1836 1837 @support.requires_resource('cpu') 1838 @threading_helper.requires_working_threading() 1839 def test_threads(self): 1840 try: 1841 # Write out many bytes from many threads and test they were 1842 # all flushed. 1843 N = 1000 1844 contents = bytes(range(256)) * N 1845 sizes = cycle([1, 19]) 1846 n = 0 1847 queue = deque() 1848 while n < len(contents): 1849 size = next(sizes) 1850 queue.append(contents[n:n+size]) 1851 n += size 1852 del contents 1853 # We use a real file object because it allows us to 1854 # exercise situations where the GIL is released before 1855 # writing the buffer to the raw streams. This is in addition 1856 # to concurrency issues due to switching threads in the middle 1857 # of Python code. 1858 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw: 1859 bufio = self.tp(raw, 8) 1860 errors = [] 1861 def f(): 1862 try: 1863 while True: 1864 try: 1865 s = queue.popleft() 1866 except IndexError: 1867 return 1868 bufio.write(s) 1869 except Exception as e: 1870 errors.append(e) 1871 raise 1872 threads = [threading.Thread(target=f) for x in range(20)] 1873 with threading_helper.start_threads(threads): 1874 time.sleep(0.02) # yield 1875 self.assertFalse(errors, 1876 "the following exceptions were caught: %r" % errors) 1877 bufio.close() 1878 with self.open(os_helper.TESTFN, "rb") as f: 1879 s = f.read() 1880 for i in range(256): 1881 self.assertEqual(s.count(bytes([i])), N) 1882 finally: 1883 os_helper.unlink(os_helper.TESTFN) 1884 1885 def test_misbehaved_io(self): 1886 rawio = self.MisbehavedRawIO() 1887 bufio = self.tp(rawio, 5) 1888 self.assertRaises(OSError, bufio.seek, 0) 1889 self.assertRaises(OSError, bufio.tell) 1890 self.assertRaises(OSError, bufio.write, b"abcdef") 1891 1892 # Silence destructor error 1893 bufio.close = lambda: None 1894 1895 def test_max_buffer_size_removal(self): 1896 with self.assertRaises(TypeError): 1897 self.tp(self.MockRawIO(), 8, 12) 1898 1899 def test_write_error_on_close(self): 1900 raw = self.MockRawIO() 1901 def bad_write(b): 1902 raise OSError() 1903 raw.write = bad_write 1904 b = self.tp(raw) 1905 b.write(b'spam') 1906 self.assertRaises(OSError, b.close) # exception not swallowed 1907 self.assertTrue(b.closed) 1908 1909 @threading_helper.requires_working_threading() 1910 def test_slow_close_from_thread(self): 1911 # Issue #31976 1912 rawio = self.SlowFlushRawIO() 1913 bufio = self.tp(rawio, 8) 1914 t = threading.Thread(target=bufio.close) 1915 t.start() 1916 rawio.in_flush.wait() 1917 self.assertRaises(ValueError, bufio.write, b'spam') 1918 self.assertTrue(bufio.closed) 1919 t.join() 1920 1921 1922 1923class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1924 tp = io.BufferedWriter 1925 1926 @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing " 1927 "instead of returning NULL for malloc failure.") 1928 def test_constructor(self): 1929 BufferedWriterTest.test_constructor(self) 1930 # The allocation can succeed on 32-bit builds, e.g. with more 1931 # than 2 GiB RAM and a 64-bit kernel. 1932 if sys.maxsize > 0x7FFFFFFF: 1933 rawio = self.MockRawIO() 1934 bufio = self.tp(rawio) 1935 self.assertRaises((OverflowError, MemoryError, ValueError), 1936 bufio.__init__, rawio, sys.maxsize) 1937 1938 def test_initialization(self): 1939 rawio = self.MockRawIO() 1940 bufio = self.tp(rawio) 1941 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1942 self.assertRaises(ValueError, bufio.write, b"def") 1943 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1944 self.assertRaises(ValueError, bufio.write, b"def") 1945 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1946 self.assertRaises(ValueError, bufio.write, b"def") 1947 1948 def test_garbage_collection(self): 1949 # C BufferedWriter objects are collected, and collecting them flushes 1950 # all data to disk. 1951 # The Python version has __del__, so it ends into gc.garbage instead 1952 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1953 with warnings_helper.check_warnings(('', ResourceWarning)): 1954 rawio = self.FileIO(os_helper.TESTFN, "w+b") 1955 f = self.tp(rawio) 1956 f.write(b"123xxx") 1957 f.x = f 1958 wr = weakref.ref(f) 1959 del f 1960 support.gc_collect() 1961 self.assertIsNone(wr(), wr) 1962 with self.open(os_helper.TESTFN, "rb") as f: 1963 self.assertEqual(f.read(), b"123xxx") 1964 1965 def test_args_error(self): 1966 # Issue #17275 1967 with self.assertRaisesRegex(TypeError, "BufferedWriter"): 1968 self.tp(io.BytesIO(), 1024, 1024, 1024) 1969 1970 1971class PyBufferedWriterTest(BufferedWriterTest): 1972 tp = pyio.BufferedWriter 1973 1974class BufferedRWPairTest(unittest.TestCase): 1975 1976 def test_constructor(self): 1977 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1978 self.assertFalse(pair.closed) 1979 1980 def test_uninitialized(self): 1981 pair = self.tp.__new__(self.tp) 1982 del pair 1983 pair = self.tp.__new__(self.tp) 1984 self.assertRaisesRegex((ValueError, AttributeError), 1985 'uninitialized|has no attribute', 1986 pair.read, 0) 1987 self.assertRaisesRegex((ValueError, AttributeError), 1988 'uninitialized|has no attribute', 1989 pair.write, b'') 1990 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1991 self.assertEqual(pair.read(0), b'') 1992 self.assertEqual(pair.write(b''), 0) 1993 1994 def test_detach(self): 1995 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1996 self.assertRaises(self.UnsupportedOperation, pair.detach) 1997 1998 def test_constructor_max_buffer_size_removal(self): 1999 with self.assertRaises(TypeError): 2000 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 2001 2002 def test_constructor_with_not_readable(self): 2003 class NotReadable(MockRawIO): 2004 def readable(self): 2005 return False 2006 2007 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO()) 2008 2009 def test_constructor_with_not_writeable(self): 2010 class NotWriteable(MockRawIO): 2011 def writable(self): 2012 return False 2013 2014 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable()) 2015 2016 def test_read(self): 2017 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2018 2019 self.assertEqual(pair.read(3), b"abc") 2020 self.assertEqual(pair.read(1), b"d") 2021 self.assertEqual(pair.read(), b"ef") 2022 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 2023 self.assertEqual(pair.read(None), b"abc") 2024 2025 def test_readlines(self): 2026 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 2027 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 2028 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 2029 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 2030 2031 def test_read1(self): 2032 # .read1() is delegated to the underlying reader object, so this test 2033 # can be shallow. 2034 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2035 2036 self.assertEqual(pair.read1(3), b"abc") 2037 self.assertEqual(pair.read1(), b"def") 2038 2039 def test_readinto(self): 2040 for method in ("readinto", "readinto1"): 2041 with self.subTest(method): 2042 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2043 2044 data = byteslike(b'\0' * 5) 2045 self.assertEqual(getattr(pair, method)(data), 5) 2046 self.assertEqual(bytes(data), b"abcde") 2047 2048 def test_write(self): 2049 w = self.MockRawIO() 2050 pair = self.tp(self.MockRawIO(), w) 2051 2052 pair.write(b"abc") 2053 pair.flush() 2054 buffer = bytearray(b"def") 2055 pair.write(buffer) 2056 buffer[:] = b"***" # Overwrite our copy of the data 2057 pair.flush() 2058 self.assertEqual(w._write_stack, [b"abc", b"def"]) 2059 2060 def test_peek(self): 2061 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2062 2063 self.assertTrue(pair.peek(3).startswith(b"abc")) 2064 self.assertEqual(pair.read(3), b"abc") 2065 2066 def test_readable(self): 2067 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2068 self.assertTrue(pair.readable()) 2069 2070 def test_writeable(self): 2071 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2072 self.assertTrue(pair.writable()) 2073 2074 def test_seekable(self): 2075 # BufferedRWPairs are never seekable, even if their readers and writers 2076 # are. 2077 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2078 self.assertFalse(pair.seekable()) 2079 2080 # .flush() is delegated to the underlying writer object and has been 2081 # tested in the test_write method. 2082 2083 def test_close_and_closed(self): 2084 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2085 self.assertFalse(pair.closed) 2086 pair.close() 2087 self.assertTrue(pair.closed) 2088 2089 def test_reader_close_error_on_close(self): 2090 def reader_close(): 2091 reader_non_existing 2092 reader = self.MockRawIO() 2093 reader.close = reader_close 2094 writer = self.MockRawIO() 2095 pair = self.tp(reader, writer) 2096 with self.assertRaises(NameError) as err: 2097 pair.close() 2098 self.assertIn('reader_non_existing', str(err.exception)) 2099 self.assertTrue(pair.closed) 2100 self.assertFalse(reader.closed) 2101 self.assertTrue(writer.closed) 2102 2103 # Silence destructor error 2104 reader.close = lambda: None 2105 2106 def test_writer_close_error_on_close(self): 2107 def writer_close(): 2108 writer_non_existing 2109 reader = self.MockRawIO() 2110 writer = self.MockRawIO() 2111 writer.close = writer_close 2112 pair = self.tp(reader, writer) 2113 with self.assertRaises(NameError) as err: 2114 pair.close() 2115 self.assertIn('writer_non_existing', str(err.exception)) 2116 self.assertFalse(pair.closed) 2117 self.assertTrue(reader.closed) 2118 self.assertFalse(writer.closed) 2119 2120 # Silence destructor error 2121 writer.close = lambda: None 2122 writer = None 2123 2124 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception 2125 with support.catch_unraisable_exception(): 2126 # Ignore BufferedRWPair unraisable exception 2127 with support.catch_unraisable_exception(): 2128 pair = None 2129 support.gc_collect() 2130 support.gc_collect() 2131 2132 def test_reader_writer_close_error_on_close(self): 2133 def reader_close(): 2134 reader_non_existing 2135 def writer_close(): 2136 writer_non_existing 2137 reader = self.MockRawIO() 2138 reader.close = reader_close 2139 writer = self.MockRawIO() 2140 writer.close = writer_close 2141 pair = self.tp(reader, writer) 2142 with self.assertRaises(NameError) as err: 2143 pair.close() 2144 self.assertIn('reader_non_existing', str(err.exception)) 2145 self.assertIsInstance(err.exception.__context__, NameError) 2146 self.assertIn('writer_non_existing', str(err.exception.__context__)) 2147 self.assertFalse(pair.closed) 2148 self.assertFalse(reader.closed) 2149 self.assertFalse(writer.closed) 2150 2151 # Silence destructor error 2152 reader.close = lambda: None 2153 writer.close = lambda: None 2154 2155 def test_isatty(self): 2156 class SelectableIsAtty(MockRawIO): 2157 def __init__(self, isatty): 2158 MockRawIO.__init__(self) 2159 self._isatty = isatty 2160 2161 def isatty(self): 2162 return self._isatty 2163 2164 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 2165 self.assertFalse(pair.isatty()) 2166 2167 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 2168 self.assertTrue(pair.isatty()) 2169 2170 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 2171 self.assertTrue(pair.isatty()) 2172 2173 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 2174 self.assertTrue(pair.isatty()) 2175 2176 def test_weakref_clearing(self): 2177 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 2178 ref = weakref.ref(brw) 2179 brw = None 2180 ref = None # Shouldn't segfault. 2181 2182class CBufferedRWPairTest(BufferedRWPairTest): 2183 tp = io.BufferedRWPair 2184 2185class PyBufferedRWPairTest(BufferedRWPairTest): 2186 tp = pyio.BufferedRWPair 2187 2188 2189class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 2190 read_mode = "rb+" 2191 write_mode = "wb+" 2192 2193 def test_constructor(self): 2194 BufferedReaderTest.test_constructor(self) 2195 BufferedWriterTest.test_constructor(self) 2196 2197 def test_uninitialized(self): 2198 BufferedReaderTest.test_uninitialized(self) 2199 BufferedWriterTest.test_uninitialized(self) 2200 2201 def test_read_and_write(self): 2202 raw = self.MockRawIO((b"asdf", b"ghjk")) 2203 rw = self.tp(raw, 8) 2204 2205 self.assertEqual(b"as", rw.read(2)) 2206 rw.write(b"ddd") 2207 rw.write(b"eee") 2208 self.assertFalse(raw._write_stack) # Buffer writes 2209 self.assertEqual(b"ghjk", rw.read()) 2210 self.assertEqual(b"dddeee", raw._write_stack[0]) 2211 2212 def test_seek_and_tell(self): 2213 raw = self.BytesIO(b"asdfghjkl") 2214 rw = self.tp(raw) 2215 2216 self.assertEqual(b"as", rw.read(2)) 2217 self.assertEqual(2, rw.tell()) 2218 rw.seek(0, 0) 2219 self.assertEqual(b"asdf", rw.read(4)) 2220 2221 rw.write(b"123f") 2222 rw.seek(0, 0) 2223 self.assertEqual(b"asdf123fl", rw.read()) 2224 self.assertEqual(9, rw.tell()) 2225 rw.seek(-4, 2) 2226 self.assertEqual(5, rw.tell()) 2227 rw.seek(2, 1) 2228 self.assertEqual(7, rw.tell()) 2229 self.assertEqual(b"fl", rw.read(11)) 2230 rw.flush() 2231 self.assertEqual(b"asdf123fl", raw.getvalue()) 2232 2233 self.assertRaises(TypeError, rw.seek, 0.0) 2234 2235 def check_flush_and_read(self, read_func): 2236 raw = self.BytesIO(b"abcdefghi") 2237 bufio = self.tp(raw) 2238 2239 self.assertEqual(b"ab", read_func(bufio, 2)) 2240 bufio.write(b"12") 2241 self.assertEqual(b"ef", read_func(bufio, 2)) 2242 self.assertEqual(6, bufio.tell()) 2243 bufio.flush() 2244 self.assertEqual(6, bufio.tell()) 2245 self.assertEqual(b"ghi", read_func(bufio)) 2246 raw.seek(0, 0) 2247 raw.write(b"XYZ") 2248 # flush() resets the read buffer 2249 bufio.flush() 2250 bufio.seek(0, 0) 2251 self.assertEqual(b"XYZ", read_func(bufio, 3)) 2252 2253 def test_flush_and_read(self): 2254 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 2255 2256 def test_flush_and_readinto(self): 2257 def _readinto(bufio, n=-1): 2258 b = bytearray(n if n >= 0 else 9999) 2259 n = bufio.readinto(b) 2260 return bytes(b[:n]) 2261 self.check_flush_and_read(_readinto) 2262 2263 def test_flush_and_peek(self): 2264 def _peek(bufio, n=-1): 2265 # This relies on the fact that the buffer can contain the whole 2266 # raw stream, otherwise peek() can return less. 2267 b = bufio.peek(n) 2268 if n != -1: 2269 b = b[:n] 2270 bufio.seek(len(b), 1) 2271 return b 2272 self.check_flush_and_read(_peek) 2273 2274 def test_flush_and_write(self): 2275 raw = self.BytesIO(b"abcdefghi") 2276 bufio = self.tp(raw) 2277 2278 bufio.write(b"123") 2279 bufio.flush() 2280 bufio.write(b"45") 2281 bufio.flush() 2282 bufio.seek(0, 0) 2283 self.assertEqual(b"12345fghi", raw.getvalue()) 2284 self.assertEqual(b"12345fghi", bufio.read()) 2285 2286 def test_threads(self): 2287 BufferedReaderTest.test_threads(self) 2288 BufferedWriterTest.test_threads(self) 2289 2290 def test_writes_and_peek(self): 2291 def _peek(bufio): 2292 bufio.peek(1) 2293 self.check_writes(_peek) 2294 def _peek(bufio): 2295 pos = bufio.tell() 2296 bufio.seek(-1, 1) 2297 bufio.peek(1) 2298 bufio.seek(pos, 0) 2299 self.check_writes(_peek) 2300 2301 def test_writes_and_reads(self): 2302 def _read(bufio): 2303 bufio.seek(-1, 1) 2304 bufio.read(1) 2305 self.check_writes(_read) 2306 2307 def test_writes_and_read1s(self): 2308 def _read1(bufio): 2309 bufio.seek(-1, 1) 2310 bufio.read1(1) 2311 self.check_writes(_read1) 2312 2313 def test_writes_and_readintos(self): 2314 def _read(bufio): 2315 bufio.seek(-1, 1) 2316 bufio.readinto(bytearray(1)) 2317 self.check_writes(_read) 2318 2319 def test_write_after_readahead(self): 2320 # Issue #6629: writing after the buffer was filled by readahead should 2321 # first rewind the raw stream. 2322 for overwrite_size in [1, 5]: 2323 raw = self.BytesIO(b"A" * 10) 2324 bufio = self.tp(raw, 4) 2325 # Trigger readahead 2326 self.assertEqual(bufio.read(1), b"A") 2327 self.assertEqual(bufio.tell(), 1) 2328 # Overwriting should rewind the raw stream if it needs so 2329 bufio.write(b"B" * overwrite_size) 2330 self.assertEqual(bufio.tell(), overwrite_size + 1) 2331 # If the write size was smaller than the buffer size, flush() and 2332 # check that rewind happens. 2333 bufio.flush() 2334 self.assertEqual(bufio.tell(), overwrite_size + 1) 2335 s = raw.getvalue() 2336 self.assertEqual(s, 2337 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 2338 2339 def test_write_rewind_write(self): 2340 # Various combinations of reading / writing / seeking backwards / writing again 2341 def mutate(bufio, pos1, pos2): 2342 assert pos2 >= pos1 2343 # Fill the buffer 2344 bufio.seek(pos1) 2345 bufio.read(pos2 - pos1) 2346 bufio.write(b'\x02') 2347 # This writes earlier than the previous write, but still inside 2348 # the buffer. 2349 bufio.seek(pos1) 2350 bufio.write(b'\x01') 2351 2352 b = b"\x80\x81\x82\x83\x84" 2353 for i in range(0, len(b)): 2354 for j in range(i, len(b)): 2355 raw = self.BytesIO(b) 2356 bufio = self.tp(raw, 100) 2357 mutate(bufio, i, j) 2358 bufio.flush() 2359 expected = bytearray(b) 2360 expected[j] = 2 2361 expected[i] = 1 2362 self.assertEqual(raw.getvalue(), expected, 2363 "failed result for i=%d, j=%d" % (i, j)) 2364 2365 def test_truncate_after_read_or_write(self): 2366 raw = self.BytesIO(b"A" * 10) 2367 bufio = self.tp(raw, 100) 2368 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 2369 self.assertEqual(bufio.truncate(), 2) 2370 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 2371 self.assertEqual(bufio.truncate(), 4) 2372 2373 def test_misbehaved_io(self): 2374 BufferedReaderTest.test_misbehaved_io(self) 2375 BufferedWriterTest.test_misbehaved_io(self) 2376 2377 def test_interleaved_read_write(self): 2378 # Test for issue #12213 2379 with self.BytesIO(b'abcdefgh') as raw: 2380 with self.tp(raw, 100) as f: 2381 f.write(b"1") 2382 self.assertEqual(f.read(1), b'b') 2383 f.write(b'2') 2384 self.assertEqual(f.read1(1), b'd') 2385 f.write(b'3') 2386 buf = bytearray(1) 2387 f.readinto(buf) 2388 self.assertEqual(buf, b'f') 2389 f.write(b'4') 2390 self.assertEqual(f.peek(1), b'h') 2391 f.flush() 2392 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 2393 2394 with self.BytesIO(b'abc') as raw: 2395 with self.tp(raw, 100) as f: 2396 self.assertEqual(f.read(1), b'a') 2397 f.write(b"2") 2398 self.assertEqual(f.read(1), b'c') 2399 f.flush() 2400 self.assertEqual(raw.getvalue(), b'a2c') 2401 2402 def test_interleaved_readline_write(self): 2403 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 2404 with self.tp(raw) as f: 2405 f.write(b'1') 2406 self.assertEqual(f.readline(), b'b\n') 2407 f.write(b'2') 2408 self.assertEqual(f.readline(), b'def\n') 2409 f.write(b'3') 2410 self.assertEqual(f.readline(), b'\n') 2411 f.flush() 2412 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 2413 2414 # You can't construct a BufferedRandom over a non-seekable stream. 2415 test_unseekable = None 2416 2417 # writable() returns True, so there's no point to test it over 2418 # a writable stream. 2419 test_truncate_on_read_only = None 2420 2421 2422class CBufferedRandomTest(BufferedRandomTest, SizeofTest): 2423 tp = io.BufferedRandom 2424 2425 @skip_if_sanitizer(memory=True, address=True, reason= "sanitizer defaults to crashing " 2426 "instead of returning NULL for malloc failure.") 2427 def test_constructor(self): 2428 BufferedRandomTest.test_constructor(self) 2429 # The allocation can succeed on 32-bit builds, e.g. with more 2430 # than 2 GiB RAM and a 64-bit kernel. 2431 if sys.maxsize > 0x7FFFFFFF: 2432 rawio = self.MockRawIO() 2433 bufio = self.tp(rawio) 2434 self.assertRaises((OverflowError, MemoryError, ValueError), 2435 bufio.__init__, rawio, sys.maxsize) 2436 2437 def test_garbage_collection(self): 2438 CBufferedReaderTest.test_garbage_collection(self) 2439 CBufferedWriterTest.test_garbage_collection(self) 2440 2441 def test_args_error(self): 2442 # Issue #17275 2443 with self.assertRaisesRegex(TypeError, "BufferedRandom"): 2444 self.tp(io.BytesIO(), 1024, 1024, 1024) 2445 2446 2447class PyBufferedRandomTest(BufferedRandomTest): 2448 tp = pyio.BufferedRandom 2449 2450 2451# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 2452# properties: 2453# - A single output character can correspond to many bytes of input. 2454# - The number of input bytes to complete the character can be 2455# undetermined until the last input byte is received. 2456# - The number of input bytes can vary depending on previous input. 2457# - A single input byte can correspond to many characters of output. 2458# - The number of output characters can be undetermined until the 2459# last input byte is received. 2460# - The number of output characters can vary depending on previous input. 2461 2462class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 2463 """ 2464 For testing seek/tell behavior with a stateful, buffering decoder. 2465 2466 Input is a sequence of words. Words may be fixed-length (length set 2467 by input) or variable-length (period-terminated). In variable-length 2468 mode, extra periods are ignored. Possible words are: 2469 - 'i' followed by a number sets the input length, I (maximum 99). 2470 When I is set to 0, words are space-terminated. 2471 - 'o' followed by a number sets the output length, O (maximum 99). 2472 - Any other word is converted into a word followed by a period on 2473 the output. The output word consists of the input word truncated 2474 or padded out with hyphens to make its length equal to O. If O 2475 is 0, the word is output verbatim without truncating or padding. 2476 I and O are initially set to 1. When I changes, any buffered input is 2477 re-scanned according to the new I. EOF also terminates the last word. 2478 """ 2479 2480 def __init__(self, errors='strict'): 2481 codecs.IncrementalDecoder.__init__(self, errors) 2482 self.reset() 2483 2484 def __repr__(self): 2485 return '<SID %x>' % id(self) 2486 2487 def reset(self): 2488 self.i = 1 2489 self.o = 1 2490 self.buffer = bytearray() 2491 2492 def getstate(self): 2493 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 2494 return bytes(self.buffer), i*100 + o 2495 2496 def setstate(self, state): 2497 buffer, io = state 2498 self.buffer = bytearray(buffer) 2499 i, o = divmod(io, 100) 2500 self.i, self.o = i ^ 1, o ^ 1 2501 2502 def decode(self, input, final=False): 2503 output = '' 2504 for b in input: 2505 if self.i == 0: # variable-length, terminated with period 2506 if b == ord('.'): 2507 if self.buffer: 2508 output += self.process_word() 2509 else: 2510 self.buffer.append(b) 2511 else: # fixed-length, terminate after self.i bytes 2512 self.buffer.append(b) 2513 if len(self.buffer) == self.i: 2514 output += self.process_word() 2515 if final and self.buffer: # EOF terminates the last word 2516 output += self.process_word() 2517 return output 2518 2519 def process_word(self): 2520 output = '' 2521 if self.buffer[0] == ord('i'): 2522 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 2523 elif self.buffer[0] == ord('o'): 2524 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 2525 else: 2526 output = self.buffer.decode('ascii') 2527 if len(output) < self.o: 2528 output += '-'*self.o # pad out with hyphens 2529 if self.o: 2530 output = output[:self.o] # truncate to output length 2531 output += '.' 2532 self.buffer = bytearray() 2533 return output 2534 2535 codecEnabled = False 2536 2537 2538# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak 2539# when registering codecs and cleanup functions. 2540def lookupTestDecoder(name): 2541 if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder': 2542 latin1 = codecs.lookup('latin-1') 2543 return codecs.CodecInfo( 2544 name='test_decoder', encode=latin1.encode, decode=None, 2545 incrementalencoder=None, 2546 streamreader=None, streamwriter=None, 2547 incrementaldecoder=StatefulIncrementalDecoder) 2548 2549 2550class StatefulIncrementalDecoderTest(unittest.TestCase): 2551 """ 2552 Make sure the StatefulIncrementalDecoder actually works. 2553 """ 2554 2555 test_cases = [ 2556 # I=1, O=1 (fixed-length input == fixed-length output) 2557 (b'abcd', False, 'a.b.c.d.'), 2558 # I=0, O=0 (variable-length input, variable-length output) 2559 (b'oiabcd', True, 'abcd.'), 2560 # I=0, O=0 (should ignore extra periods) 2561 (b'oi...abcd...', True, 'abcd.'), 2562 # I=0, O=6 (variable-length input, fixed-length output) 2563 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 2564 # I=2, O=6 (fixed-length input < fixed-length output) 2565 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 2566 # I=6, O=3 (fixed-length input > fixed-length output) 2567 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 2568 # I=0, then 3; O=29, then 15 (with longer output) 2569 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2570 'a----------------------------.' + 2571 'b----------------------------.' + 2572 'cde--------------------------.' + 2573 'abcdefghijabcde.' + 2574 'a.b------------.' + 2575 '.c.------------.' + 2576 'd.e------------.' + 2577 'k--------------.' + 2578 'l--------------.' + 2579 'm--------------.') 2580 ] 2581 2582 def test_decoder(self): 2583 # Try a few one-shot test cases. 2584 for input, eof, output in self.test_cases: 2585 d = StatefulIncrementalDecoder() 2586 self.assertEqual(d.decode(input, eof), output) 2587 2588 # Also test an unfinished decode, followed by forcing EOF. 2589 d = StatefulIncrementalDecoder() 2590 self.assertEqual(d.decode(b'oiabcd'), '') 2591 self.assertEqual(d.decode(b'', 1), 'abcd.') 2592 2593class TextIOWrapperTest(unittest.TestCase): 2594 2595 def setUp(self): 2596 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2597 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2598 os_helper.unlink(os_helper.TESTFN) 2599 codecs.register(lookupTestDecoder) 2600 self.addCleanup(codecs.unregister, lookupTestDecoder) 2601 2602 def tearDown(self): 2603 os_helper.unlink(os_helper.TESTFN) 2604 2605 def test_constructor(self): 2606 r = self.BytesIO(b"\xc3\xa9\n\n") 2607 b = self.BufferedReader(r, 1000) 2608 t = self.TextIOWrapper(b, encoding="utf-8") 2609 t.__init__(b, encoding="latin-1", newline="\r\n") 2610 self.assertEqual(t.encoding, "latin-1") 2611 self.assertEqual(t.line_buffering, False) 2612 t.__init__(b, encoding="utf-8", line_buffering=True) 2613 self.assertEqual(t.encoding, "utf-8") 2614 self.assertEqual(t.line_buffering, True) 2615 self.assertEqual("\xe9\n", t.readline()) 2616 self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42) 2617 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') 2618 2619 def test_uninitialized(self): 2620 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2621 del t 2622 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2623 self.assertRaises(Exception, repr, t) 2624 self.assertRaisesRegex((ValueError, AttributeError), 2625 'uninitialized|has no attribute', 2626 t.read, 0) 2627 t.__init__(self.MockRawIO(), encoding="utf-8") 2628 self.assertEqual(t.read(0), '') 2629 2630 def test_non_text_encoding_codecs_are_rejected(self): 2631 # Ensure the constructor complains if passed a codec that isn't 2632 # marked as a text encoding 2633 # http://bugs.python.org/issue20404 2634 r = self.BytesIO() 2635 b = self.BufferedWriter(r) 2636 with self.assertRaisesRegex(LookupError, "is not a text encoding"): 2637 self.TextIOWrapper(b, encoding="hex") 2638 2639 def test_detach(self): 2640 r = self.BytesIO() 2641 b = self.BufferedWriter(r) 2642 t = self.TextIOWrapper(b, encoding="ascii") 2643 self.assertIs(t.detach(), b) 2644 2645 t = self.TextIOWrapper(b, encoding="ascii") 2646 t.write("howdy") 2647 self.assertFalse(r.getvalue()) 2648 t.detach() 2649 self.assertEqual(r.getvalue(), b"howdy") 2650 self.assertRaises(ValueError, t.detach) 2651 2652 # Operations independent of the detached stream should still work 2653 repr(t) 2654 self.assertEqual(t.encoding, "ascii") 2655 self.assertEqual(t.errors, "strict") 2656 self.assertFalse(t.line_buffering) 2657 self.assertFalse(t.write_through) 2658 2659 def test_repr(self): 2660 raw = self.BytesIO("hello".encode("utf-8")) 2661 b = self.BufferedReader(raw) 2662 t = self.TextIOWrapper(b, encoding="utf-8") 2663 modname = self.TextIOWrapper.__module__ 2664 self.assertRegex(repr(t), 2665 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) 2666 raw.name = "dummy" 2667 self.assertRegex(repr(t), 2668 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2669 t.mode = "r" 2670 self.assertRegex(repr(t), 2671 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) 2672 raw.name = b"dummy" 2673 self.assertRegex(repr(t), 2674 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) 2675 2676 t.buffer.detach() 2677 repr(t) # Should not raise an exception 2678 2679 def test_recursive_repr(self): 2680 # Issue #25455 2681 raw = self.BytesIO() 2682 t = self.TextIOWrapper(raw, encoding="utf-8") 2683 with support.swap_attr(raw, 'name', t): 2684 try: 2685 repr(t) # Should not crash 2686 except RuntimeError: 2687 pass 2688 2689 def test_line_buffering(self): 2690 r = self.BytesIO() 2691 b = self.BufferedWriter(r, 1000) 2692 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True) 2693 t.write("X") 2694 self.assertEqual(r.getvalue(), b"") # No flush happened 2695 t.write("Y\nZ") 2696 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2697 t.write("A\rB") 2698 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2699 2700 def test_reconfigure_line_buffering(self): 2701 r = self.BytesIO() 2702 b = self.BufferedWriter(r, 1000) 2703 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False) 2704 t.write("AB\nC") 2705 self.assertEqual(r.getvalue(), b"") 2706 2707 t.reconfigure(line_buffering=True) # implicit flush 2708 self.assertEqual(r.getvalue(), b"AB\nC") 2709 t.write("DEF\nG") 2710 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2711 t.write("H") 2712 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2713 t.reconfigure(line_buffering=False) # implicit flush 2714 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2715 t.write("IJ") 2716 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2717 2718 # Keeping default value 2719 t.reconfigure() 2720 t.reconfigure(line_buffering=None) 2721 self.assertEqual(t.line_buffering, False) 2722 t.reconfigure(line_buffering=True) 2723 t.reconfigure() 2724 t.reconfigure(line_buffering=None) 2725 self.assertEqual(t.line_buffering, True) 2726 2727 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2728 def test_default_encoding(self): 2729 old_environ = dict(os.environ) 2730 try: 2731 # try to get a user preferred encoding different than the current 2732 # locale encoding to check that TextIOWrapper() uses the current 2733 # locale encoding and not the user preferred encoding 2734 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): 2735 if key in os.environ: 2736 del os.environ[key] 2737 2738 current_locale_encoding = locale.getencoding() 2739 b = self.BytesIO() 2740 with warnings.catch_warnings(): 2741 warnings.simplefilter("ignore", EncodingWarning) 2742 t = self.TextIOWrapper(b) 2743 self.assertEqual(t.encoding, current_locale_encoding) 2744 finally: 2745 os.environ.clear() 2746 os.environ.update(old_environ) 2747 2748 def test_encoding(self): 2749 # Check the encoding attribute is always set, and valid 2750 b = self.BytesIO() 2751 t = self.TextIOWrapper(b, encoding="utf-8") 2752 self.assertEqual(t.encoding, "utf-8") 2753 with warnings.catch_warnings(): 2754 warnings.simplefilter("ignore", EncodingWarning) 2755 t = self.TextIOWrapper(b) 2756 self.assertIsNotNone(t.encoding) 2757 codecs.lookup(t.encoding) 2758 2759 def test_encoding_errors_reading(self): 2760 # (1) default 2761 b = self.BytesIO(b"abc\n\xff\n") 2762 t = self.TextIOWrapper(b, encoding="ascii") 2763 self.assertRaises(UnicodeError, t.read) 2764 # (2) explicit strict 2765 b = self.BytesIO(b"abc\n\xff\n") 2766 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2767 self.assertRaises(UnicodeError, t.read) 2768 # (3) ignore 2769 b = self.BytesIO(b"abc\n\xff\n") 2770 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2771 self.assertEqual(t.read(), "abc\n\n") 2772 # (4) replace 2773 b = self.BytesIO(b"abc\n\xff\n") 2774 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2775 self.assertEqual(t.read(), "abc\n\ufffd\n") 2776 2777 def test_encoding_errors_writing(self): 2778 # (1) default 2779 b = self.BytesIO() 2780 t = self.TextIOWrapper(b, encoding="ascii") 2781 self.assertRaises(UnicodeError, t.write, "\xff") 2782 # (2) explicit strict 2783 b = self.BytesIO() 2784 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2785 self.assertRaises(UnicodeError, t.write, "\xff") 2786 # (3) ignore 2787 b = self.BytesIO() 2788 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2789 newline="\n") 2790 t.write("abc\xffdef\n") 2791 t.flush() 2792 self.assertEqual(b.getvalue(), b"abcdef\n") 2793 # (4) replace 2794 b = self.BytesIO() 2795 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2796 newline="\n") 2797 t.write("abc\xffdef\n") 2798 t.flush() 2799 self.assertEqual(b.getvalue(), b"abc?def\n") 2800 2801 def test_newlines(self): 2802 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2803 2804 tests = [ 2805 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2806 [ '', input_lines ], 2807 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2808 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2809 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2810 ] 2811 encodings = ( 2812 'utf-8', 'latin-1', 2813 'utf-16', 'utf-16-le', 'utf-16-be', 2814 'utf-32', 'utf-32-le', 'utf-32-be', 2815 ) 2816 2817 # Try a range of buffer sizes to test the case where \r is the last 2818 # character in TextIOWrapper._pending_line. 2819 for encoding in encodings: 2820 # XXX: str.encode() should return bytes 2821 data = bytes(''.join(input_lines).encode(encoding)) 2822 for do_reads in (False, True): 2823 for bufsize in range(1, 10): 2824 for newline, exp_lines in tests: 2825 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2826 textio = self.TextIOWrapper(bufio, newline=newline, 2827 encoding=encoding) 2828 if do_reads: 2829 got_lines = [] 2830 while True: 2831 c2 = textio.read(2) 2832 if c2 == '': 2833 break 2834 self.assertEqual(len(c2), 2) 2835 got_lines.append(c2 + textio.readline()) 2836 else: 2837 got_lines = list(textio) 2838 2839 for got_line, exp_line in zip(got_lines, exp_lines): 2840 self.assertEqual(got_line, exp_line) 2841 self.assertEqual(len(got_lines), len(exp_lines)) 2842 2843 def test_newlines_input(self): 2844 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2845 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2846 for newline, expected in [ 2847 (None, normalized.decode("ascii").splitlines(keepends=True)), 2848 ("", testdata.decode("ascii").splitlines(keepends=True)), 2849 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2850 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2851 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2852 ]: 2853 buf = self.BytesIO(testdata) 2854 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2855 self.assertEqual(txt.readlines(), expected) 2856 txt.seek(0) 2857 self.assertEqual(txt.read(), "".join(expected)) 2858 2859 def test_newlines_output(self): 2860 testdict = { 2861 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2862 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2863 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2864 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2865 } 2866 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2867 for newline, expected in tests: 2868 buf = self.BytesIO() 2869 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2870 txt.write("AAA\nB") 2871 txt.write("BB\nCCC\n") 2872 txt.write("X\rY\r\nZ") 2873 txt.flush() 2874 self.assertEqual(buf.closed, False) 2875 self.assertEqual(buf.getvalue(), expected) 2876 2877 def test_destructor(self): 2878 l = [] 2879 base = self.BytesIO 2880 class MyBytesIO(base): 2881 def close(self): 2882 l.append(self.getvalue()) 2883 base.close(self) 2884 b = MyBytesIO() 2885 t = self.TextIOWrapper(b, encoding="ascii") 2886 t.write("abc") 2887 del t 2888 support.gc_collect() 2889 self.assertEqual([b"abc"], l) 2890 2891 def test_override_destructor(self): 2892 record = [] 2893 class MyTextIO(self.TextIOWrapper): 2894 def __del__(self): 2895 record.append(1) 2896 try: 2897 f = super().__del__ 2898 except AttributeError: 2899 pass 2900 else: 2901 f() 2902 def close(self): 2903 record.append(2) 2904 super().close() 2905 def flush(self): 2906 record.append(3) 2907 super().flush() 2908 b = self.BytesIO() 2909 t = MyTextIO(b, encoding="ascii") 2910 del t 2911 support.gc_collect() 2912 self.assertEqual(record, [1, 2, 3]) 2913 2914 def test_error_through_destructor(self): 2915 # Test that the exception state is not modified by a destructor, 2916 # even if close() fails. 2917 rawio = self.CloseFailureIO() 2918 with support.catch_unraisable_exception() as cm: 2919 with self.assertRaises(AttributeError): 2920 self.TextIOWrapper(rawio, encoding="utf-8").xyzzy 2921 2922 if not IOBASE_EMITS_UNRAISABLE: 2923 self.assertIsNone(cm.unraisable) 2924 elif cm.unraisable is not None: 2925 self.assertEqual(cm.unraisable.exc_type, OSError) 2926 2927 # Systematic tests of the text I/O API 2928 2929 def test_basic_io(self): 2930 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2931 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": 2932 f = self.open(os_helper.TESTFN, "w+", encoding=enc) 2933 f._CHUNK_SIZE = chunksize 2934 self.assertEqual(f.write("abc"), 3) 2935 f.close() 2936 f = self.open(os_helper.TESTFN, "r+", encoding=enc) 2937 f._CHUNK_SIZE = chunksize 2938 self.assertEqual(f.tell(), 0) 2939 self.assertEqual(f.read(), "abc") 2940 cookie = f.tell() 2941 self.assertEqual(f.seek(0), 0) 2942 self.assertEqual(f.read(None), "abc") 2943 f.seek(0) 2944 self.assertEqual(f.read(2), "ab") 2945 self.assertEqual(f.read(1), "c") 2946 self.assertEqual(f.read(1), "") 2947 self.assertEqual(f.read(), "") 2948 self.assertEqual(f.tell(), cookie) 2949 self.assertEqual(f.seek(0), 0) 2950 self.assertEqual(f.seek(0, 2), cookie) 2951 self.assertEqual(f.write("def"), 3) 2952 self.assertEqual(f.seek(cookie), cookie) 2953 self.assertEqual(f.read(), "def") 2954 if enc.startswith("utf"): 2955 self.multi_line_test(f, enc) 2956 f.close() 2957 2958 def multi_line_test(self, f, enc): 2959 f.seek(0) 2960 f.truncate() 2961 sample = "s\xff\u0fff\uffff" 2962 wlines = [] 2963 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2964 chars = [] 2965 for i in range(size): 2966 chars.append(sample[i % len(sample)]) 2967 line = "".join(chars) + "\n" 2968 wlines.append((f.tell(), line)) 2969 f.write(line) 2970 f.seek(0) 2971 rlines = [] 2972 while True: 2973 pos = f.tell() 2974 line = f.readline() 2975 if not line: 2976 break 2977 rlines.append((pos, line)) 2978 self.assertEqual(rlines, wlines) 2979 2980 def test_telling(self): 2981 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") 2982 p0 = f.tell() 2983 f.write("\xff\n") 2984 p1 = f.tell() 2985 f.write("\xff\n") 2986 p2 = f.tell() 2987 f.seek(0) 2988 self.assertEqual(f.tell(), p0) 2989 self.assertEqual(f.readline(), "\xff\n") 2990 self.assertEqual(f.tell(), p1) 2991 self.assertEqual(f.readline(), "\xff\n") 2992 self.assertEqual(f.tell(), p2) 2993 f.seek(0) 2994 for line in f: 2995 self.assertEqual(line, "\xff\n") 2996 self.assertRaises(OSError, f.tell) 2997 self.assertEqual(f.tell(), p2) 2998 f.close() 2999 3000 def test_seeking(self): 3001 chunk_size = _default_chunk_size() 3002 prefix_size = chunk_size - 2 3003 u_prefix = "a" * prefix_size 3004 prefix = bytes(u_prefix.encode("utf-8")) 3005 self.assertEqual(len(u_prefix), len(prefix)) 3006 u_suffix = "\u8888\n" 3007 suffix = bytes(u_suffix.encode("utf-8")) 3008 line = prefix + suffix 3009 with self.open(os_helper.TESTFN, "wb") as f: 3010 f.write(line*2) 3011 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 3012 s = f.read(prefix_size) 3013 self.assertEqual(s, str(prefix, "ascii")) 3014 self.assertEqual(f.tell(), prefix_size) 3015 self.assertEqual(f.readline(), u_suffix) 3016 3017 def test_seeking_too(self): 3018 # Regression test for a specific bug 3019 data = b'\xe0\xbf\xbf\n' 3020 with self.open(os_helper.TESTFN, "wb") as f: 3021 f.write(data) 3022 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: 3023 f._CHUNK_SIZE # Just test that it exists 3024 f._CHUNK_SIZE = 2 3025 f.readline() 3026 f.tell() 3027 3028 def test_seek_and_tell(self): 3029 #Test seek/tell using the StatefulIncrementalDecoder. 3030 # Make test faster by doing smaller seeks 3031 CHUNK_SIZE = 128 3032 3033 def test_seek_and_tell_with_data(data, min_pos=0): 3034 """Tell/seek to various points within a data stream and ensure 3035 that the decoded data returned by read() is consistent.""" 3036 f = self.open(os_helper.TESTFN, 'wb') 3037 f.write(data) 3038 f.close() 3039 f = self.open(os_helper.TESTFN, encoding='test_decoder') 3040 f._CHUNK_SIZE = CHUNK_SIZE 3041 decoded = f.read() 3042 f.close() 3043 3044 for i in range(min_pos, len(decoded) + 1): # seek positions 3045 for j in [1, 5, len(decoded) - i]: # read lengths 3046 f = self.open(os_helper.TESTFN, encoding='test_decoder') 3047 self.assertEqual(f.read(i), decoded[:i]) 3048 cookie = f.tell() 3049 self.assertEqual(f.read(j), decoded[i:i + j]) 3050 f.seek(cookie) 3051 self.assertEqual(f.read(), decoded[i:]) 3052 f.close() 3053 3054 # Enable the test decoder. 3055 StatefulIncrementalDecoder.codecEnabled = 1 3056 3057 # Run the tests. 3058 try: 3059 # Try each test case. 3060 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3061 test_seek_and_tell_with_data(input) 3062 3063 # Position each test case so that it crosses a chunk boundary. 3064 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3065 offset = CHUNK_SIZE - len(input)//2 3066 prefix = b'.'*offset 3067 # Don't bother seeking into the prefix (takes too long). 3068 min_pos = offset*2 3069 test_seek_and_tell_with_data(prefix + input, min_pos) 3070 3071 # Ensure our test decoder won't interfere with subsequent tests. 3072 finally: 3073 StatefulIncrementalDecoder.codecEnabled = 0 3074 3075 def test_multibyte_seek_and_tell(self): 3076 f = self.open(os_helper.TESTFN, "w", encoding="euc_jp") 3077 f.write("AB\n\u3046\u3048\n") 3078 f.close() 3079 3080 f = self.open(os_helper.TESTFN, "r", encoding="euc_jp") 3081 self.assertEqual(f.readline(), "AB\n") 3082 p0 = f.tell() 3083 self.assertEqual(f.readline(), "\u3046\u3048\n") 3084 p1 = f.tell() 3085 f.seek(p0) 3086 self.assertEqual(f.readline(), "\u3046\u3048\n") 3087 self.assertEqual(f.tell(), p1) 3088 f.close() 3089 3090 def test_seek_with_encoder_state(self): 3091 f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") 3092 f.write("\u00e6\u0300") 3093 p0 = f.tell() 3094 f.write("\u00e6") 3095 f.seek(p0) 3096 f.write("\u0300") 3097 f.close() 3098 3099 f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") 3100 self.assertEqual(f.readline(), "\u00e6\u0300\u0300") 3101 f.close() 3102 3103 def test_encoded_writes(self): 3104 data = "1234567890" 3105 tests = ("utf-16", 3106 "utf-16-le", 3107 "utf-16-be", 3108 "utf-32", 3109 "utf-32-le", 3110 "utf-32-be") 3111 for encoding in tests: 3112 buf = self.BytesIO() 3113 f = self.TextIOWrapper(buf, encoding=encoding) 3114 # Check if the BOM is written only once (see issue1753). 3115 f.write(data) 3116 f.write(data) 3117 f.seek(0) 3118 self.assertEqual(f.read(), data * 2) 3119 f.seek(0) 3120 self.assertEqual(f.read(), data * 2) 3121 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 3122 3123 def test_unreadable(self): 3124 class UnReadable(self.BytesIO): 3125 def readable(self): 3126 return False 3127 txt = self.TextIOWrapper(UnReadable(), encoding="utf-8") 3128 self.assertRaises(OSError, txt.read) 3129 3130 def test_read_one_by_one(self): 3131 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8") 3132 reads = "" 3133 while True: 3134 c = txt.read(1) 3135 if not c: 3136 break 3137 reads += c 3138 self.assertEqual(reads, "AA\nBB") 3139 3140 def test_readlines(self): 3141 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8") 3142 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 3143 txt.seek(0) 3144 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 3145 txt.seek(0) 3146 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 3147 3148 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 3149 def test_read_by_chunk(self): 3150 # make sure "\r\n" straddles 128 char boundary. 3151 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8") 3152 reads = "" 3153 while True: 3154 c = txt.read(128) 3155 if not c: 3156 break 3157 reads += c 3158 self.assertEqual(reads, "A"*127+"\nB") 3159 3160 def test_writelines(self): 3161 l = ['ab', 'cd', 'ef'] 3162 buf = self.BytesIO() 3163 txt = self.TextIOWrapper(buf, encoding="utf-8") 3164 txt.writelines(l) 3165 txt.flush() 3166 self.assertEqual(buf.getvalue(), b'abcdef') 3167 3168 def test_writelines_userlist(self): 3169 l = UserList(['ab', 'cd', 'ef']) 3170 buf = self.BytesIO() 3171 txt = self.TextIOWrapper(buf, encoding="utf-8") 3172 txt.writelines(l) 3173 txt.flush() 3174 self.assertEqual(buf.getvalue(), b'abcdef') 3175 3176 def test_writelines_error(self): 3177 txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8") 3178 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 3179 self.assertRaises(TypeError, txt.writelines, None) 3180 self.assertRaises(TypeError, txt.writelines, b'abc') 3181 3182 def test_issue1395_1(self): 3183 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3184 3185 # read one char at a time 3186 reads = "" 3187 while True: 3188 c = txt.read(1) 3189 if not c: 3190 break 3191 reads += c 3192 self.assertEqual(reads, self.normalized) 3193 3194 def test_issue1395_2(self): 3195 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3196 txt._CHUNK_SIZE = 4 3197 3198 reads = "" 3199 while True: 3200 c = txt.read(4) 3201 if not c: 3202 break 3203 reads += c 3204 self.assertEqual(reads, self.normalized) 3205 3206 def test_issue1395_3(self): 3207 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3208 txt._CHUNK_SIZE = 4 3209 3210 reads = txt.read(4) 3211 reads += txt.read(4) 3212 reads += txt.readline() 3213 reads += txt.readline() 3214 reads += txt.readline() 3215 self.assertEqual(reads, self.normalized) 3216 3217 def test_issue1395_4(self): 3218 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3219 txt._CHUNK_SIZE = 4 3220 3221 reads = txt.read(4) 3222 reads += txt.read() 3223 self.assertEqual(reads, self.normalized) 3224 3225 def test_issue1395_5(self): 3226 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3227 txt._CHUNK_SIZE = 4 3228 3229 reads = txt.read(4) 3230 pos = txt.tell() 3231 txt.seek(0) 3232 txt.seek(pos) 3233 self.assertEqual(txt.read(4), "BBB\n") 3234 3235 def test_issue2282(self): 3236 buffer = self.BytesIO(self.testdata) 3237 txt = self.TextIOWrapper(buffer, encoding="ascii") 3238 3239 self.assertEqual(buffer.seekable(), txt.seekable()) 3240 3241 def test_append_bom(self): 3242 # The BOM is not written again when appending to a non-empty file 3243 filename = os_helper.TESTFN 3244 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3245 with self.open(filename, 'w', encoding=charset) as f: 3246 f.write('aaa') 3247 pos = f.tell() 3248 with self.open(filename, 'rb') as f: 3249 self.assertEqual(f.read(), 'aaa'.encode(charset)) 3250 3251 with self.open(filename, 'a', encoding=charset) as f: 3252 f.write('xxx') 3253 with self.open(filename, 'rb') as f: 3254 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3255 3256 def test_seek_bom(self): 3257 # Same test, but when seeking manually 3258 filename = os_helper.TESTFN 3259 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3260 with self.open(filename, 'w', encoding=charset) as f: 3261 f.write('aaa') 3262 pos = f.tell() 3263 with self.open(filename, 'r+', encoding=charset) as f: 3264 f.seek(pos) 3265 f.write('zzz') 3266 f.seek(0) 3267 f.write('bbb') 3268 with self.open(filename, 'rb') as f: 3269 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 3270 3271 def test_seek_append_bom(self): 3272 # Same test, but first seek to the start and then to the end 3273 filename = os_helper.TESTFN 3274 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3275 with self.open(filename, 'w', encoding=charset) as f: 3276 f.write('aaa') 3277 with self.open(filename, 'a', encoding=charset) as f: 3278 f.seek(0) 3279 f.seek(0, self.SEEK_END) 3280 f.write('xxx') 3281 with self.open(filename, 'rb') as f: 3282 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3283 3284 def test_errors_property(self): 3285 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 3286 self.assertEqual(f.errors, "strict") 3287 with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f: 3288 self.assertEqual(f.errors, "replace") 3289 3290 @support.no_tracing 3291 @threading_helper.requires_working_threading() 3292 def test_threads_write(self): 3293 # Issue6750: concurrent writes could duplicate data 3294 event = threading.Event() 3295 with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f: 3296 def run(n): 3297 text = "Thread%03d\n" % n 3298 event.wait() 3299 f.write(text) 3300 threads = [threading.Thread(target=run, args=(x,)) 3301 for x in range(20)] 3302 with threading_helper.start_threads(threads, event.set): 3303 time.sleep(0.02) 3304 with self.open(os_helper.TESTFN, encoding="utf-8") as f: 3305 content = f.read() 3306 for n in range(20): 3307 self.assertEqual(content.count("Thread%03d\n" % n), 1) 3308 3309 def test_flush_error_on_close(self): 3310 # Test that text file is closed despite failed flush 3311 # and that flush() is called before file closed. 3312 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3313 closed = [] 3314 def bad_flush(): 3315 closed[:] = [txt.closed, txt.buffer.closed] 3316 raise OSError() 3317 txt.flush = bad_flush 3318 self.assertRaises(OSError, txt.close) # exception not swallowed 3319 self.assertTrue(txt.closed) 3320 self.assertTrue(txt.buffer.closed) 3321 self.assertTrue(closed) # flush() called 3322 self.assertFalse(closed[0]) # flush() called before file closed 3323 self.assertFalse(closed[1]) 3324 txt.flush = lambda: None # break reference loop 3325 3326 def test_close_error_on_close(self): 3327 buffer = self.BytesIO(self.testdata) 3328 def bad_flush(): 3329 raise OSError('flush') 3330 def bad_close(): 3331 raise OSError('close') 3332 buffer.close = bad_close 3333 txt = self.TextIOWrapper(buffer, encoding="ascii") 3334 txt.flush = bad_flush 3335 with self.assertRaises(OSError) as err: # exception not swallowed 3336 txt.close() 3337 self.assertEqual(err.exception.args, ('close',)) 3338 self.assertIsInstance(err.exception.__context__, OSError) 3339 self.assertEqual(err.exception.__context__.args, ('flush',)) 3340 self.assertFalse(txt.closed) 3341 3342 # Silence destructor error 3343 buffer.close = lambda: None 3344 txt.flush = lambda: None 3345 3346 def test_nonnormalized_close_error_on_close(self): 3347 # Issue #21677 3348 buffer = self.BytesIO(self.testdata) 3349 def bad_flush(): 3350 raise non_existing_flush 3351 def bad_close(): 3352 raise non_existing_close 3353 buffer.close = bad_close 3354 txt = self.TextIOWrapper(buffer, encoding="ascii") 3355 txt.flush = bad_flush 3356 with self.assertRaises(NameError) as err: # exception not swallowed 3357 txt.close() 3358 self.assertIn('non_existing_close', str(err.exception)) 3359 self.assertIsInstance(err.exception.__context__, NameError) 3360 self.assertIn('non_existing_flush', str(err.exception.__context__)) 3361 self.assertFalse(txt.closed) 3362 3363 # Silence destructor error 3364 buffer.close = lambda: None 3365 txt.flush = lambda: None 3366 3367 def test_multi_close(self): 3368 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3369 txt.close() 3370 txt.close() 3371 txt.close() 3372 self.assertRaises(ValueError, txt.flush) 3373 3374 def test_unseekable(self): 3375 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8") 3376 self.assertRaises(self.UnsupportedOperation, txt.tell) 3377 self.assertRaises(self.UnsupportedOperation, txt.seek, 0) 3378 3379 def test_readonly_attributes(self): 3380 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3381 buf = self.BytesIO(self.testdata) 3382 with self.assertRaises(AttributeError): 3383 txt.buffer = buf 3384 3385 def test_rawio(self): 3386 # Issue #12591: TextIOWrapper must work with raw I/O objects, so 3387 # that subprocess.Popen() can have the required unbuffered 3388 # semantics with universal_newlines=True. 3389 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3390 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3391 # Reads 3392 self.assertEqual(txt.read(4), 'abcd') 3393 self.assertEqual(txt.readline(), 'efghi\n') 3394 self.assertEqual(list(txt), ['jkl\n', 'opq\n']) 3395 3396 def test_rawio_write_through(self): 3397 # Issue #12591: with write_through=True, writes don't need a flush 3398 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3399 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', 3400 write_through=True) 3401 txt.write('1') 3402 txt.write('23\n4') 3403 txt.write('5') 3404 self.assertEqual(b''.join(raw._write_stack), b'123\n45') 3405 3406 def test_bufio_write_through(self): 3407 # Issue #21396: write_through=True doesn't force a flush() 3408 # on the underlying binary buffered object. 3409 flush_called, write_called = [], [] 3410 class BufferedWriter(self.BufferedWriter): 3411 def flush(self, *args, **kwargs): 3412 flush_called.append(True) 3413 return super().flush(*args, **kwargs) 3414 def write(self, *args, **kwargs): 3415 write_called.append(True) 3416 return super().write(*args, **kwargs) 3417 3418 rawio = self.BytesIO() 3419 data = b"a" 3420 bufio = BufferedWriter(rawio, len(data)*2) 3421 textio = self.TextIOWrapper(bufio, encoding='ascii', 3422 write_through=True) 3423 # write to the buffered io but don't overflow the buffer 3424 text = data.decode('ascii') 3425 textio.write(text) 3426 3427 # buffer.flush is not called with write_through=True 3428 self.assertFalse(flush_called) 3429 # buffer.write *is* called with write_through=True 3430 self.assertTrue(write_called) 3431 self.assertEqual(rawio.getvalue(), b"") # no flush 3432 3433 write_called = [] # reset 3434 textio.write(text * 10) # total content is larger than bufio buffer 3435 self.assertTrue(write_called) 3436 self.assertEqual(rawio.getvalue(), data * 11) # all flushed 3437 3438 def test_reconfigure_write_through(self): 3439 raw = self.MockRawIO([]) 3440 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3441 t.write('1') 3442 t.reconfigure(write_through=True) # implied flush 3443 self.assertEqual(t.write_through, True) 3444 self.assertEqual(b''.join(raw._write_stack), b'1') 3445 t.write('23') 3446 self.assertEqual(b''.join(raw._write_stack), b'123') 3447 t.reconfigure(write_through=False) 3448 self.assertEqual(t.write_through, False) 3449 t.write('45') 3450 t.flush() 3451 self.assertEqual(b''.join(raw._write_stack), b'12345') 3452 # Keeping default value 3453 t.reconfigure() 3454 t.reconfigure(write_through=None) 3455 self.assertEqual(t.write_through, False) 3456 t.reconfigure(write_through=True) 3457 t.reconfigure() 3458 t.reconfigure(write_through=None) 3459 self.assertEqual(t.write_through, True) 3460 3461 def test_read_nonbytes(self): 3462 # Issue #17106 3463 # Crash when underlying read() returns non-bytes 3464 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") 3465 self.assertRaises(TypeError, t.read, 1) 3466 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") 3467 self.assertRaises(TypeError, t.readline) 3468 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") 3469 self.assertRaises(TypeError, t.read) 3470 3471 def test_illegal_encoder(self): 3472 # Issue 31271: Calling write() while the return value of encoder's 3473 # encode() is invalid shouldn't cause an assertion failure. 3474 rot13 = codecs.lookup("rot13") 3475 with support.swap_attr(rot13, '_is_text_encoding', True): 3476 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13") 3477 self.assertRaises(TypeError, t.write, 'bar') 3478 3479 def test_illegal_decoder(self): 3480 # Issue #17106 3481 # Bypass the early encoding check added in issue 20404 3482 def _make_illegal_wrapper(): 3483 quopri = codecs.lookup("quopri") 3484 quopri._is_text_encoding = True 3485 try: 3486 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 3487 newline='\n', encoding="quopri") 3488 finally: 3489 quopri._is_text_encoding = False 3490 return t 3491 # Crash when decoder returns non-string 3492 t = _make_illegal_wrapper() 3493 self.assertRaises(TypeError, t.read, 1) 3494 t = _make_illegal_wrapper() 3495 self.assertRaises(TypeError, t.readline) 3496 t = _make_illegal_wrapper() 3497 self.assertRaises(TypeError, t.read) 3498 3499 # Issue 31243: calling read() while the return value of decoder's 3500 # getstate() is invalid should neither crash the interpreter nor 3501 # raise a SystemError. 3502 def _make_very_illegal_wrapper(getstate_ret_val): 3503 class BadDecoder: 3504 def getstate(self): 3505 return getstate_ret_val 3506 def _get_bad_decoder(dummy): 3507 return BadDecoder() 3508 quopri = codecs.lookup("quopri") 3509 with support.swap_attr(quopri, 'incrementaldecoder', 3510 _get_bad_decoder): 3511 return _make_illegal_wrapper() 3512 t = _make_very_illegal_wrapper(42) 3513 self.assertRaises(TypeError, t.read, 42) 3514 t = _make_very_illegal_wrapper(()) 3515 self.assertRaises(TypeError, t.read, 42) 3516 t = _make_very_illegal_wrapper((1, 2)) 3517 self.assertRaises(TypeError, t.read, 42) 3518 3519 def _check_create_at_shutdown(self, **kwargs): 3520 # Issue #20037: creating a TextIOWrapper at shutdown 3521 # shouldn't crash the interpreter. 3522 iomod = self.io.__name__ 3523 code = """if 1: 3524 import codecs 3525 import {iomod} as io 3526 3527 # Avoid looking up codecs at shutdown 3528 codecs.lookup('utf-8') 3529 3530 class C: 3531 def __init__(self): 3532 self.buf = io.BytesIO() 3533 def __del__(self): 3534 io.TextIOWrapper(self.buf, **{kwargs}) 3535 print("ok") 3536 c = C() 3537 """.format(iomod=iomod, kwargs=kwargs) 3538 return assert_python_ok("-c", code) 3539 3540 def test_create_at_shutdown_without_encoding(self): 3541 rc, out, err = self._check_create_at_shutdown() 3542 if err: 3543 # Can error out with a RuntimeError if the module state 3544 # isn't found. 3545 self.assertIn(self.shutdown_error, err.decode()) 3546 else: 3547 self.assertEqual("ok", out.decode().strip()) 3548 3549 def test_create_at_shutdown_with_encoding(self): 3550 rc, out, err = self._check_create_at_shutdown(encoding='utf-8', 3551 errors='strict') 3552 self.assertFalse(err) 3553 self.assertEqual("ok", out.decode().strip()) 3554 3555 def test_read_byteslike(self): 3556 r = MemviewBytesIO(b'Just some random string\n') 3557 t = self.TextIOWrapper(r, 'utf-8') 3558 3559 # TextIOwrapper will not read the full string, because 3560 # we truncate it to a multiple of the native int size 3561 # so that we can construct a more complex memoryview. 3562 bytes_val = _to_memoryview(r.getvalue()).tobytes() 3563 3564 self.assertEqual(t.read(200), bytes_val.decode('utf-8')) 3565 3566 def test_issue22849(self): 3567 class F(object): 3568 def readable(self): return True 3569 def writable(self): return True 3570 def seekable(self): return True 3571 3572 for i in range(10): 3573 try: 3574 self.TextIOWrapper(F(), encoding='utf-8') 3575 except Exception: 3576 pass 3577 3578 F.tell = lambda x: 0 3579 t = self.TextIOWrapper(F(), encoding='utf-8') 3580 3581 def test_reconfigure_locale(self): 3582 wrapper = io.TextIOWrapper(io.BytesIO(b"test")) 3583 wrapper.reconfigure(encoding="locale") 3584 3585 def test_reconfigure_encoding_read(self): 3586 # latin1 -> utf8 3587 # (latin1 can decode utf-8 encoded string) 3588 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') 3589 raw = self.BytesIO(data) 3590 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3591 self.assertEqual(txt.readline(), 'abc\xe9\n') 3592 with self.assertRaises(self.UnsupportedOperation): 3593 txt.reconfigure(encoding='utf-8') 3594 with self.assertRaises(self.UnsupportedOperation): 3595 txt.reconfigure(newline=None) 3596 3597 def test_reconfigure_write_fromascii(self): 3598 # ascii has a specific encodefunc in the C implementation, 3599 # but utf-8-sig has not. Make sure that we get rid of the 3600 # cached encodefunc when we switch encoders. 3601 raw = self.BytesIO() 3602 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3603 txt.write('foo\n') 3604 txt.reconfigure(encoding='utf-8-sig') 3605 txt.write('\xe9\n') 3606 txt.flush() 3607 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') 3608 3609 def test_reconfigure_write(self): 3610 # latin -> utf8 3611 raw = self.BytesIO() 3612 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3613 txt.write('abc\xe9\n') 3614 txt.reconfigure(encoding='utf-8') 3615 self.assertEqual(raw.getvalue(), b'abc\xe9\n') 3616 txt.write('d\xe9f\n') 3617 txt.flush() 3618 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') 3619 3620 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of 3621 # the file 3622 raw = self.BytesIO() 3623 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3624 txt.write('abc\n') 3625 txt.reconfigure(encoding='utf-8-sig') 3626 txt.write('d\xe9f\n') 3627 txt.flush() 3628 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') 3629 3630 def test_reconfigure_write_non_seekable(self): 3631 raw = self.BytesIO() 3632 raw.seekable = lambda: False 3633 raw.seek = None 3634 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3635 txt.write('abc\n') 3636 txt.reconfigure(encoding='utf-8-sig') 3637 txt.write('d\xe9f\n') 3638 txt.flush() 3639 3640 # If the raw stream is not seekable, there'll be a BOM 3641 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') 3642 3643 def test_reconfigure_defaults(self): 3644 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') 3645 txt.reconfigure(encoding=None) 3646 self.assertEqual(txt.encoding, 'ascii') 3647 self.assertEqual(txt.errors, 'replace') 3648 txt.write('LF\n') 3649 3650 txt.reconfigure(newline='\r\n') 3651 self.assertEqual(txt.encoding, 'ascii') 3652 self.assertEqual(txt.errors, 'replace') 3653 3654 txt.reconfigure(errors='ignore') 3655 self.assertEqual(txt.encoding, 'ascii') 3656 self.assertEqual(txt.errors, 'ignore') 3657 txt.write('CRLF\n') 3658 3659 txt.reconfigure(encoding='utf-8', newline=None) 3660 self.assertEqual(txt.errors, 'strict') 3661 txt.seek(0) 3662 self.assertEqual(txt.read(), 'LF\nCRLF\n') 3663 3664 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') 3665 3666 def test_reconfigure_newline(self): 3667 raw = self.BytesIO(b'CR\rEOF') 3668 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3669 txt.reconfigure(newline=None) 3670 self.assertEqual(txt.readline(), 'CR\n') 3671 raw = self.BytesIO(b'CR\rEOF') 3672 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3673 txt.reconfigure(newline='') 3674 self.assertEqual(txt.readline(), 'CR\r') 3675 raw = self.BytesIO(b'CR\rLF\nEOF') 3676 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3677 txt.reconfigure(newline='\n') 3678 self.assertEqual(txt.readline(), 'CR\rLF\n') 3679 raw = self.BytesIO(b'LF\nCR\rEOF') 3680 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3681 txt.reconfigure(newline='\r') 3682 self.assertEqual(txt.readline(), 'LF\nCR\r') 3683 raw = self.BytesIO(b'CR\rCRLF\r\nEOF') 3684 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3685 txt.reconfigure(newline='\r\n') 3686 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') 3687 3688 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') 3689 txt.reconfigure(newline=None) 3690 txt.write('linesep\n') 3691 txt.reconfigure(newline='') 3692 txt.write('LF\n') 3693 txt.reconfigure(newline='\n') 3694 txt.write('LF\n') 3695 txt.reconfigure(newline='\r') 3696 txt.write('CR\n') 3697 txt.reconfigure(newline='\r\n') 3698 txt.write('CRLF\n') 3699 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' 3700 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) 3701 3702 def test_issue25862(self): 3703 # Assertion failures occurred in tell() after read() and write(). 3704 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3705 t.read(1) 3706 t.read() 3707 t.tell() 3708 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3709 t.read(1) 3710 t.write('x') 3711 t.tell() 3712 3713 3714class MemviewBytesIO(io.BytesIO): 3715 '''A BytesIO object whose read method returns memoryviews 3716 rather than bytes''' 3717 3718 def read1(self, len_): 3719 return _to_memoryview(super().read1(len_)) 3720 3721 def read(self, len_): 3722 return _to_memoryview(super().read(len_)) 3723 3724def _to_memoryview(buf): 3725 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3726 3727 arr = array.array('i') 3728 idx = len(buf) - len(buf) % arr.itemsize 3729 arr.frombytes(buf[:idx]) 3730 return memoryview(arr) 3731 3732 3733class CTextIOWrapperTest(TextIOWrapperTest): 3734 io = io 3735 shutdown_error = "LookupError: unknown encoding: ascii" 3736 3737 def test_initialization(self): 3738 r = self.BytesIO(b"\xc3\xa9\n\n") 3739 b = self.BufferedReader(r, 1000) 3740 t = self.TextIOWrapper(b, encoding="utf-8") 3741 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') 3742 self.assertRaises(ValueError, t.read) 3743 3744 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3745 self.assertRaises(Exception, repr, t) 3746 3747 def test_garbage_collection(self): 3748 # C TextIOWrapper objects are collected, and collecting them flushes 3749 # all data to disk. 3750 # The Python version has __del__, so it ends in gc.garbage instead. 3751 with warnings_helper.check_warnings(('', ResourceWarning)): 3752 rawio = io.FileIO(os_helper.TESTFN, "wb") 3753 b = self.BufferedWriter(rawio) 3754 t = self.TextIOWrapper(b, encoding="ascii") 3755 t.write("456def") 3756 t.x = t 3757 wr = weakref.ref(t) 3758 del t 3759 support.gc_collect() 3760 self.assertIsNone(wr(), wr) 3761 with self.open(os_helper.TESTFN, "rb") as f: 3762 self.assertEqual(f.read(), b"456def") 3763 3764 def test_rwpair_cleared_before_textio(self): 3765 # Issue 13070: TextIOWrapper's finalization would crash when called 3766 # after the reference to the underlying BufferedRWPair's writer got 3767 # cleared by the GC. 3768 for i in range(1000): 3769 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3770 t1 = self.TextIOWrapper(b1, encoding="ascii") 3771 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3772 t2 = self.TextIOWrapper(b2, encoding="ascii") 3773 # circular references 3774 t1.buddy = t2 3775 t2.buddy = t1 3776 support.gc_collect() 3777 3778 def test_del__CHUNK_SIZE_SystemError(self): 3779 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') 3780 with self.assertRaises(AttributeError): 3781 del t._CHUNK_SIZE 3782 3783 def test_internal_buffer_size(self): 3784 # bpo-43260: TextIOWrapper's internal buffer should not store 3785 # data larger than chunk size. 3786 chunk_size = 8192 # default chunk size, updated later 3787 3788 class MockIO(self.MockRawIO): 3789 def write(self, data): 3790 if len(data) > chunk_size: 3791 raise RuntimeError 3792 return super().write(data) 3793 3794 buf = MockIO() 3795 t = self.TextIOWrapper(buf, encoding="ascii") 3796 chunk_size = t._CHUNK_SIZE 3797 t.write("abc") 3798 t.write("def") 3799 # default chunk size is 8192 bytes so t don't write data to buf. 3800 self.assertEqual([], buf._write_stack) 3801 3802 with self.assertRaises(RuntimeError): 3803 t.write("x"*(chunk_size+1)) 3804 3805 self.assertEqual([b"abcdef"], buf._write_stack) 3806 t.write("ghi") 3807 t.write("x"*chunk_size) 3808 self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack) 3809 3810 3811class PyTextIOWrapperTest(TextIOWrapperTest): 3812 io = pyio 3813 shutdown_error = "LookupError: unknown encoding: ascii" 3814 3815 3816class IncrementalNewlineDecoderTest(unittest.TestCase): 3817 3818 def check_newline_decoding_utf8(self, decoder): 3819 # UTF-8 specific tests for a newline decoder 3820 def _check_decode(b, s, **kwargs): 3821 # We exercise getstate() / setstate() as well as decode() 3822 state = decoder.getstate() 3823 self.assertEqual(decoder.decode(b, **kwargs), s) 3824 decoder.setstate(state) 3825 self.assertEqual(decoder.decode(b, **kwargs), s) 3826 3827 _check_decode(b'\xe8\xa2\x88', "\u8888") 3828 3829 _check_decode(b'\xe8', "") 3830 _check_decode(b'\xa2', "") 3831 _check_decode(b'\x88', "\u8888") 3832 3833 _check_decode(b'\xe8', "") 3834 _check_decode(b'\xa2', "") 3835 _check_decode(b'\x88', "\u8888") 3836 3837 _check_decode(b'\xe8', "") 3838 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 3839 3840 decoder.reset() 3841 _check_decode(b'\n', "\n") 3842 _check_decode(b'\r', "") 3843 _check_decode(b'', "\n", final=True) 3844 _check_decode(b'\r', "\n", final=True) 3845 3846 _check_decode(b'\r', "") 3847 _check_decode(b'a', "\na") 3848 3849 _check_decode(b'\r\r\n', "\n\n") 3850 _check_decode(b'\r', "") 3851 _check_decode(b'\r', "\n") 3852 _check_decode(b'\na', "\na") 3853 3854 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 3855 _check_decode(b'\xe8\xa2\x88', "\u8888") 3856 _check_decode(b'\n', "\n") 3857 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 3858 _check_decode(b'\n', "\n") 3859 3860 def check_newline_decoding(self, decoder, encoding): 3861 result = [] 3862 if encoding is not None: 3863 encoder = codecs.getincrementalencoder(encoding)() 3864 def _decode_bytewise(s): 3865 # Decode one byte at a time 3866 for b in encoder.encode(s): 3867 result.append(decoder.decode(bytes([b]))) 3868 else: 3869 encoder = None 3870 def _decode_bytewise(s): 3871 # Decode one char at a time 3872 for c in s: 3873 result.append(decoder.decode(c)) 3874 self.assertEqual(decoder.newlines, None) 3875 _decode_bytewise("abc\n\r") 3876 self.assertEqual(decoder.newlines, '\n') 3877 _decode_bytewise("\nabc") 3878 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3879 _decode_bytewise("abc\r") 3880 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3881 _decode_bytewise("abc") 3882 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 3883 _decode_bytewise("abc\r") 3884 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 3885 decoder.reset() 3886 input = "abc" 3887 if encoder is not None: 3888 encoder.reset() 3889 input = encoder.encode(input) 3890 self.assertEqual(decoder.decode(input), "abc") 3891 self.assertEqual(decoder.newlines, None) 3892 3893 def test_newline_decoder(self): 3894 encodings = ( 3895 # None meaning the IncrementalNewlineDecoder takes unicode input 3896 # rather than bytes input 3897 None, 'utf-8', 'latin-1', 3898 'utf-16', 'utf-16-le', 'utf-16-be', 3899 'utf-32', 'utf-32-le', 'utf-32-be', 3900 ) 3901 for enc in encodings: 3902 decoder = enc and codecs.getincrementaldecoder(enc)() 3903 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3904 self.check_newline_decoding(decoder, enc) 3905 decoder = codecs.getincrementaldecoder("utf-8")() 3906 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3907 self.check_newline_decoding_utf8(decoder) 3908 self.assertRaises(TypeError, decoder.setstate, 42) 3909 3910 def test_newline_bytes(self): 3911 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 3912 def _check(dec): 3913 self.assertEqual(dec.newlines, None) 3914 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 3915 self.assertEqual(dec.newlines, None) 3916 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3917 self.assertEqual(dec.newlines, None) 3918 dec = self.IncrementalNewlineDecoder(None, translate=False) 3919 _check(dec) 3920 dec = self.IncrementalNewlineDecoder(None, translate=True) 3921 _check(dec) 3922 3923 def test_translate(self): 3924 # issue 35062 3925 for translate in (-2, -1, 1, 2): 3926 decoder = codecs.getincrementaldecoder("utf-8")() 3927 decoder = self.IncrementalNewlineDecoder(decoder, translate) 3928 self.check_newline_decoding_utf8(decoder) 3929 decoder = codecs.getincrementaldecoder("utf-8")() 3930 decoder = self.IncrementalNewlineDecoder(decoder, translate=0) 3931 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") 3932 3933class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3934 @support.cpython_only 3935 def test_uninitialized(self): 3936 uninitialized = self.IncrementalNewlineDecoder.__new__( 3937 self.IncrementalNewlineDecoder) 3938 self.assertRaises(ValueError, uninitialized.decode, b'bar') 3939 self.assertRaises(ValueError, uninitialized.getstate) 3940 self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0)) 3941 self.assertRaises(ValueError, uninitialized.reset) 3942 3943 3944class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3945 pass 3946 3947 3948# XXX Tests for open() 3949 3950class MiscIOTest(unittest.TestCase): 3951 3952 def tearDown(self): 3953 os_helper.unlink(os_helper.TESTFN) 3954 3955 def test___all__(self): 3956 for name in self.io.__all__: 3957 obj = getattr(self.io, name, None) 3958 self.assertIsNotNone(obj, name) 3959 if name in ("open", "open_code"): 3960 continue 3961 elif "error" in name.lower() or name == "UnsupportedOperation": 3962 self.assertTrue(issubclass(obj, Exception), name) 3963 elif not name.startswith("SEEK_"): 3964 self.assertTrue(issubclass(obj, self.IOBase)) 3965 3966 def test_attributes(self): 3967 f = self.open(os_helper.TESTFN, "wb", buffering=0) 3968 self.assertEqual(f.mode, "wb") 3969 f.close() 3970 3971 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") 3972 self.assertEqual(f.mode, "w+") 3973 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 3974 self.assertEqual(f.buffer.raw.mode, "rb+") 3975 3976 g = self.open(f.fileno(), "wb", closefd=False) 3977 self.assertEqual(g.mode, "wb") 3978 self.assertEqual(g.raw.mode, "wb") 3979 self.assertEqual(g.name, f.fileno()) 3980 self.assertEqual(g.raw.name, f.fileno()) 3981 f.close() 3982 g.close() 3983 3984 def test_removed_u_mode(self): 3985 # bpo-37330: The "U" mode has been removed in Python 3.11 3986 for mode in ("U", "rU", "r+U"): 3987 with self.assertRaises(ValueError) as cm: 3988 self.open(os_helper.TESTFN, mode) 3989 self.assertIn('invalid mode', str(cm.exception)) 3990 3991 @unittest.skipIf( 3992 support.is_emscripten, "fstat() of a pipe fd is not supported" 3993 ) 3994 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 3995 def test_open_pipe_with_append(self): 3996 # bpo-27805: Ignore ESPIPE from lseek() in open(). 3997 r, w = os.pipe() 3998 self.addCleanup(os.close, r) 3999 f = self.open(w, 'a', encoding="utf-8") 4000 self.addCleanup(f.close) 4001 # Check that the file is marked non-seekable. On Windows, however, lseek 4002 # somehow succeeds on pipes. 4003 if sys.platform != 'win32': 4004 self.assertFalse(f.seekable()) 4005 4006 def test_io_after_close(self): 4007 for kwargs in [ 4008 {"mode": "w"}, 4009 {"mode": "wb"}, 4010 {"mode": "w", "buffering": 1}, 4011 {"mode": "w", "buffering": 2}, 4012 {"mode": "wb", "buffering": 0}, 4013 {"mode": "r"}, 4014 {"mode": "rb"}, 4015 {"mode": "r", "buffering": 1}, 4016 {"mode": "r", "buffering": 2}, 4017 {"mode": "rb", "buffering": 0}, 4018 {"mode": "w+"}, 4019 {"mode": "w+b"}, 4020 {"mode": "w+", "buffering": 1}, 4021 {"mode": "w+", "buffering": 2}, 4022 {"mode": "w+b", "buffering": 0}, 4023 ]: 4024 if "b" not in kwargs["mode"]: 4025 kwargs["encoding"] = "utf-8" 4026 f = self.open(os_helper.TESTFN, **kwargs) 4027 f.close() 4028 self.assertRaises(ValueError, f.flush) 4029 self.assertRaises(ValueError, f.fileno) 4030 self.assertRaises(ValueError, f.isatty) 4031 self.assertRaises(ValueError, f.__iter__) 4032 if hasattr(f, "peek"): 4033 self.assertRaises(ValueError, f.peek, 1) 4034 self.assertRaises(ValueError, f.read) 4035 if hasattr(f, "read1"): 4036 self.assertRaises(ValueError, f.read1, 1024) 4037 self.assertRaises(ValueError, f.read1) 4038 if hasattr(f, "readall"): 4039 self.assertRaises(ValueError, f.readall) 4040 if hasattr(f, "readinto"): 4041 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 4042 if hasattr(f, "readinto1"): 4043 self.assertRaises(ValueError, f.readinto1, bytearray(1024)) 4044 self.assertRaises(ValueError, f.readline) 4045 self.assertRaises(ValueError, f.readlines) 4046 self.assertRaises(ValueError, f.readlines, 1) 4047 self.assertRaises(ValueError, f.seek, 0) 4048 self.assertRaises(ValueError, f.tell) 4049 self.assertRaises(ValueError, f.truncate) 4050 self.assertRaises(ValueError, f.write, 4051 b"" if "b" in kwargs['mode'] else "") 4052 self.assertRaises(ValueError, f.writelines, []) 4053 self.assertRaises(ValueError, next, f) 4054 4055 def test_blockingioerror(self): 4056 # Various BlockingIOError issues 4057 class C(str): 4058 pass 4059 c = C("") 4060 b = self.BlockingIOError(1, c) 4061 c.b = b 4062 b.c = c 4063 wr = weakref.ref(c) 4064 del c, b 4065 support.gc_collect() 4066 self.assertIsNone(wr(), wr) 4067 4068 def test_abcs(self): 4069 # Test the visible base classes are ABCs. 4070 self.assertIsInstance(self.IOBase, abc.ABCMeta) 4071 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 4072 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 4073 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 4074 4075 def _check_abc_inheritance(self, abcmodule): 4076 with self.open(os_helper.TESTFN, "wb", buffering=0) as f: 4077 self.assertIsInstance(f, abcmodule.IOBase) 4078 self.assertIsInstance(f, abcmodule.RawIOBase) 4079 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4080 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4081 with self.open(os_helper.TESTFN, "wb") as f: 4082 self.assertIsInstance(f, abcmodule.IOBase) 4083 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4084 self.assertIsInstance(f, abcmodule.BufferedIOBase) 4085 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4086 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: 4087 self.assertIsInstance(f, abcmodule.IOBase) 4088 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4089 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4090 self.assertIsInstance(f, abcmodule.TextIOBase) 4091 4092 def test_abc_inheritance(self): 4093 # Test implementations inherit from their respective ABCs 4094 self._check_abc_inheritance(self) 4095 4096 def test_abc_inheritance_official(self): 4097 # Test implementations inherit from the official ABCs of the 4098 # baseline "io" module. 4099 self._check_abc_inheritance(io) 4100 4101 def _check_warn_on_dealloc(self, *args, **kwargs): 4102 f = open(*args, **kwargs) 4103 r = repr(f) 4104 with self.assertWarns(ResourceWarning) as cm: 4105 f = None 4106 support.gc_collect() 4107 self.assertIn(r, str(cm.warning.args[0])) 4108 4109 def test_warn_on_dealloc(self): 4110 self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0) 4111 self._check_warn_on_dealloc(os_helper.TESTFN, "wb") 4112 self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8") 4113 4114 def _check_warn_on_dealloc_fd(self, *args, **kwargs): 4115 fds = [] 4116 def cleanup_fds(): 4117 for fd in fds: 4118 try: 4119 os.close(fd) 4120 except OSError as e: 4121 if e.errno != errno.EBADF: 4122 raise 4123 self.addCleanup(cleanup_fds) 4124 r, w = os.pipe() 4125 fds += r, w 4126 self._check_warn_on_dealloc(r, *args, **kwargs) 4127 # When using closefd=False, there's no warning 4128 r, w = os.pipe() 4129 fds += r, w 4130 with warnings_helper.check_no_resource_warning(self): 4131 open(r, *args, closefd=False, **kwargs) 4132 4133 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 4134 def test_warn_on_dealloc_fd(self): 4135 self._check_warn_on_dealloc_fd("rb", buffering=0) 4136 self._check_warn_on_dealloc_fd("rb") 4137 self._check_warn_on_dealloc_fd("r", encoding="utf-8") 4138 4139 4140 def test_pickling(self): 4141 # Pickling file objects is forbidden 4142 for kwargs in [ 4143 {"mode": "w"}, 4144 {"mode": "wb"}, 4145 {"mode": "wb", "buffering": 0}, 4146 {"mode": "r"}, 4147 {"mode": "rb"}, 4148 {"mode": "rb", "buffering": 0}, 4149 {"mode": "w+"}, 4150 {"mode": "w+b"}, 4151 {"mode": "w+b", "buffering": 0}, 4152 ]: 4153 if "b" not in kwargs["mode"]: 4154 kwargs["encoding"] = "utf-8" 4155 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 4156 with self.open(os_helper.TESTFN, **kwargs) as f: 4157 self.assertRaises(TypeError, pickle.dumps, f, protocol) 4158 4159 @unittest.skipIf( 4160 support.is_emscripten, "fstat() of a pipe fd is not supported" 4161 ) 4162 def test_nonblock_pipe_write_bigbuf(self): 4163 self._test_nonblock_pipe_write(16*1024) 4164 4165 @unittest.skipIf( 4166 support.is_emscripten, "fstat() of a pipe fd is not supported" 4167 ) 4168 def test_nonblock_pipe_write_smallbuf(self): 4169 self._test_nonblock_pipe_write(1024) 4170 4171 @unittest.skipUnless(hasattr(os, 'set_blocking'), 4172 'os.set_blocking() required for this test') 4173 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 4174 def _test_nonblock_pipe_write(self, bufsize): 4175 sent = [] 4176 received = [] 4177 r, w = os.pipe() 4178 os.set_blocking(r, False) 4179 os.set_blocking(w, False) 4180 4181 # To exercise all code paths in the C implementation we need 4182 # to play with buffer sizes. For instance, if we choose a 4183 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 4184 # then we will never get a partial write of the buffer. 4185 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 4186 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 4187 4188 with rf, wf: 4189 for N in 9999, 73, 7574: 4190 try: 4191 i = 0 4192 while True: 4193 msg = bytes([i % 26 + 97]) * N 4194 sent.append(msg) 4195 wf.write(msg) 4196 i += 1 4197 4198 except self.BlockingIOError as e: 4199 self.assertEqual(e.args[0], errno.EAGAIN) 4200 self.assertEqual(e.args[2], e.characters_written) 4201 sent[-1] = sent[-1][:e.characters_written] 4202 received.append(rf.read()) 4203 msg = b'BLOCKED' 4204 wf.write(msg) 4205 sent.append(msg) 4206 4207 while True: 4208 try: 4209 wf.flush() 4210 break 4211 except self.BlockingIOError as e: 4212 self.assertEqual(e.args[0], errno.EAGAIN) 4213 self.assertEqual(e.args[2], e.characters_written) 4214 self.assertEqual(e.characters_written, 0) 4215 received.append(rf.read()) 4216 4217 received += iter(rf.read, None) 4218 4219 sent, received = b''.join(sent), b''.join(received) 4220 self.assertEqual(sent, received) 4221 self.assertTrue(wf.closed) 4222 self.assertTrue(rf.closed) 4223 4224 def test_create_fail(self): 4225 # 'x' mode fails if file is existing 4226 with self.open(os_helper.TESTFN, 'w', encoding="utf-8"): 4227 pass 4228 self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8") 4229 4230 def test_create_writes(self): 4231 # 'x' mode opens for writing 4232 with self.open(os_helper.TESTFN, 'xb') as f: 4233 f.write(b"spam") 4234 with self.open(os_helper.TESTFN, 'rb') as f: 4235 self.assertEqual(b"spam", f.read()) 4236 4237 def test_open_allargs(self): 4238 # there used to be a buffer overflow in the parser for rawmode 4239 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8") 4240 4241 def test_check_encoding_errors(self): 4242 # bpo-37388: open() and TextIOWrapper must check encoding and errors 4243 # arguments in dev mode 4244 mod = self.io.__name__ 4245 filename = __file__ 4246 invalid = 'Boom, Shaka Laka, Boom!' 4247 code = textwrap.dedent(f''' 4248 import sys 4249 from {mod} import open, TextIOWrapper 4250 4251 try: 4252 open({filename!r}, encoding={invalid!r}) 4253 except LookupError: 4254 pass 4255 else: 4256 sys.exit(21) 4257 4258 try: 4259 open({filename!r}, errors={invalid!r}) 4260 except LookupError: 4261 pass 4262 else: 4263 sys.exit(22) 4264 4265 fp = open({filename!r}, "rb") 4266 with fp: 4267 try: 4268 TextIOWrapper(fp, encoding={invalid!r}) 4269 except LookupError: 4270 pass 4271 else: 4272 sys.exit(23) 4273 4274 try: 4275 TextIOWrapper(fp, errors={invalid!r}) 4276 except LookupError: 4277 pass 4278 else: 4279 sys.exit(24) 4280 4281 sys.exit(10) 4282 ''') 4283 proc = assert_python_failure('-X', 'dev', '-c', code) 4284 self.assertEqual(proc.rc, 10, proc) 4285 4286 def test_check_encoding_warning(self): 4287 # PEP 597: Raise warning when encoding is not specified 4288 # and sys.flags.warn_default_encoding is set. 4289 mod = self.io.__name__ 4290 filename = __file__ 4291 code = textwrap.dedent(f'''\ 4292 import sys 4293 from {mod} import open, TextIOWrapper 4294 import pathlib 4295 4296 with open({filename!r}) as f: # line 5 4297 pass 4298 4299 pathlib.Path({filename!r}).read_text() # line 8 4300 ''') 4301 proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code) 4302 warnings = proc.err.splitlines() 4303 self.assertEqual(len(warnings), 2) 4304 self.assertTrue( 4305 warnings[0].startswith(b"<string>:5: EncodingWarning: ")) 4306 self.assertTrue( 4307 warnings[1].startswith(b"<string>:8: EncodingWarning: ")) 4308 4309 def test_text_encoding(self): 4310 # PEP 597, bpo-47000. io.text_encoding() returns "locale" or "utf-8" 4311 # based on sys.flags.utf8_mode 4312 code = "import io; print(io.text_encoding(None))" 4313 4314 proc = assert_python_ok('-X', 'utf8=0', '-c', code) 4315 self.assertEqual(b"locale", proc.out.strip()) 4316 4317 proc = assert_python_ok('-X', 'utf8=1', '-c', code) 4318 self.assertEqual(b"utf-8", proc.out.strip()) 4319 4320 @support.cpython_only 4321 # Depending if OpenWrapper was already created or not, the warning is 4322 # emitted or not. For example, the attribute is already created when this 4323 # test is run multiple times. 4324 @warnings_helper.ignore_warnings(category=DeprecationWarning) 4325 def test_openwrapper(self): 4326 self.assertIs(self.io.OpenWrapper, self.io.open) 4327 4328 4329class CMiscIOTest(MiscIOTest): 4330 io = io 4331 4332 def test_readinto_buffer_overflow(self): 4333 # Issue #18025 4334 class BadReader(self.io.BufferedIOBase): 4335 def read(self, n=-1): 4336 return b'x' * 10**6 4337 bufio = BadReader() 4338 b = bytearray(2) 4339 self.assertRaises(ValueError, bufio.readinto, b) 4340 4341 def check_daemon_threads_shutdown_deadlock(self, stream_name): 4342 # Issue #23309: deadlocks at shutdown should be avoided when a 4343 # daemon thread and the main thread both write to a file. 4344 code = """if 1: 4345 import sys 4346 import time 4347 import threading 4348 from test.support import SuppressCrashReport 4349 4350 file = sys.{stream_name} 4351 4352 def run(): 4353 while True: 4354 file.write('.') 4355 file.flush() 4356 4357 crash = SuppressCrashReport() 4358 crash.__enter__() 4359 # don't call __exit__(): the crash occurs at Python shutdown 4360 4361 thread = threading.Thread(target=run) 4362 thread.daemon = True 4363 thread.start() 4364 4365 time.sleep(0.5) 4366 file.write('!') 4367 file.flush() 4368 """.format_map(locals()) 4369 res, _ = run_python_until_end("-c", code) 4370 err = res.err.decode() 4371 if res.rc != 0: 4372 # Failure: should be a fatal error 4373 pattern = (r"Fatal Python error: _enter_buffered_busy: " 4374 r"could not acquire lock " 4375 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> " 4376 r"at interpreter shutdown, possibly due to " 4377 r"daemon threads".format_map(locals())) 4378 self.assertRegex(err, pattern) 4379 else: 4380 self.assertFalse(err.strip('.!')) 4381 4382 @threading_helper.requires_working_threading() 4383 def test_daemon_threads_shutdown_stdout_deadlock(self): 4384 self.check_daemon_threads_shutdown_deadlock('stdout') 4385 4386 @threading_helper.requires_working_threading() 4387 def test_daemon_threads_shutdown_stderr_deadlock(self): 4388 self.check_daemon_threads_shutdown_deadlock('stderr') 4389 4390 4391class PyMiscIOTest(MiscIOTest): 4392 io = pyio 4393 4394 4395@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 4396class SignalsTest(unittest.TestCase): 4397 4398 def setUp(self): 4399 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 4400 4401 def tearDown(self): 4402 signal.signal(signal.SIGALRM, self.oldalrm) 4403 4404 def alarm_interrupt(self, sig, frame): 4405 1/0 4406 4407 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 4408 """Check that a partial write, when it gets interrupted, properly 4409 invokes the signal handler, and bubbles up the exception raised 4410 in the latter.""" 4411 4412 # XXX This test has three flaws that appear when objects are 4413 # XXX not reference counted. 4414 4415 # - if wio.write() happens to trigger a garbage collection, 4416 # the signal exception may be raised when some __del__ 4417 # method is running; it will not reach the assertRaises() 4418 # call. 4419 4420 # - more subtle, if the wio object is not destroyed at once 4421 # and survives this function, the next opened file is likely 4422 # to have the same fileno (since the file descriptor was 4423 # actively closed). When wio.__del__ is finally called, it 4424 # will close the other's test file... To trigger this with 4425 # CPython, try adding "global wio" in this function. 4426 4427 # - This happens only for streams created by the _pyio module, 4428 # because a wio.close() that fails still consider that the 4429 # file needs to be closed again. You can try adding an 4430 # "assert wio.closed" at the end of the function. 4431 4432 # Fortunately, a little gc.collect() seems to be enough to 4433 # work around all these issues. 4434 support.gc_collect() # For PyPy or other GCs. 4435 4436 read_results = [] 4437 def _read(): 4438 s = os.read(r, 1) 4439 read_results.append(s) 4440 4441 t = threading.Thread(target=_read) 4442 t.daemon = True 4443 r, w = os.pipe() 4444 fdopen_kwargs["closefd"] = False 4445 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) 4446 try: 4447 wio = self.io.open(w, **fdopen_kwargs) 4448 if hasattr(signal, 'pthread_sigmask'): 4449 # create the thread with SIGALRM signal blocked 4450 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) 4451 t.start() 4452 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM]) 4453 else: 4454 t.start() 4455 4456 # Fill the pipe enough that the write will be blocking. 4457 # It will be interrupted by the timer armed above. Since the 4458 # other thread has read one byte, the low-level write will 4459 # return with a successful (partial) result rather than an EINTR. 4460 # The buffered IO layer must check for pending signal 4461 # handlers, which in this case will invoke alarm_interrupt(). 4462 signal.alarm(1) 4463 try: 4464 self.assertRaises(ZeroDivisionError, wio.write, large_data) 4465 finally: 4466 signal.alarm(0) 4467 t.join() 4468 # We got one byte, get another one and check that it isn't a 4469 # repeat of the first one. 4470 read_results.append(os.read(r, 1)) 4471 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 4472 finally: 4473 os.close(w) 4474 os.close(r) 4475 # This is deliberate. If we didn't close the file descriptor 4476 # before closing wio, wio would try to flush its internal 4477 # buffer, and block again. 4478 try: 4479 wio.close() 4480 except OSError as e: 4481 if e.errno != errno.EBADF: 4482 raise 4483 4484 @requires_alarm 4485 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 4486 def test_interrupted_write_unbuffered(self): 4487 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 4488 4489 @requires_alarm 4490 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 4491 def test_interrupted_write_buffered(self): 4492 self.check_interrupted_write(b"xy", b"xy", mode="wb") 4493 4494 @requires_alarm 4495 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 4496 def test_interrupted_write_text(self): 4497 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 4498 4499 @support.no_tracing 4500 def check_reentrant_write(self, data, **fdopen_kwargs): 4501 def on_alarm(*args): 4502 # Will be called reentrantly from the same thread 4503 wio.write(data) 4504 1/0 4505 signal.signal(signal.SIGALRM, on_alarm) 4506 r, w = os.pipe() 4507 wio = self.io.open(w, **fdopen_kwargs) 4508 try: 4509 signal.alarm(1) 4510 # Either the reentrant call to wio.write() fails with RuntimeError, 4511 # or the signal handler raises ZeroDivisionError. 4512 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 4513 while 1: 4514 for i in range(100): 4515 wio.write(data) 4516 wio.flush() 4517 # Make sure the buffer doesn't fill up and block further writes 4518 os.read(r, len(data) * 100) 4519 exc = cm.exception 4520 if isinstance(exc, RuntimeError): 4521 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 4522 finally: 4523 signal.alarm(0) 4524 wio.close() 4525 os.close(r) 4526 4527 @requires_alarm 4528 def test_reentrant_write_buffered(self): 4529 self.check_reentrant_write(b"xy", mode="wb") 4530 4531 @requires_alarm 4532 def test_reentrant_write_text(self): 4533 self.check_reentrant_write("xy", mode="w", encoding="ascii") 4534 4535 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 4536 """Check that a buffered read, when it gets interrupted (either 4537 returning a partial result or EINTR), properly invokes the signal 4538 handler and retries if the latter returned successfully.""" 4539 r, w = os.pipe() 4540 fdopen_kwargs["closefd"] = False 4541 def alarm_handler(sig, frame): 4542 os.write(w, b"bar") 4543 signal.signal(signal.SIGALRM, alarm_handler) 4544 try: 4545 rio = self.io.open(r, **fdopen_kwargs) 4546 os.write(w, b"foo") 4547 signal.alarm(1) 4548 # Expected behaviour: 4549 # - first raw read() returns partial b"foo" 4550 # - second raw read() returns EINTR 4551 # - third raw read() returns b"bar" 4552 self.assertEqual(decode(rio.read(6)), "foobar") 4553 finally: 4554 signal.alarm(0) 4555 rio.close() 4556 os.close(w) 4557 os.close(r) 4558 4559 @requires_alarm 4560 def test_interrupted_read_retry_buffered(self): 4561 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 4562 mode="rb") 4563 4564 @requires_alarm 4565 def test_interrupted_read_retry_text(self): 4566 self.check_interrupted_read_retry(lambda x: x, 4567 mode="r", encoding="latin1") 4568 4569 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 4570 """Check that a buffered write, when it gets interrupted (either 4571 returning a partial result or EINTR), properly invokes the signal 4572 handler and retries if the latter returned successfully.""" 4573 select = import_helper.import_module("select") 4574 4575 # A quantity that exceeds the buffer size of an anonymous pipe's 4576 # write end. 4577 N = support.PIPE_MAX_SIZE 4578 r, w = os.pipe() 4579 fdopen_kwargs["closefd"] = False 4580 4581 # We need a separate thread to read from the pipe and allow the 4582 # write() to finish. This thread is started after the SIGALRM is 4583 # received (forcing a first EINTR in write()). 4584 read_results = [] 4585 write_finished = False 4586 error = None 4587 def _read(): 4588 try: 4589 while not write_finished: 4590 while r in select.select([r], [], [], 1.0)[0]: 4591 s = os.read(r, 1024) 4592 read_results.append(s) 4593 except BaseException as exc: 4594 nonlocal error 4595 error = exc 4596 t = threading.Thread(target=_read) 4597 t.daemon = True 4598 def alarm1(sig, frame): 4599 signal.signal(signal.SIGALRM, alarm2) 4600 signal.alarm(1) 4601 def alarm2(sig, frame): 4602 t.start() 4603 4604 large_data = item * N 4605 signal.signal(signal.SIGALRM, alarm1) 4606 try: 4607 wio = self.io.open(w, **fdopen_kwargs) 4608 signal.alarm(1) 4609 # Expected behaviour: 4610 # - first raw write() is partial (because of the limited pipe buffer 4611 # and the first alarm) 4612 # - second raw write() returns EINTR (because of the second alarm) 4613 # - subsequent write()s are successful (either partial or complete) 4614 written = wio.write(large_data) 4615 self.assertEqual(N, written) 4616 4617 wio.flush() 4618 write_finished = True 4619 t.join() 4620 4621 self.assertIsNone(error) 4622 self.assertEqual(N, sum(len(x) for x in read_results)) 4623 finally: 4624 signal.alarm(0) 4625 write_finished = True 4626 os.close(w) 4627 os.close(r) 4628 # This is deliberate. If we didn't close the file descriptor 4629 # before closing wio, wio would try to flush its internal 4630 # buffer, and could block (in case of failure). 4631 try: 4632 wio.close() 4633 except OSError as e: 4634 if e.errno != errno.EBADF: 4635 raise 4636 4637 @requires_alarm 4638 def test_interrupted_write_retry_buffered(self): 4639 self.check_interrupted_write_retry(b"x", mode="wb") 4640 4641 @requires_alarm 4642 def test_interrupted_write_retry_text(self): 4643 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 4644 4645 4646class CSignalsTest(SignalsTest): 4647 io = io 4648 4649class PySignalsTest(SignalsTest): 4650 io = pyio 4651 4652 # Handling reentrancy issues would slow down _pyio even more, so the 4653 # tests are disabled. 4654 test_reentrant_write_buffered = None 4655 test_reentrant_write_text = None 4656 4657 4658def load_tests(loader, tests, pattern): 4659 tests = (CIOTest, PyIOTest, APIMismatchTest, 4660 CBufferedReaderTest, PyBufferedReaderTest, 4661 CBufferedWriterTest, PyBufferedWriterTest, 4662 CBufferedRWPairTest, PyBufferedRWPairTest, 4663 CBufferedRandomTest, PyBufferedRandomTest, 4664 StatefulIncrementalDecoderTest, 4665 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 4666 CTextIOWrapperTest, PyTextIOWrapperTest, 4667 CMiscIOTest, PyMiscIOTest, 4668 CSignalsTest, PySignalsTest, 4669 ) 4670 4671 # Put the namespaces of the IO module we are testing and some useful mock 4672 # classes in the __dict__ of each test. 4673 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 4674 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead, 4675 SlowFlushRawIO) 4676 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 4677 c_io_ns = {name : getattr(io, name) for name in all_members} 4678 py_io_ns = {name : getattr(pyio, name) for name in all_members} 4679 globs = globals() 4680 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 4681 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 4682 for test in tests: 4683 if test.__name__.startswith("C"): 4684 for name, obj in c_io_ns.items(): 4685 setattr(test, name, obj) 4686 elif test.__name__.startswith("Py"): 4687 for name, obj in py_io_ns.items(): 4688 setattr(test, name, obj) 4689 4690 suite = loader.suiteClass() 4691 for test in tests: 4692 suite.addTest(loader.loadTestsFromTestCase(test)) 4693 return suite 4694 4695if __name__ == "__main__": 4696 unittest.main() 4697