1import gc 2import sys 3import doctest 4import unittest 5import collections 6import weakref 7import operator 8import contextlib 9import copy 10import threading 11import time 12import random 13 14from test import support 15from test.support import script_helper, ALWAYS_EQ 16from test.support import gc_collect 17from test.support import threading_helper 18 19# Used in ReferencesTestCase.test_ref_created_during_del() . 20ref_from_del = None 21 22# Used by FinalizeTestCase as a global that may be replaced by None 23# when the interpreter shuts down. 24_global_var = 'foobar' 25 26class C: 27 def method(self): 28 pass 29 30 31class Callable: 32 bar = None 33 34 def __call__(self, x): 35 self.bar = x 36 37 38def create_function(): 39 def f(): pass 40 return f 41 42def create_bound_method(): 43 return C().method 44 45 46class Object: 47 def __init__(self, arg): 48 self.arg = arg 49 def __repr__(self): 50 return "<Object %r>" % self.arg 51 def __eq__(self, other): 52 if isinstance(other, Object): 53 return self.arg == other.arg 54 return NotImplemented 55 def __lt__(self, other): 56 if isinstance(other, Object): 57 return self.arg < other.arg 58 return NotImplemented 59 def __hash__(self): 60 return hash(self.arg) 61 def some_method(self): 62 return 4 63 def other_method(self): 64 return 5 65 66 67class RefCycle: 68 def __init__(self): 69 self.cycle = self 70 71 72class TestBase(unittest.TestCase): 73 74 def setUp(self): 75 self.cbcalled = 0 76 77 def callback(self, ref): 78 self.cbcalled += 1 79 80 81@contextlib.contextmanager 82def collect_in_thread(period=0.0001): 83 """ 84 Ensure GC collections happen in a different thread, at a high frequency. 
85 """ 86 please_stop = False 87 88 def collect(): 89 while not please_stop: 90 time.sleep(period) 91 gc.collect() 92 93 with support.disable_gc(): 94 t = threading.Thread(target=collect) 95 t.start() 96 try: 97 yield 98 finally: 99 please_stop = True 100 t.join() 101 102 103class ReferencesTestCase(TestBase): 104 105 def test_basic_ref(self): 106 self.check_basic_ref(C) 107 self.check_basic_ref(create_function) 108 self.check_basic_ref(create_bound_method) 109 110 # Just make sure the tp_repr handler doesn't raise an exception. 111 # Live reference: 112 o = C() 113 wr = weakref.ref(o) 114 repr(wr) 115 # Dead reference: 116 del o 117 repr(wr) 118 119 def test_repr_failure_gh99184(self): 120 class MyConfig(dict): 121 def __getattr__(self, x): 122 return self[x] 123 124 obj = MyConfig(offset=5) 125 obj_weakref = weakref.ref(obj) 126 127 self.assertIn('MyConfig', repr(obj_weakref)) 128 self.assertIn('MyConfig', str(obj_weakref)) 129 130 def test_basic_callback(self): 131 self.check_basic_callback(C) 132 self.check_basic_callback(create_function) 133 self.check_basic_callback(create_bound_method) 134 135 @support.cpython_only 136 def test_cfunction(self): 137 import _testcapi 138 create_cfunction = _testcapi.create_cfunction 139 f = create_cfunction() 140 wr = weakref.ref(f) 141 self.assertIs(wr(), f) 142 del f 143 self.assertIsNone(wr()) 144 self.check_basic_ref(create_cfunction) 145 self.check_basic_callback(create_cfunction) 146 147 def test_multiple_callbacks(self): 148 o = C() 149 ref1 = weakref.ref(o, self.callback) 150 ref2 = weakref.ref(o, self.callback) 151 del o 152 gc_collect() # For PyPy or other GCs. 
153 self.assertIsNone(ref1(), "expected reference to be invalidated") 154 self.assertIsNone(ref2(), "expected reference to be invalidated") 155 self.assertEqual(self.cbcalled, 2, 156 "callback not called the right number of times") 157 158 def test_multiple_selfref_callbacks(self): 159 # Make sure all references are invalidated before callbacks are called 160 # 161 # What's important here is that we're using the first 162 # reference in the callback invoked on the second reference 163 # (the most recently created ref is cleaned up first). This 164 # tests that all references to the object are invalidated 165 # before any of the callbacks are invoked, so that we only 166 # have one invocation of _weakref.c:cleanup_helper() active 167 # for a particular object at a time. 168 # 169 def callback(object, self=self): 170 self.ref() 171 c = C() 172 self.ref = weakref.ref(c, callback) 173 ref1 = weakref.ref(c, callback) 174 del c 175 176 def test_constructor_kwargs(self): 177 c = C() 178 self.assertRaises(TypeError, weakref.ref, c, callback=None) 179 180 def test_proxy_ref(self): 181 o = C() 182 o.bar = 1 183 ref1 = weakref.proxy(o, self.callback) 184 ref2 = weakref.proxy(o, self.callback) 185 del o 186 gc_collect() # For PyPy or other GCs. 187 188 def check(proxy): 189 proxy.bar 190 191 self.assertRaises(ReferenceError, check, ref1) 192 self.assertRaises(ReferenceError, check, ref2) 193 ref3 = weakref.proxy(C()) 194 gc_collect() # For PyPy or other GCs. 
195 self.assertRaises(ReferenceError, bool, ref3) 196 self.assertEqual(self.cbcalled, 2) 197 198 def check_basic_ref(self, factory): 199 o = factory() 200 ref = weakref.ref(o) 201 self.assertIsNotNone(ref(), 202 "weak reference to live object should be live") 203 o2 = ref() 204 self.assertIs(o, o2, 205 "<ref>() should return original object if live") 206 207 def check_basic_callback(self, factory): 208 self.cbcalled = 0 209 o = factory() 210 ref = weakref.ref(o, self.callback) 211 del o 212 gc_collect() # For PyPy or other GCs. 213 self.assertEqual(self.cbcalled, 1, 214 "callback did not properly set 'cbcalled'") 215 self.assertIsNone(ref(), 216 "ref2 should be dead after deleting object reference") 217 218 def test_ref_reuse(self): 219 o = C() 220 ref1 = weakref.ref(o) 221 # create a proxy to make sure that there's an intervening creation 222 # between these two; it should make no difference 223 proxy = weakref.proxy(o) 224 ref2 = weakref.ref(o) 225 self.assertIs(ref1, ref2, 226 "reference object w/out callback should be re-used") 227 228 o = C() 229 proxy = weakref.proxy(o) 230 ref1 = weakref.ref(o) 231 ref2 = weakref.ref(o) 232 self.assertIs(ref1, ref2, 233 "reference object w/out callback should be re-used") 234 self.assertEqual(weakref.getweakrefcount(o), 2, 235 "wrong weak ref count for object") 236 del proxy 237 gc_collect() # For PyPy or other GCs. 
238 self.assertEqual(weakref.getweakrefcount(o), 1, 239 "wrong weak ref count for object after deleting proxy") 240 241 def test_proxy_reuse(self): 242 o = C() 243 proxy1 = weakref.proxy(o) 244 ref = weakref.ref(o) 245 proxy2 = weakref.proxy(o) 246 self.assertIs(proxy1, proxy2, 247 "proxy object w/out callback should have been re-used") 248 249 def test_basic_proxy(self): 250 o = C() 251 self.check_proxy(o, weakref.proxy(o)) 252 253 L = collections.UserList() 254 p = weakref.proxy(L) 255 self.assertFalse(p, "proxy for empty UserList should be false") 256 p.append(12) 257 self.assertEqual(len(L), 1) 258 self.assertTrue(p, "proxy for non-empty UserList should be true") 259 p[:] = [2, 3] 260 self.assertEqual(len(L), 2) 261 self.assertEqual(len(p), 2) 262 self.assertIn(3, p, "proxy didn't support __contains__() properly") 263 p[1] = 5 264 self.assertEqual(L[1], 5) 265 self.assertEqual(p[1], 5) 266 L2 = collections.UserList(L) 267 p2 = weakref.proxy(L2) 268 self.assertEqual(p, p2) 269 ## self.assertEqual(repr(L2), repr(p2)) 270 L3 = collections.UserList(range(10)) 271 p3 = weakref.proxy(L3) 272 self.assertEqual(L3[:], p3[:]) 273 self.assertEqual(L3[5:], p3[5:]) 274 self.assertEqual(L3[:5], p3[:5]) 275 self.assertEqual(L3[2:5], p3[2:5]) 276 277 def test_proxy_unicode(self): 278 # See bug 5037 279 class C(object): 280 def __str__(self): 281 return "string" 282 def __bytes__(self): 283 return b"bytes" 284 instance = C() 285 self.assertIn("__bytes__", dir(weakref.proxy(instance))) 286 self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") 287 288 def test_proxy_index(self): 289 class C: 290 def __index__(self): 291 return 10 292 o = C() 293 p = weakref.proxy(o) 294 self.assertEqual(operator.index(p), 10) 295 296 def test_proxy_div(self): 297 class C: 298 def __floordiv__(self, other): 299 return 42 300 def __ifloordiv__(self, other): 301 return 21 302 o = C() 303 p = weakref.proxy(o) 304 self.assertEqual(p // 5, 42) 305 p //= 5 306 self.assertEqual(p, 21) 307 308 def 
test_proxy_matmul(self): 309 class C: 310 def __matmul__(self, other): 311 return 1729 312 def __rmatmul__(self, other): 313 return -163 314 def __imatmul__(self, other): 315 return 561 316 o = C() 317 p = weakref.proxy(o) 318 self.assertEqual(p @ 5, 1729) 319 self.assertEqual(5 @ p, -163) 320 p @= 5 321 self.assertEqual(p, 561) 322 323 # The PyWeakref_* C API is documented as allowing either NULL or 324 # None as the value for the callback, where either means "no 325 # callback". The "no callback" ref and proxy objects are supposed 326 # to be shared so long as they exist by all callers so long as 327 # they are active. In Python 2.3.3 and earlier, this guarantee 328 # was not honored, and was broken in different ways for 329 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 330 331 def test_shared_ref_without_callback(self): 332 self.check_shared_without_callback(weakref.ref) 333 334 def test_shared_proxy_without_callback(self): 335 self.check_shared_without_callback(weakref.proxy) 336 337 def check_shared_without_callback(self, makeref): 338 o = Object(1) 339 p1 = makeref(o, None) 340 p2 = makeref(o, None) 341 self.assertIs(p1, p2, "both callbacks were None in the C API") 342 del p1, p2 343 p1 = makeref(o) 344 p2 = makeref(o, None) 345 self.assertIs(p1, p2, "callbacks were NULL, None in the C API") 346 del p1, p2 347 p1 = makeref(o) 348 p2 = makeref(o) 349 self.assertIs(p1, p2, "both callbacks were NULL in the C API") 350 del p1, p2 351 p1 = makeref(o, None) 352 p2 = makeref(o) 353 self.assertIs(p1, p2, "callbacks were None, NULL in the C API") 354 355 def test_callable_proxy(self): 356 o = Callable() 357 ref1 = weakref.proxy(o) 358 359 self.check_proxy(o, ref1) 360 361 self.assertIs(type(ref1), weakref.CallableProxyType, 362 "proxy is not of callable type") 363 ref1('twinkies!') 364 self.assertEqual(o.bar, 'twinkies!', 365 "call through proxy not passed through to original") 366 ref1(x='Splat.') 367 self.assertEqual(o.bar, 'Splat.', 368 "call through 
proxy not passed through to original") 369 370 # expect due to too few args 371 self.assertRaises(TypeError, ref1) 372 373 # expect due to too many args 374 self.assertRaises(TypeError, ref1, 1, 2, 3) 375 376 def check_proxy(self, o, proxy): 377 o.foo = 1 378 self.assertEqual(proxy.foo, 1, 379 "proxy does not reflect attribute addition") 380 o.foo = 2 381 self.assertEqual(proxy.foo, 2, 382 "proxy does not reflect attribute modification") 383 del o.foo 384 self.assertFalse(hasattr(proxy, 'foo'), 385 "proxy does not reflect attribute removal") 386 387 proxy.foo = 1 388 self.assertEqual(o.foo, 1, 389 "object does not reflect attribute addition via proxy") 390 proxy.foo = 2 391 self.assertEqual(o.foo, 2, 392 "object does not reflect attribute modification via proxy") 393 del proxy.foo 394 self.assertFalse(hasattr(o, 'foo'), 395 "object does not reflect attribute removal via proxy") 396 397 def test_proxy_deletion(self): 398 # Test clearing of SF bug #762891 399 class Foo: 400 result = None 401 def __delitem__(self, accessor): 402 self.result = accessor 403 g = Foo() 404 f = weakref.proxy(g) 405 del f[0] 406 self.assertEqual(f.result, 0) 407 408 def test_proxy_bool(self): 409 # Test clearing of SF bug #1170766 410 class List(list): pass 411 lyst = List() 412 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 413 414 def test_proxy_iter(self): 415 # Test fails with a debug build of the interpreter 416 # (see bpo-38395). 
417 418 obj = None 419 420 class MyObj: 421 def __iter__(self): 422 nonlocal obj 423 del obj 424 return NotImplemented 425 426 obj = MyObj() 427 p = weakref.proxy(obj) 428 with self.assertRaises(TypeError): 429 # "blech" in p calls MyObj.__iter__ through the proxy, 430 # without keeping a reference to the real object, so it 431 # can be killed in the middle of the call 432 "blech" in p 433 434 def test_proxy_next(self): 435 arr = [4, 5, 6] 436 def iterator_func(): 437 yield from arr 438 it = iterator_func() 439 440 class IteratesWeakly: 441 def __iter__(self): 442 return weakref.proxy(it) 443 444 weak_it = IteratesWeakly() 445 446 # Calls proxy.__next__ 447 self.assertEqual(list(weak_it), [4, 5, 6]) 448 449 def test_proxy_bad_next(self): 450 # bpo-44720: PyIter_Next() shouldn't be called if the reference 451 # isn't an iterator. 452 453 not_an_iterator = lambda: 0 454 455 class A: 456 def __iter__(self): 457 return weakref.proxy(not_an_iterator) 458 a = A() 459 460 msg = "Weakref proxy referenced a non-iterator" 461 with self.assertRaisesRegex(TypeError, msg): 462 list(a) 463 464 def test_proxy_reversed(self): 465 class MyObj: 466 def __len__(self): 467 return 3 468 def __reversed__(self): 469 return iter('cba') 470 471 obj = MyObj() 472 self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba") 473 474 def test_proxy_hash(self): 475 class MyObj: 476 def __hash__(self): 477 return 42 478 479 obj = MyObj() 480 with self.assertRaises(TypeError): 481 hash(weakref.proxy(obj)) 482 483 class MyObj: 484 __hash__ = None 485 486 obj = MyObj() 487 with self.assertRaises(TypeError): 488 hash(weakref.proxy(obj)) 489 490 def test_getweakrefcount(self): 491 o = C() 492 ref1 = weakref.ref(o) 493 ref2 = weakref.ref(o, self.callback) 494 self.assertEqual(weakref.getweakrefcount(o), 2, 495 "got wrong number of weak reference objects") 496 497 proxy1 = weakref.proxy(o) 498 proxy2 = weakref.proxy(o, self.callback) 499 self.assertEqual(weakref.getweakrefcount(o), 4, 500 "got wrong 
number of weak reference objects") 501 502 del ref1, ref2, proxy1, proxy2 503 gc_collect() # For PyPy or other GCs. 504 self.assertEqual(weakref.getweakrefcount(o), 0, 505 "weak reference objects not unlinked from" 506 " referent when discarded.") 507 508 # assumes ints do not support weakrefs 509 self.assertEqual(weakref.getweakrefcount(1), 0, 510 "got wrong number of weak reference objects for int") 511 512 def test_getweakrefs(self): 513 o = C() 514 ref1 = weakref.ref(o, self.callback) 515 ref2 = weakref.ref(o, self.callback) 516 del ref1 517 gc_collect() # For PyPy or other GCs. 518 self.assertEqual(weakref.getweakrefs(o), [ref2], 519 "list of refs does not match") 520 521 o = C() 522 ref1 = weakref.ref(o, self.callback) 523 ref2 = weakref.ref(o, self.callback) 524 del ref2 525 gc_collect() # For PyPy or other GCs. 526 self.assertEqual(weakref.getweakrefs(o), [ref1], 527 "list of refs does not match") 528 529 del ref1 530 gc_collect() # For PyPy or other GCs. 531 self.assertEqual(weakref.getweakrefs(o), [], 532 "list of refs not cleared") 533 534 # assumes ints do not support weakrefs 535 self.assertEqual(weakref.getweakrefs(1), [], 536 "list of refs does not match for int") 537 538 def test_newstyle_number_ops(self): 539 class F(float): 540 pass 541 f = F(2.0) 542 p = weakref.proxy(f) 543 self.assertEqual(p + 1.0, 3.0) 544 self.assertEqual(1.0 + p, 3.0) # this used to SEGV 545 546 def test_callbacks_protected(self): 547 # Callbacks protected from already-set exceptions? 548 # Regression test for SF bug #478534. 
549 class BogusError(Exception): 550 pass 551 data = {} 552 def remove(k): 553 del data[k] 554 def encapsulate(): 555 f = lambda : () 556 data[weakref.ref(f, remove)] = None 557 raise BogusError 558 try: 559 encapsulate() 560 except BogusError: 561 pass 562 else: 563 self.fail("exception not properly restored") 564 try: 565 encapsulate() 566 except BogusError: 567 pass 568 else: 569 self.fail("exception not properly restored") 570 571 def test_sf_bug_840829(self): 572 # "weakref callbacks and gc corrupt memory" 573 # subtype_dealloc erroneously exposed a new-style instance 574 # already in the process of getting deallocated to gc, 575 # causing double-deallocation if the instance had a weakref 576 # callback that triggered gc. 577 # If the bug exists, there probably won't be an obvious symptom 578 # in a release build. In a debug build, a segfault will occur 579 # when the second attempt to remove the instance from the "list 580 # of all objects" occurs. 581 582 import gc 583 584 class C(object): 585 pass 586 587 c = C() 588 wr = weakref.ref(c, lambda ignore: gc.collect()) 589 del c 590 591 # There endeth the first part. It gets worse. 592 del wr 593 594 c1 = C() 595 c1.i = C() 596 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 597 598 c2 = C() 599 c2.c1 = c1 600 del c1 # still alive because c2 points to it 601 602 # Now when subtype_dealloc gets called on c2, it's not enough just 603 # that c2 is immune from gc while the weakref callbacks associated 604 # with c2 execute (there are none in this 2nd half of the test, btw). 605 # subtype_dealloc goes on to call the base classes' deallocs too, 606 # so any gc triggered by weakref callbacks associated with anything 607 # torn down by a base class dealloc can also trigger double 608 # deallocation of c2. 
609 del c2 610 611 def test_callback_in_cycle(self): 612 import gc 613 614 class J(object): 615 pass 616 617 class II(object): 618 def acallback(self, ignore): 619 self.J 620 621 I = II() 622 I.J = J 623 I.wr = weakref.ref(J, I.acallback) 624 625 # Now J and II are each in a self-cycle (as all new-style class 626 # objects are, since their __mro__ points back to them). I holds 627 # both a weak reference (I.wr) and a strong reference (I.J) to class 628 # J. I is also in a cycle (I.wr points to a weakref that references 629 # I.acallback). When we del these three, they all become trash, but 630 # the cycles prevent any of them from getting cleaned up immediately. 631 # Instead they have to wait for cyclic gc to deduce that they're 632 # trash. 633 # 634 # gc used to call tp_clear on all of them, and the order in which 635 # it does that is pretty accidental. The exact order in which we 636 # built up these things manages to provoke gc into running tp_clear 637 # in just the right order (I last). Calling tp_clear on II leaves 638 # behind an insane class object (its __mro__ becomes NULL). Calling 639 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 640 # just then because of the strong reference from I.J. Calling 641 # tp_clear on I starts to clear I's __dict__, and just happens to 642 # clear I.J first -- I.wr is still intact. That removes the last 643 # reference to J, which triggers the weakref callback. The callback 644 # tries to do "self.J", and instances of new-style classes look up 645 # attributes ("J") in the class dict first. The class (II) wants to 646 # search II.__mro__, but that's NULL. The result was a segfault in 647 # a release build, and an assert failure in a debug build. 648 del I, J, II 649 gc.collect() 650 651 def test_callback_reachable_one_way(self): 652 import gc 653 654 # This one broke the first patch that fixed the previous test. 
In this case, 655 # the objects reachable from the callback aren't also reachable 656 # from the object (c1) *triggering* the callback: you can get to 657 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 658 # got tp_clear'ed by the time the c2.cb callback got invoked. 659 660 class C: 661 def cb(self, ignore): 662 self.me 663 self.c1 664 self.wr 665 666 c1, c2 = C(), C() 667 668 c2.me = c2 669 c2.c1 = c1 670 c2.wr = weakref.ref(c1, c2.cb) 671 672 del c1, c2 673 gc.collect() 674 675 def test_callback_different_classes(self): 676 import gc 677 678 # Like test_callback_reachable_one_way, except c2 and c1 have different 679 # classes. c2's class (C) isn't reachable from c1 then, so protecting 680 # objects reachable from the dying object (c1) isn't enough to stop 681 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 682 # The result was a segfault (C.__mro__ was NULL when the callback 683 # tried to look up self.me). 684 685 class C(object): 686 def cb(self, ignore): 687 self.me 688 self.c1 689 self.wr 690 691 class D: 692 pass 693 694 c1, c2 = D(), C() 695 696 c2.me = c2 697 c2.c1 = c1 698 c2.wr = weakref.ref(c1, c2.cb) 699 700 del c1, c2, C, D 701 gc.collect() 702 703 def test_callback_in_cycle_resurrection(self): 704 import gc 705 706 # Do something nasty in a weakref callback: resurrect objects 707 # from dead cycles. For this to be attempted, the weakref and 708 # its callback must also be part of the cyclic trash (else the 709 # objects reachable via the callback couldn't be in cyclic trash 710 # to begin with -- the callback would act like an external root). 711 # But gc clears trash weakrefs with callbacks early now, which 712 # disables the callbacks, so the callbacks shouldn't get called 713 # at all (and so nothing actually gets resurrected). 
714 715 alist = [] 716 class C(object): 717 def __init__(self, value): 718 self.attribute = value 719 720 def acallback(self, ignore): 721 alist.append(self.c) 722 723 c1, c2 = C(1), C(2) 724 c1.c = c2 725 c2.c = c1 726 c1.wr = weakref.ref(c2, c1.acallback) 727 c2.wr = weakref.ref(c1, c2.acallback) 728 729 def C_went_away(ignore): 730 alist.append("C went away") 731 wr = weakref.ref(C, C_went_away) 732 733 del c1, c2, C # make them all trash 734 self.assertEqual(alist, []) # del isn't enough to reclaim anything 735 736 gc.collect() 737 # c1.wr and c2.wr were part of the cyclic trash, so should have 738 # been cleared without their callbacks executing. OTOH, the weakref 739 # to C is bound to a function local (wr), and wasn't trash, so that 740 # callback should have been invoked when C went away. 741 self.assertEqual(alist, ["C went away"]) 742 # The remaining weakref should be dead now (its callback ran). 743 self.assertEqual(wr(), None) 744 745 del alist[:] 746 gc.collect() 747 self.assertEqual(alist, []) 748 749 def test_callbacks_on_callback(self): 750 import gc 751 752 # Set up weakref callbacks *on* weakref callbacks. 753 alist = [] 754 def safe_callback(ignore): 755 alist.append("safe_callback called") 756 757 class C(object): 758 def cb(self, ignore): 759 alist.append("cb called") 760 761 c, d = C(), C() 762 c.other = d 763 d.other = c 764 callback = c.cb 765 c.wr = weakref.ref(d, callback) # this won't trigger 766 d.wr = weakref.ref(callback, d.cb) # ditto 767 external_wr = weakref.ref(callback, safe_callback) # but this will 768 self.assertIs(external_wr(), callback) 769 770 # The weakrefs attached to c and d should get cleared, so that 771 # C.cb is never called. But external_wr isn't part of the cyclic 772 # trash, and no cyclic trash is reachable from it, so safe_callback 773 # should get invoked when the bound method object callback (c.cb) 774 # -- which is itself a callback, and also part of the cyclic trash -- 775 # gets reclaimed at the end of gc. 
776 777 del callback, c, d, C 778 self.assertEqual(alist, []) # del isn't enough to clean up cycles 779 gc.collect() 780 self.assertEqual(alist, ["safe_callback called"]) 781 self.assertEqual(external_wr(), None) 782 783 del alist[:] 784 gc.collect() 785 self.assertEqual(alist, []) 786 787 def test_gc_during_ref_creation(self): 788 self.check_gc_during_creation(weakref.ref) 789 790 def test_gc_during_proxy_creation(self): 791 self.check_gc_during_creation(weakref.proxy) 792 793 def check_gc_during_creation(self, makeref): 794 thresholds = gc.get_threshold() 795 gc.set_threshold(1, 1, 1) 796 gc.collect() 797 class A: 798 pass 799 800 def callback(*args): 801 pass 802 803 referenced = A() 804 805 a = A() 806 a.a = a 807 a.wr = makeref(referenced) 808 809 try: 810 # now make sure the object and the ref get labeled as 811 # cyclic trash: 812 a = A() 813 weakref.ref(referenced, callback) 814 815 finally: 816 gc.set_threshold(*thresholds) 817 818 def test_ref_created_during_del(self): 819 # Bug #1377858 820 # A weakref created in an object's __del__() would crash the 821 # interpreter when the weakref was cleaned up since it would refer to 822 # non-existent memory. This test should not segfault the interpreter. 823 class Target(object): 824 def __del__(self): 825 global ref_from_del 826 ref_from_del = weakref.ref(self) 827 828 w = Target() 829 830 def test_init(self): 831 # Issue 3634 832 # <weakref to class>.__init__() doesn't check errors correctly 833 r = weakref.ref(Exception) 834 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 835 # No exception should be raised here 836 gc.collect() 837 838 def test_classes(self): 839 # Check that classes are weakrefable. 840 class A(object): 841 pass 842 l = [] 843 weakref.ref(int) 844 a = weakref.ref(A, l.append) 845 A = None 846 gc.collect() 847 self.assertEqual(a(), None) 848 self.assertEqual(l, [a]) 849 850 def test_equality(self): 851 # Alive weakrefs defer equality testing to their underlying object. 
852 x = Object(1) 853 y = Object(1) 854 z = Object(2) 855 a = weakref.ref(x) 856 b = weakref.ref(y) 857 c = weakref.ref(z) 858 d = weakref.ref(x) 859 # Note how we directly test the operators here, to stress both 860 # __eq__ and __ne__. 861 self.assertTrue(a == b) 862 self.assertFalse(a != b) 863 self.assertFalse(a == c) 864 self.assertTrue(a != c) 865 self.assertTrue(a == d) 866 self.assertFalse(a != d) 867 self.assertFalse(a == x) 868 self.assertTrue(a != x) 869 self.assertTrue(a == ALWAYS_EQ) 870 self.assertFalse(a != ALWAYS_EQ) 871 del x, y, z 872 gc.collect() 873 for r in a, b, c: 874 # Sanity check 875 self.assertIs(r(), None) 876 # Dead weakrefs compare by identity: whether `a` and `d` are the 877 # same weakref object is an implementation detail, since they pointed 878 # to the same original object and didn't have a callback. 879 # (see issue #16453). 880 self.assertFalse(a == b) 881 self.assertTrue(a != b) 882 self.assertFalse(a == c) 883 self.assertTrue(a != c) 884 self.assertEqual(a == d, a is d) 885 self.assertEqual(a != d, a is not d) 886 887 def test_ordering(self): 888 # weakrefs cannot be ordered, even if the underlying objects can. 889 ops = [operator.lt, operator.gt, operator.le, operator.ge] 890 x = Object(1) 891 y = Object(1) 892 a = weakref.ref(x) 893 b = weakref.ref(y) 894 for op in ops: 895 self.assertRaises(TypeError, op, a, b) 896 # Same when dead. 897 del x, y 898 gc.collect() 899 for op in ops: 900 self.assertRaises(TypeError, op, a, b) 901 902 def test_hashing(self): 903 # Alive weakrefs hash the same as the underlying object 904 x = Object(42) 905 y = Object(42) 906 a = weakref.ref(x) 907 b = weakref.ref(y) 908 self.assertEqual(hash(a), hash(42)) 909 del x, y 910 gc.collect() 911 # Dead weakrefs: 912 # - retain their hash is they were hashed when alive; 913 # - otherwise, cannot be hashed. 
914 self.assertEqual(hash(a), hash(42)) 915 self.assertRaises(TypeError, hash, b) 916 917 def test_trashcan_16602(self): 918 # Issue #16602: when a weakref's target was part of a long 919 # deallocation chain, the trashcan mechanism could delay clearing 920 # of the weakref and make the target object visible from outside 921 # code even though its refcount had dropped to 0. A crash ensued. 922 class C: 923 def __init__(self, parent): 924 if not parent: 925 return 926 wself = weakref.ref(self) 927 def cb(wparent): 928 o = wself() 929 self.wparent = weakref.ref(parent, cb) 930 931 d = weakref.WeakKeyDictionary() 932 root = c = C(None) 933 for n in range(100): 934 d[c] = c = C(c) 935 del root 936 gc.collect() 937 938 def test_callback_attribute(self): 939 x = Object(1) 940 callback = lambda ref: None 941 ref1 = weakref.ref(x, callback) 942 self.assertIs(ref1.__callback__, callback) 943 944 ref2 = weakref.ref(x) 945 self.assertIsNone(ref2.__callback__) 946 947 def test_callback_attribute_after_deletion(self): 948 x = Object(1) 949 ref = weakref.ref(x, self.callback) 950 self.assertIsNotNone(ref.__callback__) 951 del x 952 support.gc_collect() 953 self.assertIsNone(ref.__callback__) 954 955 def test_set_callback_attribute(self): 956 x = Object(1) 957 callback = lambda ref: None 958 ref1 = weakref.ref(x, callback) 959 with self.assertRaises(AttributeError): 960 ref1.__callback__ = lambda ref: None 961 962 def test_callback_gcs(self): 963 class ObjectWithDel(Object): 964 def __del__(self): pass 965 x = ObjectWithDel(1) 966 ref1 = weakref.ref(x, lambda ref: support.gc_collect()) 967 del x 968 support.gc_collect() 969 970 971class SubclassableWeakrefTestCase(TestBase): 972 973 def test_subclass_refs(self): 974 class MyRef(weakref.ref): 975 def __init__(self, ob, callback=None, value=42): 976 self.value = value 977 super().__init__(ob, callback) 978 def __call__(self): 979 self.called = True 980 return super().__call__() 981 o = Object("foo") 982 mr = MyRef(o, value=24) 983 
self.assertIs(mr(), o) 984 self.assertTrue(mr.called) 985 self.assertEqual(mr.value, 24) 986 del o 987 gc_collect() # For PyPy or other GCs. 988 self.assertIsNone(mr()) 989 self.assertTrue(mr.called) 990 991 def test_subclass_refs_dont_replace_standard_refs(self): 992 class MyRef(weakref.ref): 993 pass 994 o = Object(42) 995 r1 = MyRef(o) 996 r2 = weakref.ref(o) 997 self.assertIsNot(r1, r2) 998 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 999 self.assertEqual(weakref.getweakrefcount(o), 2) 1000 r3 = MyRef(o) 1001 self.assertEqual(weakref.getweakrefcount(o), 3) 1002 refs = weakref.getweakrefs(o) 1003 self.assertEqual(len(refs), 3) 1004 self.assertIs(r2, refs[0]) 1005 self.assertIn(r1, refs[1:]) 1006 self.assertIn(r3, refs[1:]) 1007 1008 def test_subclass_refs_dont_conflate_callbacks(self): 1009 class MyRef(weakref.ref): 1010 pass 1011 o = Object(42) 1012 r1 = MyRef(o, id) 1013 r2 = MyRef(o, str) 1014 self.assertIsNot(r1, r2) 1015 refs = weakref.getweakrefs(o) 1016 self.assertIn(r1, refs) 1017 self.assertIn(r2, refs) 1018 1019 def test_subclass_refs_with_slots(self): 1020 class MyRef(weakref.ref): 1021 __slots__ = "slot1", "slot2" 1022 def __new__(type, ob, callback, slot1, slot2): 1023 return weakref.ref.__new__(type, ob, callback) 1024 def __init__(self, ob, callback, slot1, slot2): 1025 self.slot1 = slot1 1026 self.slot2 = slot2 1027 def meth(self): 1028 return self.slot1 + self.slot2 1029 o = Object(42) 1030 r = MyRef(o, None, "abc", "def") 1031 self.assertEqual(r.slot1, "abc") 1032 self.assertEqual(r.slot2, "def") 1033 self.assertEqual(r.meth(), "abcdef") 1034 self.assertFalse(hasattr(r, "__dict__")) 1035 1036 def test_subclass_refs_with_cycle(self): 1037 """Confirm https://bugs.python.org/issue3100 is fixed.""" 1038 # An instance of a weakref subclass can have attributes. 1039 # If such a weakref holds the only strong reference to the object, 1040 # deleting the weakref will delete the object. 
In this case, 1041 # the callback must not be called, because the ref object is 1042 # being deleted. 1043 class MyRef(weakref.ref): 1044 pass 1045 1046 # Use a local callback, for "regrtest -R::" 1047 # to detect refcounting problems 1048 def callback(w): 1049 self.cbcalled += 1 1050 1051 o = C() 1052 r1 = MyRef(o, callback) 1053 r1.o = o 1054 del o 1055 1056 del r1 # Used to crash here 1057 1058 self.assertEqual(self.cbcalled, 0) 1059 1060 # Same test, with two weakrefs to the same object 1061 # (since code paths are different) 1062 o = C() 1063 r1 = MyRef(o, callback) 1064 r2 = MyRef(o, callback) 1065 r1.r = r2 1066 r2.o = o 1067 del o 1068 del r2 1069 1070 del r1 # Used to crash here 1071 1072 self.assertEqual(self.cbcalled, 0) 1073 1074 1075class WeakMethodTestCase(unittest.TestCase): 1076 1077 def _subclass(self): 1078 """Return an Object subclass overriding `some_method`.""" 1079 class C(Object): 1080 def some_method(self): 1081 return 6 1082 return C 1083 1084 def test_alive(self): 1085 o = Object(1) 1086 r = weakref.WeakMethod(o.some_method) 1087 self.assertIsInstance(r, weakref.ReferenceType) 1088 self.assertIsInstance(r(), type(o.some_method)) 1089 self.assertIs(r().__self__, o) 1090 self.assertIs(r().__func__, o.some_method.__func__) 1091 self.assertEqual(r()(), 4) 1092 1093 def test_object_dead(self): 1094 o = Object(1) 1095 r = weakref.WeakMethod(o.some_method) 1096 del o 1097 gc.collect() 1098 self.assertIs(r(), None) 1099 1100 def test_method_dead(self): 1101 C = self._subclass() 1102 o = C(1) 1103 r = weakref.WeakMethod(o.some_method) 1104 del C.some_method 1105 gc.collect() 1106 self.assertIs(r(), None) 1107 1108 def test_callback_when_object_dead(self): 1109 # Test callback behaviour when object dies first. 1110 C = self._subclass() 1111 calls = [] 1112 def cb(arg): 1113 calls.append(arg) 1114 o = C(1) 1115 r = weakref.WeakMethod(o.some_method, cb) 1116 del o 1117 gc.collect() 1118 self.assertEqual(calls, [r]) 1119 # Callback is only called once. 
1120 C.some_method = Object.some_method 1121 gc.collect() 1122 self.assertEqual(calls, [r]) 1123 1124 def test_callback_when_method_dead(self): 1125 # Test callback behaviour when method dies first. 1126 C = self._subclass() 1127 calls = [] 1128 def cb(arg): 1129 calls.append(arg) 1130 o = C(1) 1131 r = weakref.WeakMethod(o.some_method, cb) 1132 del C.some_method 1133 gc.collect() 1134 self.assertEqual(calls, [r]) 1135 # Callback is only called once. 1136 del o 1137 gc.collect() 1138 self.assertEqual(calls, [r]) 1139 1140 @support.cpython_only 1141 def test_no_cycles(self): 1142 # A WeakMethod doesn't create any reference cycle to itself. 1143 o = Object(1) 1144 def cb(_): 1145 pass 1146 r = weakref.WeakMethod(o.some_method, cb) 1147 wr = weakref.ref(r) 1148 del r 1149 self.assertIs(wr(), None) 1150 1151 def test_equality(self): 1152 def _eq(a, b): 1153 self.assertTrue(a == b) 1154 self.assertFalse(a != b) 1155 def _ne(a, b): 1156 self.assertTrue(a != b) 1157 self.assertFalse(a == b) 1158 x = Object(1) 1159 y = Object(1) 1160 a = weakref.WeakMethod(x.some_method) 1161 b = weakref.WeakMethod(y.some_method) 1162 c = weakref.WeakMethod(x.other_method) 1163 d = weakref.WeakMethod(y.other_method) 1164 # Objects equal, same method 1165 _eq(a, b) 1166 _eq(c, d) 1167 # Objects equal, different method 1168 _ne(a, c) 1169 _ne(a, d) 1170 _ne(b, c) 1171 _ne(b, d) 1172 # Objects unequal, same or different method 1173 z = Object(2) 1174 e = weakref.WeakMethod(z.some_method) 1175 f = weakref.WeakMethod(z.other_method) 1176 _ne(a, e) 1177 _ne(a, f) 1178 _ne(b, e) 1179 _ne(b, f) 1180 # Compare with different types 1181 _ne(a, x.some_method) 1182 _eq(a, ALWAYS_EQ) 1183 del x, y, z 1184 gc.collect() 1185 # Dead WeakMethods compare by identity 1186 refs = a, b, c, d, e, f 1187 for q in refs: 1188 for r in refs: 1189 self.assertEqual(q == r, q is r) 1190 self.assertEqual(q != r, q is not r) 1191 1192 def test_hashing(self): 1193 # Alive WeakMethods are hashable if the underlying object 
def check_len_cycles(self, dict_type, cons):
    """Check len() of a weak dict whose referents are all reference cycles.

    `dict_type` is the weak dictionary class to build and `cons` maps a
    RefCycle instance to the (key, value) pair stored for it.
    """
    count = 20
    cycles = [RefCycle() for _ in range(count)]
    weak_dict = dict_type(map(cons, cycles))
    # Keep an iterator alive
    live_iter = weak_dict.items()
    with contextlib.suppress(StopIteration):
        next(live_iter)
    del cycles
    gc.collect()
    len_with_iterator = len(weak_dict)
    del live_iter
    gc.collect()
    len_without_iterator = len(weak_dict)
    # one item may be kept alive inside the iterator
    self.assertIn(len_with_iterator, (0, 1))
    self.assertEqual(len_without_iterator, 0)
def test_weak_values(self):
    """Exercise d.copy(), d.items(), d[], del d[] and len(d) on a weak-valued dict."""
    weak_values, objects = self.make_weak_valued_dict()
    for o in objects:
        self.assertEqual(weakref.getweakrefcount(o), 1)
        self.assertIs(o, weak_values[o.arg],
                      "wrong object returned by weak dict!")
    original_items = sorted(weak_values.items())
    cloned_items = sorted(weak_values.copy().items())
    self.assertEqual(original_items, cloned_items,
                     "cloning of weak-valued dictionary did not work!")
    # The sorted lists hold strong references to every value; release them
    # before checking how the dictionary reacts to objects going away.
    del original_items, cloned_items
    self.assertEqual(len(weak_values), self.COUNT)
    del objects[0]
    gc_collect()  # For PyPy or other GCs.
    self.assertEqual(len(weak_values), self.COUNT - 1,
                     "deleting object did not cause dictionary update")
    del objects, o
    gc_collect()  # For PyPy or other GCs.
    self.assertEqual(len(weak_values), 0,
                     "deleting the values did not clear the dictionary")

    # regression on SF bug #447152:
    fresh = weakref.WeakValueDictionary()
    with self.assertRaises(KeyError):
        fresh[1]
    fresh[2] = C()
    gc_collect()  # For PyPy or other GCs.
    with self.assertRaises(KeyError):
        fresh[2]
% o) 1313 self.assertIs(o.arg, dict[o], 1314 "wrong object returned by weak dict!") 1315 items1 = dict.items() 1316 items2 = dict.copy().items() 1317 self.assertEqual(set(items1), set(items2), 1318 "cloning of weak-keyed dictionary did not work!") 1319 del items1, items2 1320 self.assertEqual(len(dict), self.COUNT) 1321 del objects[0] 1322 gc_collect() # For PyPy or other GCs. 1323 self.assertEqual(len(dict), (self.COUNT - 1), 1324 "deleting object did not cause dictionary update") 1325 del objects, o 1326 gc_collect() # For PyPy or other GCs. 1327 self.assertEqual(len(dict), 0, 1328 "deleting the keys did not clear the dictionary") 1329 o = Object(42) 1330 dict[o] = "What is the meaning of the universe?" 1331 self.assertIn(o, dict) 1332 self.assertNotIn(34, dict) 1333 1334 def test_weak_keyed_iters(self): 1335 dict, objects = self.make_weak_keyed_dict() 1336 self.check_iters(dict) 1337 1338 # Test keyrefs() 1339 refs = dict.keyrefs() 1340 self.assertEqual(len(refs), len(objects)) 1341 objects2 = list(objects) 1342 for wr in refs: 1343 ob = wr() 1344 self.assertIn(ob, dict) 1345 self.assertIn(ob, dict) 1346 self.assertEqual(ob.arg, dict[ob]) 1347 objects2.remove(ob) 1348 self.assertEqual(len(objects2), 0) 1349 1350 # Test iterkeyrefs() 1351 objects2 = list(objects) 1352 self.assertEqual(len(list(dict.keyrefs())), len(objects)) 1353 for wr in dict.keyrefs(): 1354 ob = wr() 1355 self.assertIn(ob, dict) 1356 self.assertIn(ob, dict) 1357 self.assertEqual(ob.arg, dict[ob]) 1358 objects2.remove(ob) 1359 self.assertEqual(len(objects2), 0) 1360 1361 def test_weak_valued_iters(self): 1362 dict, objects = self.make_weak_valued_dict() 1363 self.check_iters(dict) 1364 1365 # Test valuerefs() 1366 refs = dict.valuerefs() 1367 self.assertEqual(len(refs), len(objects)) 1368 objects2 = list(objects) 1369 for wr in refs: 1370 ob = wr() 1371 self.assertEqual(ob, dict[ob.arg]) 1372 self.assertEqual(ob.arg, dict[ob.arg].arg) 1373 objects2.remove(ob) 1374 
def check_iters(self, dict):
    """Check that each iteration API of `dict` yields exactly its contents.

    For items(), __iter__(), keys() and values() a list snapshot is taken
    first and every element produced by the iteration is removed from it;
    the snapshot must be empty afterwards.
    """
    def drain(snapshot, iterable, message):
        # list.remove() raises ValueError when an element is produced that
        # is not (or is no longer) in the snapshot.
        for element in iterable:
            snapshot.remove(element)
        self.assertFalse(snapshot, message)

    # item iterator:
    drain(list(dict.items()), dict.items(),
          "items() did not touch all items")
    # key iterator, via __iter__():
    drain(list(dict.keys()), dict,
          "__iter__() did not touch all keys")
    # key iterator, via iterkeys():
    drain(list(dict.keys()), dict.keys(),
          "iterkeys() did not touch all keys")
    # value iterator:
    drain(list(dict.values()), dict.values(),
          "itervalues() did not touch all values")
def check_weak_del_and_len_while_iterating(self, dict, testcontext):
    """Check len() bookkeeping under mixed explicit and implicit removal.

    `dict` is the weak dictionary under test.  `testcontext` is a
    context-manager factory supplied by the caller; this check relies on
    every ``testcontext()`` entry scheduling exactly one item for removal
    (see the note below).
    """
    # Check that len() works when both iterating and removing keys
    # explicitly through various means (.pop(), .clear()...), while
    # implicit mutation is deferred because an iterator is alive.
    # (each call to testcontext() should schedule one item for removal
    # for this test to work properly)
    o = Object(123456)
    with testcontext():
        # `n` is the baseline length that every later expectation is
        # expressed against.
        n = len(dict)
        # Since underlying dict is ordered, first item is popped
        dict.pop(next(dict.keys()))
        self.assertEqual(len(dict), n - 1)
        dict[o] = o
        self.assertEqual(len(dict), n)
    # last item in objects is removed from dict in context shutdown
    with testcontext():
        self.assertEqual(len(dict), n - 1)
        # Then, (o, o) is popped
        dict.popitem()
        self.assertEqual(len(dict), n - 2)
    with testcontext():
        self.assertEqual(len(dict), n - 3)
        del dict[next(dict.keys())]
        self.assertEqual(len(dict), n - 4)
    with testcontext():
        self.assertEqual(len(dict), n - 5)
        dict.popitem()
        self.assertEqual(len(dict), n - 6)
    with testcontext():
        dict.clear()
        self.assertEqual(len(dict), 0)
    # The dictionary must still report empty once the last context has exited.
    self.assertEqual(len(dict), 0)
objects = self.make_weak_keyed_dict() 1486 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1487 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1488 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1489 self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') 1490 dict, objects = self.make_weak_keyed_dict() 1491 @contextlib.contextmanager 1492 def testcontext(): 1493 try: 1494 it = iter(dict.items()) 1495 next(it) 1496 # Schedule a key/value for removal and recreate it 1497 v = objects.pop().arg 1498 gc.collect() # just in case 1499 yield Object(v), v 1500 finally: 1501 it = None # should commit all removals 1502 gc.collect() 1503 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1504 # Issue #21173: len() fragile when keys are both implicitly and 1505 # explicitly removed. 1506 dict, objects = self.make_weak_keyed_dict() 1507 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1508 1509 def test_weak_values_destroy_while_iterating(self): 1510 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1511 dict, objects = self.make_weak_valued_dict() 1512 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1513 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1514 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1515 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') 1516 self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') 1517 dict, objects = self.make_weak_valued_dict() 1518 @contextlib.contextmanager 1519 def testcontext(): 1520 try: 1521 it = iter(dict.items()) 1522 next(it) 1523 # Schedule a key/value for removal and recreate it 1524 k = objects.pop().arg 1525 gc.collect() # just in case 1526 yield k, Object(k) 1527 finally: 1528 it = None # should commit all removals 1529 gc.collect() 1530 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1531 dict, objects = 
def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
    """A WeakKeyDictionary can be built from another WeakKeyDictionary."""
    o = Object(3)
    source = weakref.WeakKeyDictionary({o: 364})
    clone = weakref.WeakKeyDictionary(source)
    # The newly constructed dictionary is the object under test, so it is
    # the one that must contain the entry.
    self.assertEqual(clone[o], 364)
    # Constructing the clone must leave the source dictionary intact.
    self.assertEqual(source[o], 364)
def check_setdefault(self, klass, key, value1, value2):
    """Check that setdefault() stores the first value and ignores later ones.

    `klass` is called with no arguments to build the mapping under test;
    `value1` and `value2` must be distinct objects.
    """
    self.assertIsNot(value1, value2,
                     "invalid test"
                     " -- value parameters must be distinct objects")
    mapping = klass()
    # The first call inserts value1; the second call offers value2 but
    # must still return, and leave in place, value1.
    for offered in (value1, value2):
        returned = mapping.setdefault(key, offered)
        self.assertIs(returned, value1)
        self.assertIn(key, mapping)
        self.assertIs(mapping.get(key), value1)
        self.assertIs(mapping[key], value1)
def test_weak_valued_dict_update(self):
    """Check WeakValueDictionary.update(): normal use, bad calls, odd keywords."""
    self.check_update(weakref.WeakValueDictionary,
                      {1: C(), 'a': C(), C(): C()})

    # errors
    with self.assertRaises(TypeError):
        weakref.WeakValueDictionary.update()
    target = weakref.WeakValueDictionary()
    for bad_args in (({}, {}), ((), ())):
        with self.assertRaises(TypeError):
            target.update(*bad_args)
    # The rejected calls must not have inserted anything.
    self.assertEqual(list(target.keys()), [])

    # special keyword arguments
    value = Object(3)
    for kw in ('self', 'dict', 'other', 'iterable'):
        fresh = weakref.WeakValueDictionary()
        fresh.update(**{kw: value})
        self.assertEqual(list(fresh.keys()), [kw])
        self.assertEqual(fresh[kw], value)
def test_weak_keyed_delitem(self):
    """Deleting one key from a WeakKeyDictionary leaves the other key in place."""
    removed, kept = Object('1'), Object('2')
    weak_keys = weakref.WeakKeyDictionary()
    weak_keys[removed] = 'something'
    weak_keys[kept] = 'something'
    self.assertEqual(len(weak_keys), 2)
    del weak_keys[removed]
    self.assertEqual(len(weak_keys), 1)
    self.assertEqual([*weak_keys.keys()], [kept])
def test_weak_keyed_bad_delitem(self):
    """Check the errors raised for missing and non-weakly-referenceable keys."""
    weak_keys = weakref.WeakKeyDictionary()
    absent = Object('1')
    # An attempt to delete an object that isn't there should raise
    # KeyError.  It didn't before 2.3.
    with self.assertRaises(KeyError):
        del weak_keys[absent]
    with self.assertRaises(KeyError):
        weak_keys[absent]

    # If a key isn't of a weakly referenceable type, __getitem__ and
    # __setitem__ raise TypeError.  __delitem__ should too.
    with self.assertRaises(TypeError):
        del weak_keys[13]
    with self.assertRaises(TypeError):
        weak_keys[13]
    with self.assertRaises(TypeError):
        weak_keys[13] = 13
1808 objs = list(d.keys()) 1809 # Reverse it, so that the iteration implementation of __delitem__ 1810 # has to keep looping to find the first object we delete. 1811 objs.reverse() 1812 1813 # Turn on mutation in C.__eq__. The first time through the loop, 1814 # under the iterkeys() business the first comparison will delete 1815 # the last item iterkeys() would see, and that causes a 1816 # RuntimeError: dictionary changed size during iteration 1817 # when the iterkeys() loop goes around to try comparing the next 1818 # key. After this was fixed, it just deletes the last object *our* 1819 # "for o in obj" loop would have gotten to. 1820 mutate = True 1821 count = 0 1822 for o in objs: 1823 count += 1 1824 del d[o] 1825 gc_collect() # For PyPy or other GCs. 1826 self.assertEqual(len(d), 0) 1827 self.assertEqual(count, 2) 1828 1829 def test_make_weak_valued_dict_repr(self): 1830 dict = weakref.WeakValueDictionary() 1831 self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>') 1832 1833 def test_make_weak_keyed_dict_repr(self): 1834 dict = weakref.WeakKeyDictionary() 1835 self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>') 1836 1837 @threading_helper.requires_working_threading() 1838 def test_threaded_weak_valued_setdefault(self): 1839 d = weakref.WeakValueDictionary() 1840 with collect_in_thread(): 1841 for i in range(100000): 1842 x = d.setdefault(10, RefCycle()) 1843 self.assertIsNot(x, None) # we never put None in there! 1844 del x 1845 1846 @threading_helper.requires_working_threading() 1847 def test_threaded_weak_valued_pop(self): 1848 d = weakref.WeakValueDictionary() 1849 with collect_in_thread(): 1850 for i in range(100000): 1851 d[10] = RefCycle() 1852 x = d.pop(10, 10) 1853 self.assertIsNot(x, None) # we never put None in there! 
def check_threaded_weak_dict_copy(self, type_, deepcopy):
    """Copy a large weak dict in one thread while another drops its referents.

    One thread copies the dictionary while a second thread removes, in
    random order, the strong references that keep its entries alive.  Any
    exception raised by the copy is re-raised in the calling thread.
    """
    # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
    # `deepcopy` should be either True or False.
    exc = []

    class DummyKey:
        def __init__(self, ctr):
            self.ctr = ctr

    class DummyValue:
        def __init__(self, ctr):
            self.ctr = ctr

    def dict_copy(d, exc):
        # Runs in the copying thread; failures are collected in `exc`
        # instead of being raised there.
        try:
            # Only the literal True selects copy.deepcopy(); any other
            # value falls through to d.copy().
            if deepcopy is True:
                _ = copy.deepcopy(d)
            else:
                _ = d.copy()
        except Exception as ex:
            exc.append(ex)

    def pop_and_collect(lst):
        # Runs in the second thread: empties `lst` one randomly chosen
        # element at a time, forcing a collection every 10000 removals.
        gc_ctr = 0
        while lst:
            i = random.randint(0, len(lst) - 1)
            gc_ctr += 1
            lst.pop(i)
            if gc_ctr % 10000 == 0:
                gc.collect()  # just in case

    self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))

    d = type_()
    keys = []
    values = []
    # Initialize d with many entries
    for i in range(70000):
        k, v = DummyKey(i), DummyValue(i)
        keys.append(k)
        values.append(v)
        d[k] = v
        # Drop the loop-local names so that `keys` and `values` hold the
        # only strong references created here.
        del k
        del v

    t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
    # Empty whichever list holds the weakly referenced side of the dict.
    if type_ is weakref.WeakKeyDictionary:
        t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
    else:  # weakref.WeakValueDictionary
        t_collect = threading.Thread(target=pop_and_collect, args=(values,))

    t_copy.start()
    t_collect.start()

    t_copy.join()
    t_collect.join()

    # Test exceptions
    if exc:
        raise exc[0]
@threading_helper.requires_working_threading()
def test_threaded_weak_key_dict_copy(self):
    """Shallow-copy a WeakKeyDictionary while its keys are being collected.

    Issue #35615: Weakref keys or values getting GC'ed during dict
    copying should not result in a crash.
    """
    dict_type = weakref.WeakKeyDictionary
    use_deepcopy = False
    self.check_threaded_weak_dict_copy(dict_type, use_deepcopy)
def _collect_if_necessary(self):
    """Run a garbage collection unless the interpreter is CPython."""
    if sys.implementation.name == 'cpython':
        # we create no ref-cycles so in CPython no gc should be needed
        return
    support.gc_collect()
self.assertEqual(f.peek(), None) 2012 self.assertEqual(f.detach(), None) 2013 self.assertEqual(f.alive, False) 2014 self.assertEqual(res, []) 2015 2016 res = [] 2017 f = weakref.finalize(a, add, x=67, y=43, z=89) 2018 del a 2019 self._collect_if_necessary() 2020 self.assertEqual(f(), None) 2021 self.assertEqual(f(), None) 2022 self.assertEqual(f.peek(), None) 2023 self.assertEqual(f.detach(), None) 2024 self.assertEqual(f.alive, False) 2025 self.assertEqual(res, [199]) 2026 2027 def test_arg_errors(self): 2028 def fin(*args, **kwargs): 2029 res.append((args, kwargs)) 2030 2031 a = self.A() 2032 2033 res = [] 2034 f = weakref.finalize(a, fin, 1, 2, func=3, obj=4) 2035 self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4})) 2036 f() 2037 self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})]) 2038 2039 with self.assertRaises(TypeError): 2040 weakref.finalize(a, func=fin, arg=1) 2041 with self.assertRaises(TypeError): 2042 weakref.finalize(obj=a, func=fin, arg=1) 2043 self.assertRaises(TypeError, weakref.finalize, a) 2044 self.assertRaises(TypeError, weakref.finalize) 2045 2046 def test_order(self): 2047 a = self.A() 2048 res = [] 2049 2050 f1 = weakref.finalize(a, res.append, 'f1') 2051 f2 = weakref.finalize(a, res.append, 'f2') 2052 f3 = weakref.finalize(a, res.append, 'f3') 2053 f4 = weakref.finalize(a, res.append, 'f4') 2054 f5 = weakref.finalize(a, res.append, 'f5') 2055 2056 # make sure finalizers can keep themselves alive 2057 del f1, f4 2058 2059 self.assertTrue(f2.alive) 2060 self.assertTrue(f3.alive) 2061 self.assertTrue(f5.alive) 2062 2063 self.assertTrue(f5.detach()) 2064 self.assertFalse(f5.alive) 2065 2066 f5() # nothing because previously unregistered 2067 res.append('A') 2068 f3() # => res.append('f3') 2069 self.assertFalse(f3.alive) 2070 res.append('B') 2071 f3() # nothing because previously called 2072 res.append('C') 2073 del a 2074 self._collect_if_necessary() 2075 # => res.append('f4') 2076 # => res.append('f2') 2077 # => 
res.append('f1') 2078 self.assertFalse(f2.alive) 2079 res.append('D') 2080 f2() # nothing because previously called by gc 2081 2082 expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] 2083 self.assertEqual(res, expected) 2084 2085 def test_all_freed(self): 2086 # we want a weakrefable subclass of weakref.finalize 2087 class MyFinalizer(weakref.finalize): 2088 pass 2089 2090 a = self.A() 2091 res = [] 2092 def callback(): 2093 res.append(123) 2094 f = MyFinalizer(a, callback) 2095 2096 wr_callback = weakref.ref(callback) 2097 wr_f = weakref.ref(f) 2098 del callback, f 2099 2100 self.assertIsNotNone(wr_callback()) 2101 self.assertIsNotNone(wr_f()) 2102 2103 del a 2104 self._collect_if_necessary() 2105 2106 self.assertIsNone(wr_callback()) 2107 self.assertIsNone(wr_f()) 2108 self.assertEqual(res, [123]) 2109 2110 @classmethod 2111 def run_in_child(cls): 2112 def error(): 2113 # Create an atexit finalizer from inside a finalizer called 2114 # at exit. This should be the next to be run. 2115 g1 = weakref.finalize(cls, print, 'g1') 2116 print('f3 error') 2117 1/0 2118 2119 # cls should stay alive till atexit callbacks run 2120 f1 = weakref.finalize(cls, print, 'f1', _global_var) 2121 f2 = weakref.finalize(cls, print, 'f2', _global_var) 2122 f3 = weakref.finalize(cls, error) 2123 f4 = weakref.finalize(cls, print, 'f4', _global_var) 2124 2125 assert f1.atexit == True 2126 f2.atexit = False 2127 assert f3.atexit == True 2128 assert f4.atexit == True 2129 2130 def test_atexit(self): 2131 prog = ('from test.test_weakref import FinalizeTestCase;'+ 2132 'FinalizeTestCase.run_in_child()') 2133 rc, out, err = script_helper.assert_python_ok('-c', prog) 2134 out = out.decode('ascii').splitlines() 2135 self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) 2136 self.assertTrue(b'ZeroDivisionError' in err) 2137 2138 2139class ModuleTestCase(unittest.TestCase): 2140 def test_names(self): 2141 for name in ('ReferenceType', 'ProxyType', 'CallableProxyType', 2142 
'WeakMethod', 'WeakSet', 'WeakKeyDictionary', 'WeakValueDictionary'): 2143 obj = getattr(weakref, name) 2144 if name != 'WeakSet': 2145 self.assertEqual(obj.__module__, 'weakref') 2146 self.assertEqual(obj.__name__, name) 2147 self.assertEqual(obj.__qualname__, name) 2148 2149 2150libreftest = """ Doctest for examples in the library reference: weakref.rst 2151 2152>>> from test.support import gc_collect 2153>>> import weakref 2154>>> class Dict(dict): 2155... pass 2156... 2157>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 2158>>> r = weakref.ref(obj) 2159>>> print(r() is obj) 2160True 2161 2162>>> import weakref 2163>>> class Object: 2164... pass 2165... 2166>>> o = Object() 2167>>> r = weakref.ref(o) 2168>>> o2 = r() 2169>>> o is o2 2170True 2171>>> del o, o2 2172>>> gc_collect() # For PyPy or other GCs. 2173>>> print(r()) 2174None 2175 2176>>> import weakref 2177>>> class ExtendedRef(weakref.ref): 2178... def __init__(self, ob, callback=None, **annotations): 2179... super().__init__(ob, callback) 2180... self.__counter = 0 2181... for k, v in annotations.items(): 2182... setattr(self, k, v) 2183... def __call__(self): 2184... '''Return a pair containing the referent and the number of 2185... times the reference has been called. 2186... ''' 2187... ob = super().__call__() 2188... if ob is not None: 2189... self.__counter += 1 2190... ob = (ob, self.__counter) 2191... return ob 2192... 2193>>> class A: # not in docs from here, just testing the ExtendedRef 2194... pass 2195... 2196>>> a = A() 2197>>> r = ExtendedRef(a, foo=1, bar="baz") 2198>>> r.foo 21991 2200>>> r.bar 2201'baz' 2202>>> r()[1] 22031 2204>>> r()[1] 22052 2206>>> r()[0] is a 2207True 2208 2209 2210>>> import weakref 2211>>> _id2obj_dict = weakref.WeakValueDictionary() 2212>>> def remember(obj): 2213... oid = id(obj) 2214... _id2obj_dict[oid] = obj 2215... return oid 2216... 2217>>> def id2obj(oid): 2218... return _id2obj_dict[oid] 2219... 
2220>>> a = A() # from here, just testing 2221>>> a_id = remember(a) 2222>>> id2obj(a_id) is a 2223True 2224>>> del a 2225>>> gc_collect() # For PyPy or other GCs. 2226>>> try: 2227... id2obj(a_id) 2228... except KeyError: 2229... print('OK') 2230... else: 2231... print('WeakValueDictionary error') 2232OK 2233 2234""" 2235 2236__test__ = {'libreftest' : libreftest} 2237 2238def load_tests(loader, tests, pattern): 2239 tests.addTest(doctest.DocTestSuite()) 2240 return tests 2241 2242 2243if __name__ == "__main__": 2244 unittest.main() 2245