1import gc 2import sys 3import unittest 4import UserList 5import weakref 6import operator 7import contextlib 8import copy 9import time 10 11from test import test_support 12 13# Used in ReferencesTestCase.test_ref_created_during_del() . 14ref_from_del = None 15 16class C: 17 def method(self): 18 pass 19 20 21class Callable: 22 bar = None 23 24 def __call__(self, x): 25 self.bar = x 26 27 28def create_function(): 29 def f(): pass 30 return f 31 32def create_bound_method(): 33 return C().method 34 35def create_unbound_method(): 36 return C.method 37 38 39class Object: 40 def __init__(self, arg): 41 self.arg = arg 42 def __repr__(self): 43 return "<Object %r>" % self.arg 44 def __eq__(self, other): 45 if isinstance(other, Object): 46 return self.arg == other.arg 47 return NotImplemented 48 def __ne__(self, other): 49 if isinstance(other, Object): 50 return self.arg != other.arg 51 return NotImplemented 52 def __hash__(self): 53 return hash(self.arg) 54 55class RefCycle: 56 def __init__(self): 57 self.cycle = self 58 59 60@contextlib.contextmanager 61def collect_in_thread(period=0.001): 62 """ 63 Ensure GC collections happen in a different thread, at a high frequency. 64 """ 65 threading = test_support.import_module('threading') 66 please_stop = False 67 68 def collect(): 69 while not please_stop: 70 time.sleep(period) 71 gc.collect() 72 73 with test_support.disable_gc(): 74 old_interval = sys.getcheckinterval() 75 sys.setcheckinterval(20) 76 t = threading.Thread(target=collect) 77 t.start() 78 try: 79 yield 80 finally: 81 please_stop = True 82 t.join() 83 sys.setcheckinterval(old_interval) 84 85 86class TestBase(unittest.TestCase): 87 88 def setUp(self): 89 self.cbcalled = 0 90 91 def callback(self, ref): 92 self.cbcalled += 1 93 94 95class ReferencesTestCase(TestBase): 96 97 def test_basic_ref(self): 98 self.check_basic_ref(C) 99 self.check_basic_ref(create_function) 100 self.check_basic_ref(create_bound_method) 101 self.check_basic_ref(create_unbound_method) 102 103 # Just make sure the tp_repr handler doesn't raise an exception. 104 # Live reference: 105 o = C() 106 wr = weakref.ref(o) 107 repr(wr) 108 # Dead reference: 109 del o 110 repr(wr) 111 112 def test_basic_callback(self): 113 self.check_basic_callback(C) 114 self.check_basic_callback(create_function) 115 self.check_basic_callback(create_bound_method) 116 self.check_basic_callback(create_unbound_method) 117 118 def test_multiple_callbacks(self): 119 o = C() 120 ref1 = weakref.ref(o, self.callback) 121 ref2 = weakref.ref(o, self.callback) 122 del o 123 self.assertIsNone(ref1(), "expected reference to be invalidated") 124 self.assertIsNone(ref2(), "expected reference to be invalidated") 125 self.assertEqual(self.cbcalled, 2, 126 "callback not called the right number of times") 127 128 def test_multiple_selfref_callbacks(self): 129 # Make sure all references are invalidated before callbacks are called 130 # 131 # What's important here is that we're using the first 132 # reference in the callback invoked on the second reference 133 # (the most recently created ref is cleaned up first). This 134 # tests that all references to the object are invalidated 135 # before any of the callbacks are invoked, so that we only 136 # have one invocation of _weakref.c:cleanup_helper() active 137 # for a particular object at a time. 138 # 139 def callback(object, self=self): 140 self.ref() 141 c = C() 142 self.ref = weakref.ref(c, callback) 143 ref1 = weakref.ref(c, callback) 144 del c 145 146 def test_constructor_kwargs(self): 147 c = C() 148 self.assertRaises(TypeError, weakref.ref, c, callback=None) 149 150 def test_proxy_ref(self): 151 o = C() 152 o.bar = 1 153 ref1 = weakref.proxy(o, self.callback) 154 ref2 = weakref.proxy(o, self.callback) 155 del o 156 157 def check(proxy): 158 proxy.bar 159 160 self.assertRaises(weakref.ReferenceError, check, ref1) 161 self.assertRaises(weakref.ReferenceError, check, ref2) 162 self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C())) 163 self.assertEqual(self.cbcalled, 2) 164 165 def check_basic_ref(self, factory): 166 o = factory() 167 ref = weakref.ref(o) 168 self.assertIsNotNone(ref(), 169 "weak reference to live object should be live") 170 o2 = ref() 171 self.assertIs(o, o2, 172 "<ref>() should return original object if live") 173 174 def check_basic_callback(self, factory): 175 self.cbcalled = 0 176 o = factory() 177 ref = weakref.ref(o, self.callback) 178 del o 179 self.assertEqual(self.cbcalled, 1, 180 "callback did not properly set 'cbcalled'") 181 self.assertIsNone(ref(), 182 "ref2 should be dead after deleting object reference") 183 184 def test_ref_reuse(self): 185 o = C() 186 ref1 = weakref.ref(o) 187 # create a proxy to make sure that there's an intervening creation 188 # between these two; it should make no difference 189 proxy = weakref.proxy(o) 190 ref2 = weakref.ref(o) 191 self.assertIs(ref1, ref2, 192 "reference object w/out callback should be re-used") 193 194 o = C() 195 proxy = weakref.proxy(o) 196 ref1 = weakref.ref(o) 197 ref2 = weakref.ref(o) 198 self.assertIs(ref1, ref2, 199 "reference object w/out callback should be re-used") 200 self.assertEqual(weakref.getweakrefcount(o), 2, 201 "wrong weak ref count for object") 202 del proxy 203 self.assertEqual(weakref.getweakrefcount(o), 1, 204 "wrong weak ref count for object after deleting proxy") 205 206 def test_proxy_reuse(self): 207 o = C() 208 proxy1 = weakref.proxy(o) 209 ref = weakref.ref(o) 210 proxy2 = weakref.proxy(o) 211 self.assertIs(proxy1, proxy2, 212 "proxy object w/out callback should have been re-used") 213 214 def test_basic_proxy(self): 215 o = C() 216 self.check_proxy(o, weakref.proxy(o)) 217 218 L = UserList.UserList() 219 p = weakref.proxy(L) 220 self.assertFalse(p, "proxy for empty UserList should be false") 221 p.append(12) 222 self.assertEqual(len(L), 1) 223 self.assertTrue(p, "proxy for non-empty UserList should be true") 224 with test_support.check_py3k_warnings(): 225 p[:] = [2, 3] 226 self.assertEqual(len(L), 2) 227 self.assertEqual(len(p), 2) 228 self.assertIn(3, p, "proxy didn't support __contains__() properly") 229 p[1] = 5 230 self.assertEqual(L[1], 5) 231 self.assertEqual(p[1], 5) 232 L2 = UserList.UserList(L) 233 p2 = weakref.proxy(L2) 234 self.assertEqual(p, p2) 235 ## self.assertEqual(repr(L2), repr(p2)) 236 L3 = UserList.UserList(range(10)) 237 p3 = weakref.proxy(L3) 238 with test_support.check_py3k_warnings(): 239 self.assertEqual(L3[:], p3[:]) 240 self.assertEqual(L3[5:], p3[5:]) 241 self.assertEqual(L3[:5], p3[:5]) 242 self.assertEqual(L3[2:5], p3[2:5]) 243 244 def test_proxy_unicode(self): 245 # See bug 5037 246 class C(object): 247 def __str__(self): 248 return "string" 249 def __unicode__(self): 250 return u"unicode" 251 instance = C() 252 self.assertIn("__unicode__", dir(weakref.proxy(instance))) 253 self.assertEqual(unicode(weakref.proxy(instance)), u"unicode") 254 255 def test_proxy_index(self): 256 class C: 257 def __index__(self): 258 return 10 259 o = C() 260 p = weakref.proxy(o) 261 self.assertEqual(operator.index(p), 10) 262 263 def test_proxy_div(self): 264 class C: 265 def __floordiv__(self, other): 266 return 42 267 def __ifloordiv__(self, other): 268 return 21 269 o = C() 270 p = weakref.proxy(o) 271 self.assertEqual(p // 5, 42) 272 p //= 5 273 self.assertEqual(p, 21) 274 275 # The PyWeakref_* C API is documented as allowing either NULL or 276 # None as the value for the callback, where either means "no 277 # callback". The "no callback" ref and proxy objects are supposed 278 # to be shared so long as they exist by all callers so long as 279 # they are active. In Python 2.3.3 and earlier, this guarantee 280 # was not honored, and was broken in different ways for 281 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 282 283 def test_shared_ref_without_callback(self): 284 self.check_shared_without_callback(weakref.ref) 285 286 def test_shared_proxy_without_callback(self): 287 self.check_shared_without_callback(weakref.proxy) 288 289 def check_shared_without_callback(self, makeref): 290 o = Object(1) 291 p1 = makeref(o, None) 292 p2 = makeref(o, None) 293 self.assertIs(p1, p2, "both callbacks were None in the C API") 294 del p1, p2 295 p1 = makeref(o) 296 p2 = makeref(o, None) 297 self.assertIs(p1, p2, "callbacks were NULL, None in the C API") 298 del p1, p2 299 p1 = makeref(o) 300 p2 = makeref(o) 301 self.assertIs(p1, p2, "both callbacks were NULL in the C API") 302 del p1, p2 303 p1 = makeref(o, None) 304 p2 = makeref(o) 305 self.assertIs(p1, p2, "callbacks were None, NULL in the C API") 306 307 def test_callable_proxy(self): 308 o = Callable() 309 ref1 = weakref.proxy(o) 310 311 self.check_proxy(o, ref1) 312 313 self.assertIs(type(ref1), weakref.CallableProxyType, 314 "proxy is not of callable type") 315 ref1('twinkies!') 316 self.assertEqual(o.bar, 'twinkies!', 317 "call through proxy not passed through to original") 318 ref1(x='Splat.') 319 self.assertEqual(o.bar, 'Splat.', 320 "call through proxy not passed through to original") 321 322 # expect due to too few args 323 self.assertRaises(TypeError, ref1) 324 325 # expect due to too many args 326 self.assertRaises(TypeError, ref1, 1, 2, 3) 327 328 def check_proxy(self, o, proxy): 329 o.foo = 1 330 self.assertEqual(proxy.foo, 1, 331 "proxy does not reflect attribute addition") 332 o.foo = 2 333 self.assertEqual(proxy.foo, 2, 334 "proxy does not reflect attribute modification") 335 del o.foo 336 self.assertFalse(hasattr(proxy, 'foo'), 337 "proxy does not reflect attribute removal") 338 339 proxy.foo = 1 340 self.assertEqual(o.foo, 1, 341 "object does not reflect attribute addition via proxy") 342 proxy.foo = 2 343 self.assertEqual(o.foo, 2, 344 "object does not reflect attribute modification via proxy") 345 del proxy.foo 346 self.assertFalse(hasattr(o, 'foo'), 347 "object does not reflect attribute removal via proxy") 348 349 def test_proxy_deletion(self): 350 # Test clearing of SF bug #762891 351 class Foo: 352 result = None 353 def __delitem__(self, accessor): 354 self.result = accessor 355 g = Foo() 356 f = weakref.proxy(g) 357 del f[0] 358 self.assertEqual(f.result, 0) 359 360 def test_proxy_bool(self): 361 # Test clearing of SF bug #1170766 362 class List(list): pass 363 lyst = List() 364 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 365 366 def test_getweakrefcount(self): 367 o = C() 368 ref1 = weakref.ref(o) 369 ref2 = weakref.ref(o, self.callback) 370 self.assertEqual(weakref.getweakrefcount(o), 2, 371 "got wrong number of weak reference objects") 372 373 proxy1 = weakref.proxy(o) 374 proxy2 = weakref.proxy(o, self.callback) 375 self.assertEqual(weakref.getweakrefcount(o), 4, 376 "got wrong number of weak reference objects") 377 378 del ref1, ref2, proxy1, proxy2 379 self.assertEqual(weakref.getweakrefcount(o), 0, 380 "weak reference objects not unlinked from" 381 " referent when discarded.") 382 383 # assumes ints do not support weakrefs 384 self.assertEqual(weakref.getweakrefcount(1), 0, 385 "got wrong number of weak reference objects for int") 386 387 def test_getweakrefs(self): 388 o = C() 389 ref1 = weakref.ref(o, self.callback) 390 ref2 = weakref.ref(o, self.callback) 391 del ref1 392 self.assertEqual(weakref.getweakrefs(o), [ref2], 393 "list of refs does not match") 394 395 o = C() 396 ref1 = weakref.ref(o, self.callback) 397 ref2 = weakref.ref(o, self.callback) 398 del ref2 399 self.assertEqual(weakref.getweakrefs(o), [ref1], 400 "list of refs does not match") 401 402 del ref1 403 self.assertEqual(weakref.getweakrefs(o), [], 404 "list of refs not cleared") 405 406 # assumes ints do not support weakrefs 407 self.assertEqual(weakref.getweakrefs(1), [], 408 "list of refs does not match for int") 409 410 def test_newstyle_number_ops(self): 411 class F(float): 412 pass 413 f = F(2.0) 414 p = weakref.proxy(f) 415 self.assertEqual(p + 1.0, 3.0) 416 self.assertEqual(1.0 + p, 3.0) # this used to SEGV 417 418 def test_callbacks_protected(self): 419 # Callbacks protected from already-set exceptions? 420 # Regression test for SF bug #478534. 421 class BogusError(Exception): 422 pass 423 data = {} 424 def remove(k): 425 del data[k] 426 def encapsulate(): 427 f = lambda : () 428 data[weakref.ref(f, remove)] = None 429 raise BogusError 430 try: 431 encapsulate() 432 except BogusError: 433 pass 434 else: 435 self.fail("exception not properly restored") 436 try: 437 encapsulate() 438 except BogusError: 439 pass 440 else: 441 self.fail("exception not properly restored") 442 443 def test_sf_bug_840829(self): 444 # "weakref callbacks and gc corrupt memory" 445 # subtype_dealloc erroneously exposed a new-style instance 446 # already in the process of getting deallocated to gc, 447 # causing double-deallocation if the instance had a weakref 448 # callback that triggered gc. 449 # If the bug exists, there probably won't be an obvious symptom 450 # in a release build. In a debug build, a segfault will occur 451 # when the second attempt to remove the instance from the "list 452 # of all objects" occurs. 453 454 import gc 455 456 class C(object): 457 pass 458 459 c = C() 460 wr = weakref.ref(c, lambda ignore: gc.collect()) 461 del c 462 463 # There endeth the first part. It gets worse. 464 del wr 465 466 c1 = C() 467 c1.i = C() 468 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 469 470 c2 = C() 471 c2.c1 = c1 472 del c1 # still alive because c2 points to it 473 474 # Now when subtype_dealloc gets called on c2, it's not enough just 475 # that c2 is immune from gc while the weakref callbacks associated 476 # with c2 execute (there are none in this 2nd half of the test, btw). 477 # subtype_dealloc goes on to call the base classes' deallocs too, 478 # so any gc triggered by weakref callbacks associated with anything 479 # torn down by a base class dealloc can also trigger double 480 # deallocation of c2. 481 del c2 482 483 def test_callback_in_cycle_1(self): 484 import gc 485 486 class J(object): 487 pass 488 489 class II(object): 490 def acallback(self, ignore): 491 self.J 492 493 I = II() 494 I.J = J 495 I.wr = weakref.ref(J, I.acallback) 496 497 # Now J and II are each in a self-cycle (as all new-style class 498 # objects are, since their __mro__ points back to them). I holds 499 # both a weak reference (I.wr) and a strong reference (I.J) to class 500 # J. I is also in a cycle (I.wr points to a weakref that references 501 # I.acallback). When we del these three, they all become trash, but 502 # the cycles prevent any of them from getting cleaned up immediately. 503 # Instead they have to wait for cyclic gc to deduce that they're 504 # trash. 505 # 506 # gc used to call tp_clear on all of them, and the order in which 507 # it does that is pretty accidental. The exact order in which we 508 # built up these things manages to provoke gc into running tp_clear 509 # in just the right order (I last). Calling tp_clear on II leaves 510 # behind an insane class object (its __mro__ becomes NULL). Calling 511 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 512 # just then because of the strong reference from I.J. Calling 513 # tp_clear on I starts to clear I's __dict__, and just happens to 514 # clear I.J first -- I.wr is still intact. That removes the last 515 # reference to J, which triggers the weakref callback. The callback 516 # tries to do "self.J", and instances of new-style classes look up 517 # attributes ("J") in the class dict first. The class (II) wants to 518 # search II.__mro__, but that's NULL. The result was a segfault in 519 # a release build, and an assert failure in a debug build. 520 del I, J, II 521 gc.collect() 522 523 def test_callback_in_cycle_2(self): 524 import gc 525 526 # This is just like test_callback_in_cycle_1, except that II is an 527 # old-style class. The symptom is different then: an instance of an 528 # old-style class looks in its own __dict__ first. 'J' happens to 529 # get cleared from I.__dict__ before 'wr', and 'J' was never in II's 530 # __dict__, so the attribute isn't found. The difference is that 531 # the old-style II doesn't have a NULL __mro__ (it doesn't have any 532 # __mro__), so no segfault occurs. Instead it got: 533 # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... 534 # Exception exceptions.AttributeError: 535 # "II instance has no attribute 'J'" in <bound method II.acallback 536 # of <?.II instance at 0x00B9B4B8>> ignored 537 538 class J(object): 539 pass 540 541 class II: 542 def acallback(self, ignore): 543 self.J 544 545 I = II() 546 I.J = J 547 I.wr = weakref.ref(J, I.acallback) 548 549 del I, J, II 550 gc.collect() 551 552 def test_callback_in_cycle_3(self): 553 import gc 554 555 # This one broke the first patch that fixed the last two. In this 556 # case, the objects reachable from the callback aren't also reachable 557 # from the object (c1) *triggering* the callback: you can get to 558 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 559 # got tp_clear'ed by the time the c2.cb callback got invoked. 560 561 class C: 562 def cb(self, ignore): 563 self.me 564 self.c1 565 self.wr 566 567 c1, c2 = C(), C() 568 569 c2.me = c2 570 c2.c1 = c1 571 c2.wr = weakref.ref(c1, c2.cb) 572 573 del c1, c2 574 gc.collect() 575 576 def test_callback_in_cycle_4(self): 577 import gc 578 579 # Like test_callback_in_cycle_3, except c2 and c1 have different 580 # classes. c2's class (C) isn't reachable from c1 then, so protecting 581 # objects reachable from the dying object (c1) isn't enough to stop 582 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 583 # The result was a segfault (C.__mro__ was NULL when the callback 584 # tried to look up self.me). 585 586 class C(object): 587 def cb(self, ignore): 588 self.me 589 self.c1 590 self.wr 591 592 class D: 593 pass 594 595 c1, c2 = D(), C() 596 597 c2.me = c2 598 c2.c1 = c1 599 c2.wr = weakref.ref(c1, c2.cb) 600 601 del c1, c2, C, D 602 gc.collect() 603 604 @test_support.requires_type_collecting 605 def test_callback_in_cycle_resurrection(self): 606 import gc 607 608 # Do something nasty in a weakref callback: resurrect objects 609 # from dead cycles. For this to be attempted, the weakref and 610 # its callback must also be part of the cyclic trash (else the 611 # objects reachable via the callback couldn't be in cyclic trash 612 # to begin with -- the callback would act like an external root). 613 # But gc clears trash weakrefs with callbacks early now, which 614 # disables the callbacks, so the callbacks shouldn't get called 615 # at all (and so nothing actually gets resurrected). 616 617 alist = [] 618 class C(object): 619 def __init__(self, value): 620 self.attribute = value 621 622 def acallback(self, ignore): 623 alist.append(self.c) 624 625 c1, c2 = C(1), C(2) 626 c1.c = c2 627 c2.c = c1 628 c1.wr = weakref.ref(c2, c1.acallback) 629 c2.wr = weakref.ref(c1, c2.acallback) 630 631 def C_went_away(ignore): 632 alist.append("C went away") 633 wr = weakref.ref(C, C_went_away) 634 635 del c1, c2, C # make them all trash 636 self.assertEqual(alist, []) # del isn't enough to reclaim anything 637 638 gc.collect() 639 # c1.wr and c2.wr were part of the cyclic trash, so should have 640 # been cleared without their callbacks executing. OTOH, the weakref 641 # to C is bound to a function local (wr), and wasn't trash, so that 642 # callback should have been invoked when C went away. 643 self.assertEqual(alist, ["C went away"]) 644 # The remaining weakref should be dead now (its callback ran). 645 self.assertEqual(wr(), None) 646 647 del alist[:] 648 gc.collect() 649 self.assertEqual(alist, []) 650 651 def test_callbacks_on_callback(self): 652 import gc 653 654 # Set up weakref callbacks *on* weakref callbacks. 655 alist = [] 656 def safe_callback(ignore): 657 alist.append("safe_callback called") 658 659 class C(object): 660 def cb(self, ignore): 661 alist.append("cb called") 662 663 c, d = C(), C() 664 c.other = d 665 d.other = c 666 callback = c.cb 667 c.wr = weakref.ref(d, callback) # this won't trigger 668 d.wr = weakref.ref(callback, d.cb) # ditto 669 external_wr = weakref.ref(callback, safe_callback) # but this will 670 self.assertIs(external_wr(), callback) 671 672 # The weakrefs attached to c and d should get cleared, so that 673 # C.cb is never called. But external_wr isn't part of the cyclic 674 # trash, and no cyclic trash is reachable from it, so safe_callback 675 # should get invoked when the bound method object callback (c.cb) 676 # -- which is itself a callback, and also part of the cyclic trash -- 677 # gets reclaimed at the end of gc. 678 679 del callback, c, d, C 680 self.assertEqual(alist, []) # del isn't enough to clean up cycles 681 gc.collect() 682 self.assertEqual(alist, ["safe_callback called"]) 683 self.assertEqual(external_wr(), None) 684 685 del alist[:] 686 gc.collect() 687 self.assertEqual(alist, []) 688 689 def test_gc_during_ref_creation(self): 690 self.check_gc_during_creation(weakref.ref) 691 692 def test_gc_during_proxy_creation(self): 693 self.check_gc_during_creation(weakref.proxy) 694 695 def check_gc_during_creation(self, makeref): 696 thresholds = gc.get_threshold() 697 gc.set_threshold(1, 1, 1) 698 gc.collect() 699 class A: 700 pass 701 702 def callback(*args): 703 pass 704 705 referenced = A() 706 707 a = A() 708 a.a = a 709 a.wr = makeref(referenced) 710 711 try: 712 # now make sure the object and the ref get labeled as 713 # cyclic trash: 714 a = A() 715 weakref.ref(referenced, callback) 716 717 finally: 718 gc.set_threshold(*thresholds) 719 720 def test_ref_created_during_del(self): 721 # Bug #1377858 722 # A weakref created in an object's __del__() would crash the 723 # interpreter when the weakref was cleaned up since it would refer to 724 # non-existent memory. This test should not segfault the interpreter. 725 class Target(object): 726 def __del__(self): 727 global ref_from_del 728 ref_from_del = weakref.ref(self) 729 730 w = Target() 731 732 def test_init(self): 733 # Issue 3634 734 # <weakref to class>.__init__() doesn't check errors correctly 735 r = weakref.ref(Exception) 736 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 737 # No exception should be raised here 738 gc.collect() 739 740 def test_classes(self): 741 # Check that both old-style classes and new-style classes 742 # are weakrefable. 743 class A(object): 744 pass 745 class B: 746 pass 747 l = [] 748 weakref.ref(int) 749 a = weakref.ref(A, l.append) 750 A = None 751 gc.collect() 752 self.assertEqual(a(), None) 753 self.assertEqual(l, [a]) 754 b = weakref.ref(B, l.append) 755 B = None 756 gc.collect() 757 self.assertEqual(b(), None) 758 self.assertEqual(l, [a, b]) 759 760 def test_equality(self): 761 # Alive weakrefs defer equality testing to their underlying object. 762 x = Object(1) 763 y = Object(1) 764 z = Object(2) 765 a = weakref.ref(x) 766 b = weakref.ref(y) 767 c = weakref.ref(z) 768 d = weakref.ref(x) 769 # Note how we directly test the operators here, to stress both 770 # __eq__ and __ne__. 771 self.assertTrue(a == b) 772 self.assertFalse(a != b) 773 self.assertFalse(a == c) 774 self.assertTrue(a != c) 775 self.assertTrue(a == d) 776 self.assertFalse(a != d) 777 del x, y, z 778 gc.collect() 779 for r in a, b, c: 780 # Sanity check 781 self.assertIs(r(), None) 782 # Dead weakrefs compare by identity: whether `a` and `d` are the 783 # same weakref object is an implementation detail, since they pointed 784 # to the same original object and didn't have a callback. 785 # (see issue #16453). 786 self.assertFalse(a == b) 787 self.assertTrue(a != b) 788 self.assertFalse(a == c) 789 self.assertTrue(a != c) 790 self.assertEqual(a == d, a is d) 791 self.assertEqual(a != d, a is not d) 792 793 def test_hashing(self): 794 # Alive weakrefs hash the same as the underlying object 795 x = Object(42) 796 y = Object(42) 797 a = weakref.ref(x) 798 b = weakref.ref(y) 799 self.assertEqual(hash(a), hash(42)) 800 del x, y 801 gc.collect() 802 # Dead weakrefs: 803 # - retain their hash is they were hashed when alive; 804 # - otherwise, cannot be hashed. 805 self.assertEqual(hash(a), hash(42)) 806 self.assertRaises(TypeError, hash, b) 807 808 def test_trashcan_16602(self): 809 # Issue #16602: when a weakref's target was part of a long 810 # deallocation chain, the trashcan mechanism could delay clearing 811 # of the weakref and make the target object visible from outside 812 # code even though its refcount had dropped to 0. A crash ensued. 813 class C(object): 814 def __init__(self, parent): 815 if not parent: 816 return 817 wself = weakref.ref(self) 818 def cb(wparent): 819 o = wself() 820 self.wparent = weakref.ref(parent, cb) 821 822 d = weakref.WeakKeyDictionary() 823 root = c = C(None) 824 for n in range(100): 825 d[c] = c = C(c) 826 del root 827 gc.collect() 828 829 830class SubclassableWeakrefTestCase(TestBase): 831 832 def test_subclass_refs(self): 833 class MyRef(weakref.ref): 834 def __init__(self, ob, callback=None, value=42): 835 self.value = value 836 super(MyRef, self).__init__(ob, callback) 837 def __call__(self): 838 self.called = True 839 return super(MyRef, self).__call__() 840 o = Object("foo") 841 mr = MyRef(o, value=24) 842 self.assertIs(mr(), o) 843 self.assertTrue(mr.called) 844 self.assertEqual(mr.value, 24) 845 del o 846 self.assertIsNone(mr()) 847 self.assertTrue(mr.called) 848 849 def test_subclass_refs_dont_replace_standard_refs(self): 850 class MyRef(weakref.ref): 851 pass 852 o = Object(42) 853 r1 = MyRef(o) 854 r2 = weakref.ref(o) 855 self.assertIsNot(r1, r2) 856 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 857 self.assertEqual(weakref.getweakrefcount(o), 2) 858 r3 = MyRef(o) 859 self.assertEqual(weakref.getweakrefcount(o), 3) 860 refs = weakref.getweakrefs(o) 861 self.assertEqual(len(refs), 3) 862 self.assertIs(r2, refs[0]) 863 self.assertIn(r1, refs[1:]) 864 self.assertIn(r3, refs[1:]) 865 866 def test_subclass_refs_dont_conflate_callbacks(self): 867 class MyRef(weakref.ref): 868 pass 869 o = Object(42) 870 r1 = MyRef(o, id) 871 r2 = MyRef(o, str) 872 self.assertIsNot(r1, r2) 873 refs = weakref.getweakrefs(o) 874 self.assertIn(r1, refs) 875 self.assertIn(r2, refs) 876 877 def test_subclass_refs_with_slots(self): 878 class MyRef(weakref.ref): 879 __slots__ = "slot1", "slot2" 880 def __new__(type, ob, callback, slot1, slot2): 881 return weakref.ref.__new__(type, ob, callback) 882 def __init__(self, ob, callback, slot1, slot2): 883 self.slot1 = slot1 884 self.slot2 = slot2 885 def meth(self): 886 return self.slot1 + self.slot2 887 o = Object(42) 888 r = MyRef(o, None, "abc", "def") 889 self.assertEqual(r.slot1, "abc") 890 self.assertEqual(r.slot2, "def") 891 self.assertEqual(r.meth(), "abcdef") 892 self.assertFalse(hasattr(r, "__dict__")) 893 894 def test_subclass_refs_with_cycle(self): 895 # Bug #3110 896 # An instance of a weakref subclass can have attributes. 897 # If such a weakref holds the only strong reference to the object, 898 # deleting the weakref will delete the object. In this case, 899 # the callback must not be called, because the ref object is 900 # being deleted. 901 class MyRef(weakref.ref): 902 pass 903 904 # Use a local callback, for "regrtest -R::" 905 # to detect refcounting problems 906 def callback(w): 907 self.cbcalled += 1 908 909 o = C() 910 r1 = MyRef(o, callback) 911 r1.o = o 912 del o 913 914 del r1 # Used to crash here 915 916 self.assertEqual(self.cbcalled, 0) 917 918 # Same test, with two weakrefs to the same object 919 # (since code paths are different) 920 o = C() 921 r1 = MyRef(o, callback) 922 r2 = MyRef(o, callback) 923 r1.r = r2 924 r2.o = o 925 del o 926 del r2 927 928 del r1 # Used to crash here 929 930 self.assertEqual(self.cbcalled, 0) 931 932 933class MappingTestCase(TestBase): 934 935 COUNT = 10 936 937 def check_len_cycles(self, dict_type, cons): 938 N = 20 939 items = [RefCycle() for i in range(N)] 940 dct = dict_type(cons(i, o) for i, o in enumerate(items)) 941 # Keep an iterator alive 942 it = dct.iteritems() 943 try: 944 next(it) 945 except StopIteration: 946 pass 947 del items 948 gc.collect() 949 n1 = len(dct) 950 list(it) 951 del it 952 gc.collect() 953 n2 = len(dct) 954 # iteration should prevent garbage collection here 955 # Note that this is a test on an implementation detail. The requirement 956 # is only to provide stable iteration, not that the size of the container 957 # stay fixed. 958 self.assertEqual(n1, 20) 959 #self.assertIn(n1, (0, 1)) 960 self.assertEqual(n2, 0) 961 962 def test_weak_keyed_len_cycles(self): 963 self.check_len_cycles(weakref.WeakKeyDictionary, lambda n, k: (k, n)) 964 965 def test_weak_valued_len_cycles(self): 966 self.check_len_cycles(weakref.WeakValueDictionary, lambda n, k: (n, k)) 967 968 def check_len_race(self, dict_type, cons): 969 # Extended sanity checks for len() in the face of cyclic collection 970 self.addCleanup(gc.set_threshold, *gc.get_threshold()) 971 for th in range(1, 100): 972 N = 20 973 gc.collect(0) 974 gc.set_threshold(th, th, th) 975 items = [RefCycle() for i in range(N)] 976 dct = dict_type(cons(o) for o in items) 977 del items 978 # All items will be collected at next garbage collection pass 979 it = dct.iteritems() 980 try: 981 next(it) 982 except StopIteration: 983 pass 984 n1 = len(dct) 985 del it 986 n2 = len(dct) 987 self.assertGreaterEqual(n1, 0) 988 self.assertLessEqual(n1, N) 989 self.assertGreaterEqual(n2, 0) 990 self.assertLessEqual(n2, n1) 991 992 def test_weak_keyed_len_race(self): 993 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 994 995 def test_weak_valued_len_race(self): 996 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 997 998 def test_weak_values(self): 999 # 1000 # This exercises d.copy(), d.items(), d[], del d[], len(d). 1001 # 1002 dict, objects = self.make_weak_valued_dict() 1003 for o in objects: 1004 self.assertEqual(weakref.getweakrefcount(o), 1, 1005 "wrong number of weak references to %r!" % o) 1006 self.assertIs(o, dict[o.arg], 1007 "wrong object returned by weak dict!") 1008 items1 = dict.items() 1009 items2 = dict.copy().items() 1010 items1.sort() 1011 items2.sort() 1012 self.assertEqual(items1, items2, 1013 "cloning of weak-valued dictionary did not work!") 1014 del items1, items2 1015 self.assertEqual(len(dict), self.COUNT) 1016 del objects[0] 1017 self.assertEqual(len(dict), (self.COUNT - 1), 1018 "deleting object did not cause dictionary update") 1019 del objects, o 1020 self.assertEqual(len(dict), 0, 1021 "deleting the values did not clear the dictionary") 1022 # regression on SF bug #447152: 1023 dict = weakref.WeakValueDictionary() 1024 self.assertRaises(KeyError, dict.__getitem__, 1) 1025 dict[2] = C() 1026 self.assertRaises(KeyError, dict.__getitem__, 2) 1027 1028 def test_weak_keys(self): 1029 # 1030 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 1031 # len(d), in d. 1032 # 1033 dict, objects = self.make_weak_keyed_dict() 1034 for o in objects: 1035 self.assertEqual(weakref.getweakrefcount(o), 1, 1036 "wrong number of weak references to %r!" % o) 1037 self.assertIs(o.arg, dict[o], 1038 "wrong object returned by weak dict!") 1039 items1 = dict.items() 1040 items2 = dict.copy().items() 1041 self.assertEqual(set(items1), set(items2), 1042 "cloning of weak-keyed dictionary did not work!") 1043 del items1, items2 1044 self.assertEqual(len(dict), self.COUNT) 1045 del objects[0] 1046 self.assertEqual(len(dict), (self.COUNT - 1), 1047 "deleting object did not cause dictionary update") 1048 del objects, o 1049 self.assertEqual(len(dict), 0, 1050 "deleting the keys did not clear the dictionary") 1051 o = Object(42) 1052 dict[o] = "What is the meaning of the universe?" 1053 self.assertIn(o, dict) 1054 self.assertNotIn(34, dict) 1055 1056 def test_weak_keyed_iters(self): 1057 dict, objects = self.make_weak_keyed_dict() 1058 self.check_iters(dict) 1059 1060 # Test keyrefs() 1061 refs = dict.keyrefs() 1062 self.assertEqual(len(refs), len(objects)) 1063 objects2 = list(objects) 1064 for wr in refs: 1065 ob = wr() 1066 self.assertIn(ob, dict) 1067 self.assertEqual(ob.arg, dict[ob]) 1068 objects2.remove(ob) 1069 self.assertEqual(len(objects2), 0) 1070 1071 # Test iterkeyrefs() 1072 objects2 = list(objects) 1073 self.assertEqual(len(list(dict.iterkeyrefs())), len(objects)) 1074 for wr in dict.iterkeyrefs(): 1075 ob = wr() 1076 self.assertIn(ob, dict) 1077 self.assertEqual(ob.arg, dict[ob]) 1078 objects2.remove(ob) 1079 self.assertEqual(len(objects2), 0) 1080 1081 def test_weak_valued_iters(self): 1082 dict, objects = self.make_weak_valued_dict() 1083 self.check_iters(dict) 1084 1085 # Test valuerefs() 1086 refs = dict.valuerefs() 1087 self.assertEqual(len(refs), len(objects)) 1088 objects2 = list(objects) 1089 for wr in refs: 1090 ob = wr() 1091 self.assertEqual(ob, dict[ob.arg]) 1092 self.assertEqual(ob.arg, dict[ob.arg].arg) 1093 objects2.remove(ob) 1094 self.assertEqual(len(objects2), 0) 1095 1096 # Test itervaluerefs() 1097 objects2 = list(objects) 1098 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1099 for wr in dict.itervaluerefs(): 1100 ob = wr() 1101 self.assertEqual(ob, dict[ob.arg]) 1102 self.assertEqual(ob.arg, dict[ob.arg].arg) 1103 objects2.remove(ob) 1104 self.assertEqual(len(objects2), 0) 1105 1106 def check_iters(self, dict): 1107 # item iterator: 1108 items = dict.items() 1109 for item in dict.iteritems(): 1110 items.remove(item) 1111 self.assertEqual(len(items), 0, "iteritems() did not touch all items") 1112 1113 # key iterator, via __iter__(): 1114 keys = dict.keys() 1115 for k in dict: 1116 keys.remove(k) 1117 self.assertEqual(len(keys), 0, "__iter__() did not touch all keys") 1118 1119 # key iterator, via iterkeys(): 1120 keys = dict.keys() 1121 for k in dict.iterkeys(): 1122 keys.remove(k) 1123 self.assertEqual(len(keys), 0, "iterkeys() did not touch all keys") 1124 1125 # value iterator: 1126 values = dict.values() 1127 for v in dict.itervalues(): 1128 values.remove(v) 1129 self.assertEqual(len(values), 0, 1130 "itervalues() did not touch all values") 1131 1132 def check_weak_destroy_while_iterating(self, dict, objects, iter_name): 1133 n = len(dict) 1134 it = iter(getattr(dict, iter_name)()) 1135 next(it) # Trigger internal iteration 1136 # Destroy an object 1137 del objects[-1] 1138 gc.collect() # just in case 1139 # We have removed either the first consumed object, or another one 1140 self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) 1141 del it 1142 # The removal has been committed 1143 self.assertEqual(len(dict), n - 1) 1144 1145 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): 1146 # Check that we can explicitly mutate the weak dict without 1147 # interfering with delayed removal. 1148 # `testcontext` should create an iterator, destroy one of the 1149 # weakref'ed objects and then return a new key/value pair corresponding 1150 # to the destroyed object. 1151 with testcontext() as (k, v): 1152 self.assertFalse(k in dict) 1153 with testcontext() as (k, v): 1154 self.assertRaises(KeyError, dict.__delitem__, k) 1155 self.assertFalse(k in dict) 1156 with testcontext() as (k, v): 1157 self.assertRaises(KeyError, dict.pop, k) 1158 self.assertFalse(k in dict) 1159 with testcontext() as (k, v): 1160 dict[k] = v 1161 self.assertEqual(dict[k], v) 1162 ddict = copy.copy(dict) 1163 with testcontext() as (k, v): 1164 dict.update(ddict) 1165 self.assertEqual(dict, ddict) 1166 with testcontext() as (k, v): 1167 dict.clear() 1168 self.assertEqual(len(dict), 0) 1169 1170 def test_weak_keys_destroy_while_iterating(self): 1171 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1172 dict, objects = self.make_weak_keyed_dict() 1173 self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys') 1174 self.check_weak_destroy_while_iterating(dict, objects, 'iteritems') 1175 self.check_weak_destroy_while_iterating(dict, objects, 'itervalues') 1176 self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs') 1177 dict, objects = self.make_weak_keyed_dict() 1178 @contextlib.contextmanager 1179 def testcontext(): 1180 try: 1181 it = iter(dict.iteritems()) 1182 next(it) 1183 # Schedule a key/value for removal and recreate it 1184 v = objects.pop().arg 1185 gc.collect() # just in case 1186 yield Object(v), v 1187 finally: 1188 it = None # should commit all removals 1189 gc.collect() 1190 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1191 1192 def test_weak_values_destroy_while_iterating(self): 1193 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1194 dict, objects = self.make_weak_valued_dict() 1195 self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys') 1196 self.check_weak_destroy_while_iterating(dict, objects, 'iteritems') 1197 self.check_weak_destroy_while_iterating(dict, objects, 'itervalues') 1198 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') 1199 dict, objects = self.make_weak_valued_dict() 1200 @contextlib.contextmanager 1201 def testcontext(): 1202 try: 1203 it = iter(dict.iteritems()) 1204 next(it) 1205 # Schedule a key/value for removal and recreate it 1206 k = objects.pop().arg 1207 gc.collect() # just in case 1208 yield k, Object(k) 1209 finally: 1210 it = None # should commit all removals 1211 gc.collect() 1212 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1213 1214 def test_make_weak_keyed_dict_from_dict(self): 1215 o = Object(3) 1216 dict = weakref.WeakKeyDictionary({o:364}) 1217 self.assertEqual(dict[o], 364) 1218 1219 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1220 o = Object(3) 1221 dict = weakref.WeakKeyDictionary({o:364}) 1222 dict2 = weakref.WeakKeyDictionary(dict) 1223 self.assertEqual(dict[o], 364) 1224 1225 def make_weak_keyed_dict(self): 1226 dict = weakref.WeakKeyDictionary() 1227 objects = map(Object, range(self.COUNT)) 1228 for o in objects: 1229 dict[o] = o.arg 1230 return dict, objects 1231 1232 def test_make_weak_valued_dict_misc(self): 1233 # errors 1234 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) 1235 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) 1236 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) 1237 # special keyword arguments 1238 o = Object(3) 1239 for kw in 'self', 'other', 'iterable': 1240 d = weakref.WeakValueDictionary(**{kw: o}) 1241 self.assertEqual(list(d.keys()), [kw]) 1242 self.assertEqual(d[kw], o) 1243 1244 def make_weak_valued_dict(self): 1245 dict = weakref.WeakValueDictionary() 1246 objects = map(Object, range(self.COUNT)) 1247 for o in objects: 1248 dict[o.arg] = o 1249 return dict, objects 1250 1251 def check_popitem(self, klass, key1, value1, key2, value2): 1252 weakdict = klass() 1253 weakdict[key1] = value1 1254 weakdict[key2] = value2 1255 self.assertEqual(len(weakdict), 2) 1256 k, v = weakdict.popitem() 1257 self.assertEqual(len(weakdict), 1) 1258 if k is key1: 1259 self.assertIs(v, value1) 1260 else: 1261 self.assertIs(v, value2) 1262 k, v = weakdict.popitem() 1263 self.assertEqual(len(weakdict), 0) 1264 if k is key1: 1265 self.assertIs(v, value1) 1266 else: 1267 self.assertIs(v, value2) 1268 1269 def test_weak_valued_dict_popitem(self): 1270 self.check_popitem(weakref.WeakValueDictionary, 1271 "key1", C(), "key2", C()) 1272 1273 def test_weak_keyed_dict_popitem(self): 1274 self.check_popitem(weakref.WeakKeyDictionary, 1275 C(), "value 1", C(), "value 2") 1276 1277 def check_setdefault(self, klass, key, value1, value2): 1278 self.assertIsNot(value1, value2, 1279 "invalid test" 1280 " -- value parameters must be distinct objects") 1281 weakdict = klass() 1282 o = weakdict.setdefault(key, value1) 1283 self.assertIs(o, value1) 1284 self.assertIn(key, weakdict) 1285 self.assertIs(weakdict.get(key), value1) 1286 self.assertIs(weakdict[key], value1) 1287 1288 o = weakdict.setdefault(key, value2) 1289 self.assertIs(o, value1) 1290 self.assertIn(key, weakdict) 1291 self.assertIs(weakdict.get(key), value1) 1292 self.assertIs(weakdict[key], value1) 1293 1294 def test_weak_valued_dict_setdefault(self): 1295 self.check_setdefault(weakref.WeakValueDictionary, 1296 "key", C(), C()) 1297 1298 def test_weak_keyed_dict_setdefault(self): 1299 self.check_setdefault(weakref.WeakKeyDictionary, 1300 C(), "value 1", "value 2") 1301 1302 def check_update(self, klass, dict): 1303 # 1304 # This exercises d.update(), len(d), d.keys(), in d, 1305 # d.get(), d[]. 1306 # 1307 weakdict = klass() 1308 weakdict.update(dict) 1309 self.assertEqual(len(weakdict), len(dict)) 1310 for k in weakdict.keys(): 1311 self.assertIn(k, dict, 1312 "mysterious new key appeared in weak dict") 1313 v = dict.get(k) 1314 self.assertIs(v, weakdict[k]) 1315 self.assertIs(v, weakdict.get(k)) 1316 for k in dict.keys(): 1317 self.assertIn(k, weakdict, 1318 "original key disappeared in weak dict") 1319 v = dict[k] 1320 self.assertIs(v, weakdict[k]) 1321 self.assertIs(v, weakdict.get(k)) 1322 1323 def test_weak_valued_dict_update(self): 1324 self.check_update(weakref.WeakValueDictionary, 1325 {1: C(), 'a': C(), C(): C()}) 1326 # errors 1327 self.assertRaises(TypeError, weakref.WeakValueDictionary.update) 1328 d = weakref.WeakValueDictionary() 1329 self.assertRaises(TypeError, d.update, {}, {}) 1330 self.assertRaises(TypeError, d.update, (), ()) 1331 self.assertEqual(list(d.keys()), []) 1332 # special keyword arguments 1333 o = Object(3) 1334 for kw in 'self', 'dict', 'other', 'iterable': 1335 d = weakref.WeakValueDictionary() 1336 d.update(**{kw: o}) 1337 self.assertEqual(list(d.keys()), [kw]) 1338 self.assertEqual(d[kw], o) 1339 1340 def test_weak_keyed_dict_update(self): 1341 self.check_update(weakref.WeakKeyDictionary, 1342 {C(): 1, C(): 2, C(): 3}) 1343 1344 def test_weak_keyed_delitem(self): 1345 d = weakref.WeakKeyDictionary() 1346 o1 = Object('1') 1347 o2 = Object('2') 1348 d[o1] = 'something' 1349 d[o2] = 'something' 1350 self.assertEqual(len(d), 2) 1351 del d[o1] 1352 self.assertEqual(len(d), 1) 1353 self.assertEqual(d.keys(), [o2]) 1354 1355 def test_weak_valued_delitem(self): 1356 d = weakref.WeakValueDictionary() 1357 o1 = Object('1') 1358 o2 = Object('2') 1359 d['something'] = o1 1360 d['something else'] = o2 1361 self.assertEqual(len(d), 2) 1362 del d['something'] 1363 self.assertEqual(len(d), 1) 1364 self.assertEqual(d.items(), [('something else', o2)]) 1365 1366 def test_weak_keyed_bad_delitem(self): 1367 d = weakref.WeakKeyDictionary() 1368 o = Object('1') 1369 # An attempt to delete an object that isn't there should raise 1370 # KeyError. It didn't before 2.3. 1371 self.assertRaises(KeyError, d.__delitem__, o) 1372 self.assertRaises(KeyError, d.__getitem__, o) 1373 1374 # If a key isn't of a weakly referencable type, __getitem__ and 1375 # __setitem__ raise TypeError. __delitem__ should too. 1376 self.assertRaises(TypeError, d.__delitem__, 13) 1377 self.assertRaises(TypeError, d.__getitem__, 13) 1378 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1379 1380 def test_weak_keyed_cascading_deletes(self): 1381 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1382 # over the keys via self.data.iterkeys(). If things vanished from 1383 # the dict during this (or got added), that caused a RuntimeError. 1384 1385 d = weakref.WeakKeyDictionary() 1386 mutate = False 1387 1388 class C(object): 1389 def __init__(self, i): 1390 self.value = i 1391 def __hash__(self): 1392 return hash(self.value) 1393 def __eq__(self, other): 1394 if mutate: 1395 # Side effect that mutates the dict, by removing the 1396 # last strong reference to a key. 1397 del objs[-1] 1398 return self.value == other.value 1399 1400 objs = [C(i) for i in range(4)] 1401 for o in objs: 1402 d[o] = o.value 1403 del o # now the only strong references to keys are in objs 1404 # Find the order in which iterkeys sees the keys. 1405 objs = d.keys() 1406 # Reverse it, so that the iteration implementation of __delitem__ 1407 # has to keep looping to find the first object we delete. 1408 objs.reverse() 1409 1410 # Turn on mutation in C.__eq__. The first time thru the loop, 1411 # under the iterkeys() business the first comparison will delete 1412 # the last item iterkeys() would see, and that causes a 1413 # RuntimeError: dictionary changed size during iteration 1414 # when the iterkeys() loop goes around to try comparing the next 1415 # key. After this was fixed, it just deletes the last object *our* 1416 # "for o in obj" loop would have gotten to. 1417 mutate = True 1418 count = 0 1419 for o in objs: 1420 count += 1 1421 del d[o] 1422 self.assertEqual(len(d), 0) 1423 self.assertEqual(count, 2) 1424 1425 def test_threaded_weak_valued_setdefault(self): 1426 d = weakref.WeakValueDictionary() 1427 with collect_in_thread(): 1428 for i in range(50000): 1429 x = d.setdefault(10, RefCycle()) 1430 self.assertIsNot(x, None) # we never put None in there! 1431 del x 1432 1433 def test_threaded_weak_valued_pop(self): 1434 d = weakref.WeakValueDictionary() 1435 with collect_in_thread(): 1436 for i in range(50000): 1437 d[10] = RefCycle() 1438 x = d.pop(10, 10) 1439 self.assertIsNot(x, None) # we never put None in there! 1440 1441 def test_threaded_weak_valued_consistency(self): 1442 # Issue #28427: old keys should not remove new values from 1443 # WeakValueDictionary when collecting from another thread. 1444 d = weakref.WeakValueDictionary() 1445 with collect_in_thread(): 1446 for i in range(200000): 1447 o = RefCycle() 1448 d[10] = o 1449 # o is still alive, so the dict can't be empty 1450 self.assertEqual(len(d), 1) 1451 o = None # lose ref 1452 1453 1454from test import mapping_tests 1455 1456class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1457 """Check that WeakValueDictionary conforms to the mapping protocol""" 1458 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 1459 type2test = weakref.WeakValueDictionary 1460 def _reference(self): 1461 return self.__ref.copy() 1462 1463class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1464 """Check that WeakKeyDictionary conforms to the mapping protocol""" 1465 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 1466 type2test = weakref.WeakKeyDictionary 1467 def _reference(self): 1468 return self.__ref.copy() 1469 1470libreftest = """ Doctest for examples in the library reference: weakref.rst 1471 1472>>> import weakref 1473>>> class Dict(dict): 1474... pass 1475... 1476>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 1477>>> r = weakref.ref(obj) 1478>>> print r() is obj 1479True 1480 1481>>> import weakref 1482>>> class Object: 1483... pass 1484... 1485>>> o = Object() 1486>>> r = weakref.ref(o) 1487>>> o2 = r() 1488>>> o is o2 1489True 1490>>> del o, o2 1491>>> print r() 1492None 1493 1494>>> import weakref 1495>>> class ExtendedRef(weakref.ref): 1496... def __init__(self, ob, callback=None, **annotations): 1497... super(ExtendedRef, self).__init__(ob, callback) 1498... self.__counter = 0 1499... for k, v in annotations.iteritems(): 1500... setattr(self, k, v) 1501... def __call__(self): 1502... '''Return a pair containing the referent and the number of 1503... times the reference has been called. 1504... ''' 1505... ob = super(ExtendedRef, self).__call__() 1506... if ob is not None: 1507... self.__counter += 1 1508... ob = (ob, self.__counter) 1509... return ob 1510... 1511>>> class A: # not in docs from here, just testing the ExtendedRef 1512... pass 1513... 1514>>> a = A() 1515>>> r = ExtendedRef(a, foo=1, bar="baz") 1516>>> r.foo 15171 1518>>> r.bar 1519'baz' 1520>>> r()[1] 15211 1522>>> r()[1] 15232 1524>>> r()[0] is a 1525True 1526 1527 1528>>> import weakref 1529>>> _id2obj_dict = weakref.WeakValueDictionary() 1530>>> def remember(obj): 1531... oid = id(obj) 1532... _id2obj_dict[oid] = obj 1533... return oid 1534... 1535>>> def id2obj(oid): 1536... return _id2obj_dict[oid] 1537... 1538>>> a = A() # from here, just testing 1539>>> a_id = remember(a) 1540>>> id2obj(a_id) is a 1541True 1542>>> del a 1543>>> try: 1544... id2obj(a_id) 1545... except KeyError: 1546... print 'OK' 1547... else: 1548... print 'WeakValueDictionary error' 1549OK 1550 1551""" 1552 1553__test__ = {'libreftest' : libreftest} 1554 1555def test_main(): 1556 test_support.run_unittest( 1557 ReferencesTestCase, 1558 MappingTestCase, 1559 WeakValueDictionaryTestCase, 1560 WeakKeyDictionaryTestCase, 1561 SubclassableWeakrefTestCase, 1562 ) 1563 test_support.run_doctest(sys.modules[__name__]) 1564 1565 1566if __name__ == "__main__": 1567 test_main() 1568